Introduction
Welcome to the rsActor User Guide!
rsActor is a lightweight, Tokio-based actor framework in Rust focused on providing a simple and efficient actor model for local, in-process systems. It emphasizes clean message-passing semantics and straightforward actor lifecycle management while maintaining high performance for Rust applications.
This guide will walk you through the core concepts, features, and usage patterns of rsActor to help you build robust and concurrent applications.
Note: This project is actively evolving. While core APIs are stable, some features may be refined in future releases.
Core Features
- Minimalist Actor System: Focuses on core actor model primitives without unnecessary complexity.
- Message Passing: Comprehensive communication patterns (ask, tell, timeout variants, blocking versions).
- Clean Lifecycle Management: on_start, on_run, and on_stop hooks provide intuitive actor lifecycle control.
- Graceful Termination: Actors can be stopped gracefully or killed immediately, with differentiated cleanup via the killed parameter.
- Rich Result Types: ActorResult enum captures detailed completion states and error information.
- Macro-Assisted Development: #[message_handlers] and #[derive(Actor)] reduce boilerplate significantly.
- Type Safety: Compile-time type safety with ActorRef<T> prevents runtime type errors.
- Handler Traits: TellHandler and AskHandler enable type-erased message sending for heterogeneous actor collections.
- Actor Control: ActorControl trait provides type-erased lifecycle management.
- Weak References: ActorWeak<T> prevents circular dependencies without memory leaks.
- Dead Letter Tracking: Automatic logging of undelivered messages with structured tracing.
- Optional Metrics: Per-actor performance metrics (message count, processing time, error count) via the metrics feature.
- Optional Tracing Instrumentation: #[tracing::instrument] spans for observability via the tracing feature.
- Minimal Constraints: Only Send trait required for actor structs, enabling flexible internal state management.
Getting Started
This section will guide you through setting up rsActor in your project and running a basic example.
1. Add Dependency
To use rsActor in your Rust project, add it as a dependency in your Cargo.toml file:
[dependencies]
rsactor = "0.14" # Check crates.io for the latest version
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
Make sure to replace "0.14" with the latest version available on crates.io.
2. Basic Usage Example
Let’s create a simple counter actor. This actor will maintain a count and add to it whenever it receives an Increment message.
use rsactor::{Actor, ActorRef, message_handlers, spawn};
use anyhow::Result;
use tracing::info;
// Define actor struct
#[derive(Debug)] // Added Debug for printing the actor in ActorResult
struct CounterActor {
count: u32,
}
// Implement Actor trait
impl Actor for CounterActor {
type Args = u32; // Define an args type for actor creation
type Error = anyhow::Error;
// on_start is required and must be implemented.
// on_run and on_stop are optional and have default implementations.
async fn on_start(initial_count: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
info!("CounterActor (id: {}) started. Initial count: {}", actor_ref.identity(), initial_count);
Ok(CounterActor {
count: initial_count,
})
}
}
// Define message types
struct Increment(u32);
// Use message_handlers macro with handler attributes
#[message_handlers]
impl CounterActor {
#[handler]
async fn handle_increment(&mut self, msg: Increment, _: &ActorRef<Self>) -> u32 {
self.count += msg.0;
self.count
}
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init(); // Initialize tracing
info!("Creating CounterActor");
let (actor_ref, join_handle) = spawn::<CounterActor>(0u32); // Pass initial count
info!("CounterActor spawned with ID: {}", actor_ref.identity());
let new_count: u32 = actor_ref.ask(Increment(5)).await?;
info!("Incremented count: {}", new_count);
actor_ref.stop().await?;
info!("Stop signal sent to CounterActor (ID: {})", actor_ref.identity());
let actor_result = join_handle.await?;
info!(
"CounterActor (ID: {}) task completed. Result: {:?}",
actor_ref.identity(),
actor_result
);
Ok(())
}
Key Points Demonstrated:
- Actor State Management: CounterActor encapsulates its state (count) and provides controlled access through messages.
- Type-Safe Communication: The Message<T> trait ensures compile-time verification of message handling.
- Lifecycle Control: spawn creates the actor, ask communicates with it, and stop terminates it gracefully.
- Error Handling: Proper use of Result types and the ? operator for clean error propagation.
This example shows the basic actor pattern that forms the foundation for more complex actor systems.
Core Concepts
This section delves into the fundamental concepts of the rsActor framework. Understanding these concepts is key to effectively using the library.
What is an Actor?
In rsActor, an actor is an independent computational entity that encapsulates:
- State: Actors can have internal state that they manage exclusively. No other part of the system can directly access or modify an actor’s state.
- Behavior: Actors define how they react to messages they receive. This behavior is implemented through message handlers.
- Mailbox: Each actor has a mailbox where incoming messages are queued. Messages are processed one at a time, ensuring that an actor’s state is modified sequentially and avoiding data races.
Key Characteristics of Actors in rsActor:
- Isolation: An actor’s internal state is protected from direct external access. Communication happens solely through messages.
- Concurrency: Actors can run concurrently with each other. rsActor leverages Tokio to manage these concurrent tasks.
- Asynchronous Operations: Actors primarily operate asynchronously, allowing them to perform non-blocking I/O and other long-running tasks efficiently.
- Lifecycle: Actors have a defined lifecycle, including startup, execution, and shutdown phases, which developers can hook into.
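To make the mailbox idea concrete, here is a minimal std-only sketch of an actor-like loop. It is not how rsActor is implemented (rsActor runs on Tokio tasks); the names `Add`, `spawn_counter_thread`, and `run_counter` are hypothetical and exist only for this illustration. The point is that many senders can enqueue messages concurrently while the state is only ever touched by the single loop that owns it.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type for this sketch.
struct Add(u32);

// Spawns a thread that owns `count` and drains its mailbox one message at a time.
fn spawn_counter_thread() -> (mpsc::Sender<Add>, thread::JoinHandle<u32>) {
    let (tx, rx) = mpsc::channel::<Add>();
    let handle = thread::spawn(move || {
        let mut count = 0u32; // state is private to this thread
        for Add(n) in rx {
            count += n; // no lock needed: only this loop mutates `count`
        }
        count // returned once every sender has been dropped
    });
    (tx, handle)
}

// Sends increments from several threads, then returns the final count.
fn run_counter(senders: u32, per_sender: u32) -> u32 {
    let (tx, handle) = spawn_counter_thread();
    let workers: Vec<_> = (0..senders)
        .map(|_| {
            let tx = tx.clone();
            thread::spawn(move || {
                for _ in 0..per_sender {
                    tx.send(Add(1)).unwrap();
                }
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    drop(tx); // dropping the last sender ends the mailbox loop
    handle.join().unwrap()
}
```

Because every update goes through the mailbox, the final count equals the number of messages sent, regardless of how the sender threads interleave.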
The Actor Trait
To define an actor in rsActor, you implement the Actor trait for your struct:
use rsactor::{Actor, ActorRef, ActorWeak};
use anyhow::Result;
#[derive(Debug)]
struct MyActor {
    // ... internal state
}
impl Actor for MyActor {
    type Args = (); // Arguments needed to create the actor
    type Error = anyhow::Error; // Error type for lifecycle methods
    async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        // Initialize and return the actor instance
        println!("MyActor (id: {}) started with args: {:?}", actor_ref.identity(), args);
        Ok(MyActor { /* ... initial state ... */ })
    }
    async fn on_run(&mut self, actor_weak: &ActorWeak<Self>) -> Result<bool, Self::Error> {
        // Optional: Idle handler called when the message queue is empty.
        // Return Ok(true) to continue calling on_run, Ok(false) to stop idle processing.
        println!("MyActor (id: {}) idle processing.", actor_weak.identity());
        Ok(false) // Stop idle processing, only handle messages
    }
    async fn on_stop(&mut self, actor_weak: &ActorWeak<Self>, killed: bool) -> Result<(), Self::Error> {
        // Optional: Called when the actor is stopping.
        // `killed` indicates whether the actor was stopped gracefully (false) or killed (true).
        if killed {
            println!("MyActor (id: {}) was killed.", actor_weak.identity());
        } else {
            println!("MyActor (id: {}) is stopping gracefully.", actor_weak.identity());
        }
        Ok(())
    }
}
- Args: An associated type defining the arguments required by the on_start method to initialize the actor.
- Error: An associated type for the error type that lifecycle methods can return. Must implement Send + Debug.
- on_start: A required asynchronous method called when the actor is first created and started. It receives the initialization arguments and an ActorRef to itself. It should return a Result containing the initialized actor instance or an error.
- on_run: An optional idle handler called when the message queue is empty. It receives an ActorWeak (weak reference). Return Ok(true) to continue calling on_run, or Ok(false) to stop idle processing. Defaults to Ok(false).
- on_stop: An optional cleanup method. It receives an ActorWeak and a killed boolean indicating whether termination was graceful (false) or forced via kill() (true).
By encapsulating state and behavior, actors provide a powerful model for building concurrent and resilient applications.
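The on_run and on_stop hooks receive a weak reference rather than a strong one. The general idea of a weak handle can be sketched with the standard library's `Arc` and `Weak`; this is an analogy only, and it does not assume that ActorWeak exposes the same methods. `Mailbox`, `try_name`, and `weak_demo` are hypothetical names for this illustration.

```rust
use std::sync::{Arc, Weak};

// Hypothetical stand-in for whatever a strong actor reference keeps alive.
struct Mailbox {
    name: String,
}

// Returns the name if the target is still alive, or None once all strong refs are gone.
fn try_name(weak: &Weak<Mailbox>) -> Option<String> {
    weak.upgrade().map(|strong| strong.name.clone())
}

// Shows that holding a weak handle does not keep its target alive.
fn weak_demo() -> (Option<String>, Option<String>) {
    let strong = Arc::new(Mailbox { name: "worker".to_string() });
    let weak = Arc::downgrade(&strong);
    let before = try_name(&weak); // a strong ref still exists
    drop(strong);
    let after = try_name(&weak); // the last strong ref was dropped
    (before, after)
}
```

A handle of this kind lets an actor refer to itself (or to a peer) without being the reason the target stays alive, which is what breaks reference cycles.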
Actor References (ActorRef)
An ActorRef<A> is a handle or a reference to an actor of type A. It is the primary way to interact with an actor from outside. You cannot directly access an actor’s state or call its methods. Instead, you send messages to it via its ActorRef.
Key Features of ActorRef:
- Message Sending: ActorRef provides methods like ask, tell, ask_with_timeout, and tell_with_timeout to send messages to the associated actor.
- Type Safety: ActorRef<A> is generic over the actor type A. This ensures that you can only send messages that the actor A is defined to handle, providing compile-time safety.
- Decoupling: It decouples the sender of a message from the actor itself. Senders don’t need to know the actor’s internal implementation details, only the messages it accepts.
- Location Transparency (Conceptual): While rsActor is currently focused on in-process actors, the ActorRef concept is fundamental to actor systems and can be extended to support remote actors in the future. The reference itself abstracts away the actor’s actual location.
- Lifecycle Management: ActorRef also provides methods to manage the actor’s lifecycle, such as stop() and kill().
- Identity: Each ActorRef has a unique identity() that can be used for logging or tracking purposes.
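The difference between tell (fire-and-forget) and ask (request-reply) can be sketched with plain standard-library channels. This is an illustration of the two patterns, not rsActor's implementation; `Envelope`, `CounterHandle`, and `spawn_counter_handle` are hypothetical names, and the real ActorRef methods are async.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical envelope: a payload plus an optional one-shot reply channel.
struct Envelope {
    amount: u32,
    reply: Option<mpsc::Sender<u32>>,
}

// Hypothetical handle in the spirit of ActorRef: cheap to clone, hides the actor's state.
#[derive(Clone)]
struct CounterHandle {
    tx: mpsc::Sender<Envelope>,
}

impl CounterHandle {
    // Fire-and-forget: returns as soon as the message is queued.
    fn tell(&self, amount: u32) -> Result<(), String> {
        self.tx
            .send(Envelope { amount, reply: None })
            .map_err(|_| "actor is gone".to_string())
    }

    // Request-reply: queues the message, then waits for the handler's answer.
    fn ask(&self, amount: u32) -> Result<u32, String> {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.tx
            .send(Envelope { amount, reply: Some(reply_tx) })
            .map_err(|_| "actor is gone".to_string())?;
        reply_rx.recv().map_err(|_| "no reply".to_string())
    }
}

// Starts the counter loop on its own thread and returns a handle to it.
fn spawn_counter_handle() -> CounterHandle {
    let (tx, rx) = mpsc::channel::<Envelope>();
    thread::spawn(move || {
        let mut count = 0u32;
        for env in rx {
            count += env.amount;
            if let Some(reply) = env.reply {
                let _ = reply.send(count); // the asker may have given up; ignore that
            }
        }
    });
    CounterHandle { tx }
}
```

A cloned handle talks to the same mailbox, so a tell through one clone is visible to a later ask through another.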
Obtaining an ActorRef
An ActorRef is typically obtained when an actor is spawned:
use rsactor::{spawn, Actor, ActorRef, Message, impl_message_handler};
use anyhow::Result;
#[derive(Debug)]
struct MyActor;
impl Actor for MyActor {
type Args = ();
type Error = anyhow::Error;
async fn on_start(_args: Self::Args, _actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(MyActor)
}
}
// Dummy message for demonstration
struct PingMsg;
impl Message<PingMsg> for MyActor {
type Reply = ();
async fn handle(&mut self, _msg: PingMsg, _actor_ref: &ActorRef<Self>) -> Self::Reply {}
}
impl_message_handler!(MyActor, [PingMsg]);
#[tokio::main]
async fn main() {
let (actor_ref, join_handle) = spawn::<MyActor>(());
// actor_ref is an ActorRef<MyActor>
// You can now use actor_ref to send messages to MyActor
actor_ref.tell(PingMsg).await.unwrap();
actor_ref.stop().await.unwrap();
join_handle.await.unwrap();
}
Type Safety
rsActor provides compile-time type safety through ActorRef<A>. This ensures that only messages that the actor can handle are sent, and reply types are correctly typed at compile time.
The type-safe approach prevents runtime type errors and provides better IDE support with autocomplete and type checking. All actor communication should use ActorRef<A> for the best development experience and runtime safety.
ActorRef<A> is cloneable and can be safely shared across tasks.
Messages
Messages are the sole means of communication between actors in rsActor. They are plain Rust structs or enums that carry data from a sender to a recipient actor.
Defining Messages
A message can be any struct or enum. It’s a common practice to define messages specific to the interactions an actor supports.
// Example message structs
struct GetData { id: u32 }
struct UpdateData { id: u32, value: String }
// Example message enum
enum CounterCommand {
    Increment(u32),
    Decrement(u32),
    GetValue,
}
The Message<T> Trait
For an actor to handle a specific message type T, the actor must implement the Message<T> trait. This trait defines how the actor processes the message and what kind of reply (if any) it produces.
use rsactor::{Actor, ActorRef, Message, impl_message_handler};
use anyhow::Result;
#[derive(Debug)]
struct DataStore { content: String }
impl Actor for DataStore {
    type Args = String;
    type Error = anyhow::Error;
    async fn on_start(initial_content: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
        Ok(DataStore { content: initial_content })
    }
}
// Message to retrieve current content
struct GetContent;
impl Message<GetContent> for DataStore {
    type Reply = String;
    async fn handle(&mut self, _: GetContent, _: &ActorRef<Self>) -> Self::Reply {
        self.content.clone()
    }
}
// Message to update content
struct SetContent(String);
impl Message<SetContent> for DataStore {
    type Reply = (); // No direct reply needed
    async fn handle(&mut self, msg: SetContent, _: &ActorRef<Self>) -> Self::Reply {
        self.content = msg.0;
    }
}
impl_message_handler!(DataStore, [GetContent, SetContent]);
Key components of the Message<T> trait implementation:
- type Reply: An associated type that specifies the type of the value the actor will send back to the sender if the message was sent using ask. If the message doesn’t warrant a direct reply (e.g., for tell operations), () can be used.
- async fn handle(&mut self, msg: T, actor_ref: &ActorRef<Self>) -> Self::Reply: This asynchronous method contains the logic for processing the message msg of type T.
  - It takes a mutable reference to the actor’s state (&mut self), allowing it to modify the actor’s internal data.
  - It also receives a reference to the actor’s own ActorRef, which can be useful for various purposes, such as spawning child actors or sending messages to itself.
  - It must return a value of type Self::Reply.
Message Immutability
Once a message is sent, it should be considered immutable by the sender. The actor receiving the message effectively takes ownership of the data within the message (or a copy of it).
Message Design Principles:
- Clarity: Messages should clearly represent the intended operation or event.
- Granularity: Design messages that are neither too coarse (bundling unrelated operations) nor too fine-grained (leading to chatty communication).
- Immutability: Prefer messages that are immutable or contain immutable data to avoid shared mutable state issues.
- Serializability (for future/distributed systems): While rsActor is in-process, if you ever plan to distribute your actors, designing messages that can be serialized (e.g., using Serde) is a good practice.
By using well-defined messages and implementing the Message<T> trait, you create a clear and type-safe communication protocol for your actors.
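The compile-time guarantee comes from implementing one trait per message type, each with its own reply type. The following std-only sketch shows that mechanism with a hypothetical, synchronous trait named `Handles<M>`; it is shaped like the Message<T> idea but is not the rsActor trait, and `Store`, `GetLen`, `Append`, and `dispatch` are illustrative names.

```rust
// Hypothetical trait: one impl per message type, each with its own reply type.
// Synchronous here to stay std-only; the rsActor handlers shown above are async.
trait Handles<M> {
    type Reply;
    fn handle(&mut self, msg: M) -> Self::Reply;
}

struct Store {
    content: String,
}

struct GetLen;
struct Append(String);

impl Handles<GetLen> for Store {
    type Reply = usize;
    fn handle(&mut self, _: GetLen) -> usize {
        self.content.len()
    }
}

impl Handles<Append> for Store {
    type Reply = ();
    fn handle(&mut self, msg: Append) {
        self.content.push_str(&msg.0);
    }
}

// Only compiles for message types that `A` implements `Handles` for,
// and the return type follows the message type automatically.
fn dispatch<A, M>(actor: &mut A, msg: M) -> <A as Handles<M>>::Reply
where
    A: Handles<M>,
{
    actor.handle(msg)
}
```

Calling `dispatch` with a message type that `Store` has no impl for is rejected by the compiler, which is the same kind of check that makes sending an unsupported message a build error rather than a runtime failure.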
Actor Lifecycle
Actors in rsActor go through a well-defined lifecycle. Understanding this lifecycle is crucial for managing actor state, resources, and behavior correctly.
The lifecycle consists of several stages, with specific methods (hooks) in the Actor trait that you can implement to customize behavior at each stage:
- Spawning: An actor’s lifecycle begins when it is spawned using rsactor::spawn() or rsactor::spawn_with_mailbox_capacity().
- Starting (on_start): Once spawned, the framework calls on_start.
  - Purpose: Initialize the actor’s state. It receives the arguments passed during spawn() and an ActorRef to itself.
  - Required: This method must be implemented.
  - Outcome: Return Ok(Self) with the initialized actor instance or Err if initialization fails. If on_start fails, the ActorResult will be Failed with FailurePhase::OnStart.
async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
    // Initialize state, acquire resources
    Ok(Self { /* ... */ })
}
- Running (on_run): After on_start succeeds, on_run is called as an idle handler when the message queue is empty.
  - Purpose: Background processing or periodic tasks. Messages always have higher priority.
  - Optional: Defaults to Ok(false) (called once, then disabled).
  - Return values:
    - Ok(true): Continue calling on_run
    - Ok(false): Stop calling on_run, only process messages
    - Err(e): Terminate the actor with an error
  - Note: Receives &ActorWeak<Self>, not &ActorRef<Self>.
async fn on_run(&mut self, actor_weak: &ActorWeak<Self>) -> Result<bool, Self::Error> {
    // Perform idle processing when message queue is empty
    Ok(false)
}
- Message Processing: The actor concurrently handles incoming messages and executes on_run. Messages are always given priority via biased tokio::select!.
- Stopping (on_stop): The actor can be stopped in several ways:
  - Graceful Stop (actor_ref.stop().await?): processes remaining messages first.
  - Immediate Kill (actor_ref.kill()): stops immediately.
  - Error in on_run: on_stop is called for cleanup before termination.
  - All references dropped: When all ActorRef instances are dropped.
  The killed parameter distinguishes the termination mode:
async fn on_stop(&mut self, actor_weak: &ActorWeak<Self>, killed: bool) -> Result<(), Self::Error> {
    if killed {
        println!("Forcefully terminated, performing minimal cleanup");
    } else {
        println!("Gracefully shutting down, performing full cleanup");
    }
    Ok(())
}
- Termination: After on_stop completes, the actor’s Tokio task finishes. The JoinHandle resolves with an ActorResult<T>:
  - ActorResult::Completed { actor, killed }: successful completion
  - ActorResult::Failed { actor, error, phase, killed }: failure with details
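Code that awaits the join handle usually wants to branch on how the actor ended. The sketch below shows that kind of match using hypothetical stand-in types, `Outcome` and `Phase`, that mirror the variant shapes listed above. The field types are assumptions made for this illustration; consult the rsActor API documentation for the real ActorResult and FailurePhase definitions.

```rust
// Hypothetical stand-in for the failure phase. Only the phases mentioned in
// this guide are modeled here.
#[allow(dead_code)]
#[derive(Debug)]
enum Phase {
    OnStart,
    OnRun,
}

// Hypothetical stand-in mirroring the two variant shapes; field types are assumed.
#[derive(Debug)]
enum Outcome<A> {
    Completed { actor: A, killed: bool },
    Failed { actor: Option<A>, error: String, phase: Phase, killed: bool },
}

// Turns an outcome into a one-line summary suitable for logging.
fn summarize<A: std::fmt::Debug>(outcome: &Outcome<A>) -> String {
    match outcome {
        Outcome::Completed { actor, killed: false } => {
            format!("stopped gracefully: {:?}", actor)
        }
        Outcome::Completed { actor, killed: true } => format!("killed: {:?}", actor),
        Outcome::Failed { actor, error, phase, killed } => format!(
            "failed in {:?} (state kept: {}, killed: {}): {}",
            phase,
            actor.is_some(),
            killed,
            error
        ),
    }
}
```

Matching on both the variant and the killed flag keeps the three cases (graceful stop, kill, failure) separate in logs and in any follow-up logic such as restarts.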
Graceful Stop vs. Kill
| Feature | stop() | kill() |
|---|---|---|
| Drains mailbox | Yes | No |
| on_stop called | Yes | Yes |
| killed parameter | false | true |
| Blocking | Async (await) | Synchronous |
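The mailbox row of the table is the one with the most practical consequences, so here is a small std-only model of it. It simulates the mailbox with a `VecDeque` and is not rsActor code; `Shutdown` and `shut_down` are hypothetical names for this sketch.

```rust
use std::collections::VecDeque;

// Hypothetical shutdown modes for this sketch.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Shutdown {
    Stop, // graceful: finish queued work first
    Kill, // immediate: drop queued work
}

// Returns (final count, number of discarded messages, the `killed` flag seen by cleanup).
fn shut_down(mut mailbox: VecDeque<u32>, mode: Shutdown) -> (u32, usize, bool) {
    let mut count = 0u32;
    if mode == Shutdown::Stop {
        // A graceful stop drains whatever is already in the mailbox.
        while let Some(n) = mailbox.pop_front() {
            count += n;
        }
    }
    let discarded = mailbox.len(); // non-zero only when killed
    let killed = mode == Shutdown::Kill;
    (count, discarded, killed)
}
```

If queued messages represent work that must not be lost (for example, pending writes), prefer the graceful path and reserve kill for cases where stopping quickly matters more than finishing the queue.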
Visualizing the Lifecycle
graph TD
A[Spawn Actor] --> B{on_start};
B -- Success --> D[Message Processing + on_run Loop];
B -- Failure --> F[ActorResult::Failed OnStart];
D -- stop signal --> E{on_stop killed=false};
D -- kill signal --> E2{on_stop killed=true};
D -- on_run Error --> E3{on_stop for cleanup};
D -- All refs dropped --> E;
E --> G[ActorResult::Completed];
E2 --> G2[ActorResult::Completed killed=true];
E3 --> F2[ActorResult::Failed OnRun];
Understanding these lifecycle hooks allows you to build actors that manage their resources and state reliably throughout their existence.
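The rule that messages take priority over on_run can be modeled without Tokio. The sketch below is a simplified, synchronous stand-in for a biased select: it polls the mailbox first and only runs the idle step when the mailbox is empty. `run_loop` and its parameters are hypothetical, and unlike a real actor the loop simply ends when the idle step declines to continue.

```rust
use std::sync::mpsc::{self, TryRecvError};

// Runs a mailbox loop that prefers queued messages and calls the idle step only
// when the mailbox is empty. Returns a trace of what ran, in order.
fn run_loop(messages: &[u32], idle_rounds: u32) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<u32>();
    for &m in messages {
        tx.send(m).unwrap();
    }
    let mut trace = Vec::new();
    let mut idle_calls = 0u32;
    loop {
        match rx.try_recv() {
            // Queued messages always win over idle work.
            Ok(m) => trace.push(format!("msg {}", m)),
            Err(TryRecvError::Empty) => {
                idle_calls += 1;
                trace.push("idle".to_string());
                if idle_calls == 1 {
                    // The first idle step enqueues a message to itself.
                    tx.send(99).unwrap();
                }
                // Comparable to returning Ok(true) while rounds remain, then Ok(false).
                // A real actor would keep handling messages; this sketch just ends.
                if idle_calls >= idle_rounds {
                    break;
                }
            }
            Err(TryRecvError::Disconnected) => break,
        }
    }
    trace
}
```

With two queued messages and two idle rounds, the message enqueued during the first idle step is handled before the second idle step runs, which is the ordering the priority rule implies.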
Creating Actors
rsActor provides flexible ways to define and spawn actors. The core of actor creation revolves around implementing the Actor trait and then using spawn functions to bring actors to life.
Actor Creation Methods
There are several approaches to creating actors in rsActor:
1. Manual Implementation
Implement the Actor trait manually for full control over initialization and lifecycle:
use rsactor::{Actor, ActorRef};
// DbConnection is a placeholder for your own database client type.
struct DatabaseActor {
    connection: DbConnection,
}
impl Actor for DatabaseActor {
    type Args = String; // connection string
    type Error = anyhow::Error;
    async fn on_start(conn_str: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
        let connection = DbConnection::connect(&conn_str).await?;
        Ok(Self { connection })
    }
}
2. Derive Macro
Use #[derive(Actor)] for simple actors that don’t need complex initialization:
use rsactor::Actor;
use std::collections::HashMap;
#[derive(Actor)]
struct CacheActor {
    data: HashMap<String, String>,
}
Spawning Actors
Once you’ve defined an actor, spawn it using one of these functions:
// Basic spawning (MyActor stands for your actor type, named explicitly as in the other examples)
let (actor_ref, join_handle) = spawn::<MyActor>(my_actor_args);
// Spawning with custom mailbox capacity
let (actor_ref, join_handle) = spawn_with_mailbox_capacity::<MyActor>(my_actor_args, 1000);
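Mailbox capacity bounds how many messages can wait in the queue at once. The effect of a bound can be seen with the standard library's `sync_channel`; this is a std-only analogy rather than rsActor's mailbox, and `fill_mailbox` is a hypothetical helper. The sketch uses `try_send` so that a full queue is reported instead of blocking the caller.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Fills a bounded queue that nobody is draining and reports
// how many sends were accepted and how many were rejected.
fn fill_mailbox(capacity: usize, attempts: usize) -> (usize, usize) {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => rejected += 1, // queue is at capacity
            Err(TrySendError::Disconnected(_)) => break, // receiver is gone
        }
    }
    (accepted, rejected)
}
```

A small capacity applies backpressure to fast senders sooner; a large one absorbs bursts at the cost of memory and of messages waiting longer before they are handled.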
Actor Categories
Basic Actors
Simple actors that process messages and maintain state without complex background tasks.
Async Actors
Actors that perform continuous asynchronous work using the on_run lifecycle method, such as:
- Periodic tasks with timers
- Network I/O operations
- Database queries
- File system operations
Blocking Task Actors
Actors designed to handle CPU-intensive or blocking operations that might block the async runtime:
- Heavy computations
- Synchronous I/O
- Legacy blocking APIs
- CPU-bound algorithms
Key Concepts
Actor Arguments (Args)
The Args associated type defines what data is needed to initialize your actor:
- Self: Pass the entire actor struct as initialization data
- Custom types: Define specific initialization parameters
- (): No initialization data needed
Error Handling
The Error associated type defines how initialization failures are handled:
- anyhow::Error: General error handling
- std::convert::Infallible: For actors that never fail to initialize
- Custom error types: Domain-specific error handling
Lifecycle Integration
Different actor types leverage lifecycle methods differently:
- on_start: Always required for initialization
- on_run: Optional, used for background tasks and periodic work
- on_stop: Optional, used for cleanup when the actor shuts down
This section covers these different actor patterns and when to use each approach.
Basic Actors
A basic actor is the foundation of the rsActor system. It encapsulates state and processes messages sequentially, ensuring thread safety through the actor model.
Essential Components
Every basic actor requires:
- Actor Struct: Holds the actor’s state
- Actor Trait Implementation: Defines initialization and lifecycle
- Message Types: Define communication protocol
- Message Handlers: Process incoming messages
Complete Example
use rsactor::{Actor, ActorRef, Message, impl_message_handler, spawn};
use anyhow::Result;
// 1. Define the actor struct
#[derive(Debug)]
struct BankAccount {
account_id: String,
balance: u64,
}
// 2. Implement the Actor trait
impl Actor for BankAccount {
type Args = (String, u64); // (account_id, initial_balance)
type Error = anyhow::Error;
async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
let (account_id, initial_balance) = args;
println!("BankAccount {} (ID: {}) opened with balance: ${}",
account_id, actor_ref.identity(), initial_balance);
Ok(BankAccount { account_id, balance: initial_balance })
}
}
// 3. Define message types
struct Deposit(u64);
struct Withdraw(u64);
struct GetBalance;
// 4. Implement message handlers
impl Message<Deposit> for BankAccount {
type Reply = u64; // Returns new balance
async fn handle(&mut self, msg: Deposit, _: &ActorRef<Self>) -> Self::Reply {
self.balance += msg.0;
println!("Account {}: Deposited ${}, new balance: ${}",
self.account_id, msg.0, self.balance);
self.balance
}
}
impl Message<Withdraw> for BankAccount {
type Reply = Result<u64, String>; // Returns new balance or error
async fn handle(&mut self, msg: Withdraw, _: &ActorRef<Self>) -> Self::Reply {
if self.balance >= msg.0 {
self.balance -= msg.0;
println!("Account {}: Withdrew ${}, new balance: ${}",
self.account_id, msg.0, self.balance);
Ok(self.balance)
} else {
Err(format!("Insufficient funds: requested ${}, available ${}",
msg.0, self.balance))
}
}
}
impl Message<GetBalance> for BankAccount {
type Reply = u64;
async fn handle(&mut self, _: GetBalance, _: &ActorRef<Self>) -> Self::Reply {
self.balance
}
}
// 5. Wire up message handlers
impl_message_handler!(BankAccount, [Deposit, Withdraw, GetBalance]);
// Usage example
#[tokio::main]
async fn main() -> Result<()> {
let (account_ref, _) = spawn::<BankAccount>(
("Alice".to_string(), 1000)
);
// Perform operations
let new_balance = account_ref.ask(Deposit(500)).await?;
println!("After deposit: ${}", new_balance);
match account_ref.ask(Withdraw(200)).await? {
Ok(balance) => println!("After withdrawal: ${}", balance),
Err(error) => println!("Withdrawal failed: {}", error),
}
let final_balance = account_ref.ask(GetBalance).await?;
println!("Final balance: ${}", final_balance);
Ok(())
}
Key Design Patterns
- State Management: Actors encapsulate state privately, ensuring thread safety without locks.
- Error Handling: Use Result types in message replies to handle business logic errors gracefully.
- Message Design: Keep messages focused on single operations for clarity and maintainability.
- Type Safety: Leverage Rust's type system to ensure compile-time verification of message handling.
This pattern scales from simple stateful services to complex business logic processors while maintaining the benefits of the actor model.
This covers the creation of a standard actor. The actor runs in its own Tokio task, processes messages sequentially, and manages its state privately.
Async Actors
Actors in rsActor are inherently asynchronous, thanks to their integration with Tokio. However, the term “Async Actor” here refers to actors that, within their message handlers or lifecycle methods, perform operations that involve .awaiting other asynchronous tasks. This is a common pattern for I/O-bound work or when an actor needs to coordinate with other asynchronous services.
rsActor fully supports this: message handlers (handle method) and lifecycle hooks (on_start, on_run, on_stop) are already async fn.
Example: Actor Performing Async Work in handle
Consider an actor that, upon receiving a message, needs to perform an asynchronous HTTP request or a database query.
use rsactor::{Actor, ActorRef, Message, impl_message_handler, spawn};
use anyhow::Result;
use log::info;
use tokio::time::{sleep, Duration};
#[derive(Debug)]
struct AsyncWorkerActor {
worker_id: u32,
}
impl Actor for AsyncWorkerActor {
type Args = u32; // Worker ID
type Error = anyhow::Error;
async fn on_start(worker_id: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
info!("AsyncWorkerActor (id: {}, worker_id: {}) starting", actor_ref.identity(), worker_id);
Ok(AsyncWorkerActor { worker_id })
}
}
// Message to perform an async task
struct PerformAsyncTaskMsg {
task_data: String,
delay_ms: u64,
}
impl Message<PerformAsyncTaskMsg> for AsyncWorkerActor {
type Reply = String; // Result of the async task
async fn handle(&mut self, msg: PerformAsyncTaskMsg, actor_ref: &ActorRef<Self>) -> Self::Reply {
info!(
"Actor (id: {}, worker_id: {}) received async task: {}. Will delay for {}ms",
actor_ref.identity(),
self.worker_id,
msg.task_data,
msg.delay_ms
);
// Simulate an asynchronous operation (e.g., an HTTP call, DB query)
sleep(Duration::from_millis(msg.delay_ms)).await;
let result = format!(
"Task '{}' completed by worker {} after {}ms",
msg.task_data,
self.worker_id,
msg.delay_ms
);
info!("Actor (id: {}) finished async task: {}", actor_ref.identity(), result);
result
}
}
impl_message_handler!(AsyncWorkerActor, [PerformAsyncTaskMsg]);
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
let (actor_ref, jh) = spawn::<AsyncWorkerActor>(101); // Spawn with worker_id 101
let task1 = PerformAsyncTaskMsg { task_data: "Process Payment".to_string(), delay_ms: 100 };
let task2 = PerformAsyncTaskMsg { task_data: "Fetch User Data".to_string(), delay_ms: 50 };
// Send messages and await replies
// Since message handling is sequential for an actor, task2 will only start after task1's handle completes.
let result1 = actor_ref.ask(task1).await?;
info!("Main: Result 1: {}", result1);
let result2 = actor_ref.ask(task2).await?;
info!("Main: Result 2: {}", result2);
actor_ref.stop().await?;
jh.await?; // Resolves to an ActorResult once the actor task has finished
Ok(())
}
In this example:
- The handle method for PerformAsyncTaskMsg is async.
- It uses tokio::time::sleep(...).await to simulate a non-CPU-bound asynchronous operation.
- While the actor is awaiting inside handle, its specific Tokio task yields control, allowing other Tokio tasks (including other actors or other asynchronous operations in the system) to run. However, this specific actor instance will not process further messages from its mailbox until the current handle method completes.
Spawning Additional Tokio Tasks from an Actor
Sometimes, an actor might need to initiate multiple asynchronous operations concurrently or offload work to separate Tokio tasks without blocking its own message processing loop for the entire duration of that work. This is a common pattern for achieving higher concurrency within the scope of a single actor’s responsibilities.
The actor_async_worker.rs example in the examples/ directory demonstrates this pattern:
- A RequesterActor sends tasks to a WorkerActor.
- The WorkerActor, in its handle method for ProcessTask, uses tokio::spawn to launch a new asynchronous task for the actual work.
- This allows the WorkerActor’s handle method to return quickly, making the WorkerActor available to process new messages while the spawned Tokio tasks run in the background.
- The spawned tasks, upon completion, send their results back to the RequesterActor (or another designated actor) using messages.
Key snippet from examples/actor_async_worker.rs (WorkerActor):
// In WorkerActor's impl Message<ProcessTask>
async fn handle(&mut self, msg: ProcessTask, _actor_ref: &ActorRef<Self>) -> Self::Reply {
    let task_id = msg.task_id;
    let data = msg.data;
    let requester = msg.requester; // ActorRef to send the result back
    println!("WorkerActor received task {}: {}", task_id, data);
    // Spawn a new Tokio task to do the processing asynchronously
    tokio::spawn(async move {
        // Simulate some processing time
        let processing_time = (task_id % 3 + 1) as u64;
        println!("Processing task {} will take {} seconds", task_id, processing_time);
        tokio::time::sleep(Duration::from_secs(processing_time)).await;
        let result = format!("Result of task {} ...", task_id, /* ... */);
        // Send the result back to the requester
        match requester.tell(WorkCompleted { task_id, result }).await {
            Ok(_) => println!("Worker sent back result for task {}", task_id),
            Err(e) => eprintln!("Failed to send result for task {}: {:?}", task_id, e),
        }
    });
    // The handle method returns quickly, allowing WorkerActor to process other messages.
}
This pattern is powerful for actors that manage or delegate multiple concurrent operations.
Considerations:
- Sequential Message Processing: Even if an actor spawns other Tokio tasks, its own messages are still processed one by one from its mailbox. The handle method for a given message must complete before the next message is taken from the mailbox.
- State Management: If spawned tasks need to interact with the actor’s state, they must do so by sending messages back to the actor. Direct mutable access to self is not possible from a separately spawned tokio::spawn task that outlives the handle method’s scope (unless using Arc<Mutex<...>> or similar, which moves away from the actor model’s state encapsulation benefits for that specific piece of state).
- Error Handling: Errors in spawned tasks need to be handled appropriately, potentially by sending an error message back to the parent actor or another designated error-handling actor.
- Graceful Shutdown: If an actor spawns long-running tasks, consider how these tasks should behave when the actor itself is stopped. They might need to be cancelled or allowed to complete. rsActor itself doesn’t automatically manage tasks spawned via tokio::spawn from within an actor; this is the developer’s responsibility.
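The state-management point, that spawned work should report back through messages instead of touching the actor's state, can be sketched with standard-library threads and channels. This is a synchronous analogy to the tokio::spawn pattern, not rsActor code; `Msg` and `run_tasks` are hypothetical names for this illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical messages: a request to start work, and a helper's report.
enum Msg {
    Start { task_id: u32 },
    Done { task_id: u32, result: u32 },
}

// The loop hands each task to a helper thread and keeps receiving. Helpers
// report back through the same mailbox instead of touching `results` directly.
fn run_tasks(task_ids: &[u32]) -> Vec<(u32, u32)> {
    let (tx, rx) = mpsc::channel::<Msg>();
    for &task_id in task_ids {
        tx.send(Msg::Start { task_id }).unwrap();
    }
    let mut pending = task_ids.len();
    let mut results = Vec::new(); // owned state, mutated only inside this loop
    while pending > 0 {
        match rx.recv().unwrap() {
            Msg::Start { task_id } => {
                let reply_to = tx.clone();
                thread::spawn(move || {
                    let result = task_id * task_id; // stand-in for slow work
                    let _ = reply_to.send(Msg::Done { task_id, result });
                });
            }
            Msg::Done { task_id, result } => {
                results.push((task_id, result));
                pending -= 1;
            }
        }
    }
    results.sort(); // completion order is not deterministic
    results
}
```

The helper threads never see `results`; they only hold a sender. That keeps all mutation in one place, at the cost of one extra message per completed task.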
Async actors are fundamental to building responsive, I/O-bound applications with rsActor.
Blocking Task Actors
While Tokio and rsActor are designed for asynchronous operations, there are scenarios where you need to integrate with blocking code, such as CPU-bound computations, synchronous I/O libraries, or FFI calls.
Running blocking code directly within an actor’s message handler (which runs on a Tokio worker thread) can stall the Tokio runtime, preventing other asynchronous tasks from making progress. To handle this, Tokio provides tokio::task::spawn_blocking.
rsActor facilitates interaction with tasks running in spawn_blocking by providing blocking message sending methods: blocking_ask and blocking_tell on ActorRef.
When to Use spawn_blocking with Actors
- CPU-Bound Work: For lengthy computations that would otherwise block an async task for too long.
- Synchronous Libraries: When interacting with libraries that use blocking I/O or do not have an async API.
- FFI Calls: If foreign function interface calls are blocking.
Pattern: Actor Managing a Blocking Task
A common pattern is an actor that either:
- Offloads specific blocking operations to spawn_blocking within its message handlers.
- Manages a dedicated, long-running blocking task that communicates with the actor.
The examples/actor_blocking_task.rs file demonstrates an actor that spawns a dedicated synchronous background task in its on_start method.
Key elements:
- Spawning the Blocking Task (in on_start):
#![allow(unused)]
fn main() {
// Inside SyncDataProcessorActor::on_start
let task_actor_ref = actor_ref.clone(); // For task -> actor communication
let handle = task::spawn_blocking(move || {
    loop {
        thread::sleep(interval); // Blocking sleep
        let raw_value = rand::random::<f64>() * 100.0;
        // Send data back to the actor using blocking_tell
        if let Err(e) = task_actor_ref.blocking_tell(ProcessedData { /* ... */ }, None) {
            break; // Exit task on error
        }
    }
});
}
- Communication from Blocking Task to Actor (blocking_tell): The blocking task uses task_actor_ref.blocking_tell(...) to send messages back to the actor.
- Communication from Actor to Blocking Task: The actor can send commands to the blocking task using standard Tokio MPSC channels.
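The two communication directions above can be sketched without rsActor at all. The following is a minimal, std-only analogy, not the code from examples/actor_blocking_task.rs: a bounded std::sync::mpsc channel stands in for the actor mailbox (its blocking send plays the role of blocking_tell with no timeout), and a second channel carries commands to the worker. The names Command and start_worker are hypothetical.

```rust
use std::sync::mpsc::{Receiver, SyncSender, TryRecvError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical command type for owner -> worker communication.
pub enum Command {
    Stop,
}

/// Spawns a blocking worker that emits an increasing counter every `interval`.
/// The returned handle yields the number of values the worker managed to send.
pub fn start_worker(
    data_tx: SyncSender<u64>,
    cmd_rx: Receiver<Command>,
    interval: Duration,
) -> JoinHandle<u64> {
    thread::spawn(move || {
        let mut sent = 0u64;
        loop {
            // Owner -> worker: poll for a stop request without blocking.
            match cmd_rx.try_recv() {
                Ok(Command::Stop) | Err(TryRecvError::Disconnected) => break,
                Err(TryRecvError::Empty) => {}
            }
            thread::sleep(interval); // Stands in for the blocking work
            // Worker -> owner: blocks while the bounded queue is full and
            // fails once the receiving side has been dropped.
            if data_tx.send(sent).is_err() {
                break;
            }
            sent += 1;
        }
        sent
    })
}
```

Note that the worker exits on three conditions: an explicit Stop, a dropped command channel, or a failed send. The last one mirrors the break on a blocking_tell error in the snippet above and keeps the worker from outliving its owner.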
blocking_tell and blocking_ask
These methods on ActorRef are designed for use from any thread, including non-async contexts:
- blocking_tell(message, timeout: Option<Duration>): Sends a message and blocks the current thread until it is enqueued. Fire-and-forget.
- blocking_ask(message, timeout: Option<Duration>): Sends a message and blocks until the actor has processed it and a reply is received.
Timeout Behavior
- timeout: None: Uses Tokio’s blocking_send directly. Most efficient, but blocks indefinitely if the mailbox is full.
- timeout: Some(duration): Spawns a separate thread with a temporary Tokio runtime. Has additional overhead (~50-200 µs for thread creation) but guarantees bounded waiting.
Thread Safety
These methods are safe to call from within an existing Tokio runtime context because the timeout implementation spawns a separate thread with its own runtime, avoiding the “cannot start a runtime from within a runtime” panic.
Deprecated Methods
The older ask_blocking and tell_blocking methods are deprecated since v0.10.0. Use blocking_ask and blocking_tell instead:
#![allow(unused)]
fn main() {
// Old (deprecated):
// actor_ref.ask_blocking(msg, timeout);
// actor_ref.tell_blocking(msg, timeout);
// New:
actor_ref.blocking_ask(msg, timeout);
actor_ref.blocking_tell(msg, timeout);
}
Considerations
- Thread Pool: spawn_blocking uses a dedicated thread pool in Tokio. Be mindful of pool size if spawning many blocking tasks.
- Communication: Use ActorRef with blocking_* methods for task-to-actor communication, and Tokio MPSC channels for actor-to-task communication.
- Shutdown: Ensure graceful shutdown of blocking tasks when the managing actor stops.
By using spawn_blocking and the blocking_* methods, rsActor allows you to integrate synchronous, blocking code into your asynchronous actor system safely and efficiently.
Communicating with Actors
Once an actor is spawned and you have its ActorRef, you can communicate with it by sending messages. rsActor provides two primary modes of message passing: “tell” (fire-and-forget) and “ask” (request-response).
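The primary message-sending methods (tell, ask, and their timeout variants) are asynchronous and return a Future that resolves when the operation completes (e.g., message enqueued for tell, or reply received for ask). The blocking_tell and blocking_ask variants cover non-async contexts.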
Tell (Fire and Forget)
The “tell” pattern, accessed via actor_ref.tell(message), is used to send a message to an actor without expecting a direct reply. This is a form of asynchronous, one-way communication.
Characteristics of tell:
- Asynchronous: actor_ref.tell(message) returns a Future that completes once the message is enqueued into the actor’s mailbox. It does not wait for the actor to process the message.
- No Direct Reply: The sender does not receive a value back from the actor.
- Reply Type: For messages intended for tell, the handler typically returns ().
- Error Handling: Returns Result<(), rsactor::Error>. An error occurs if the actor is no longer alive.
Usage Example:
use rsactor::{Actor, ActorRef, message_handlers, spawn};
use anyhow::Result;
use tracing::info;
#[derive(Actor)]
struct LoggerActor;
// Message to log a string
struct LogMessage(String);
#[message_handlers]
impl LoggerActor {
#[handler]
async fn handle_log(&mut self, msg: LogMessage, actor_ref: &ActorRef<Self>) -> () {
info!("LoggerActor (id: {}): {}", actor_ref.identity(), msg.0);
}
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let (logger_ref, jh) = spawn::<LoggerActor>(LoggerActor);
// Send a log message using tell
logger_ref.tell(LogMessage("Application started successfully".to_string())).await?;
logger_ref.tell(LogMessage("Processing item #123".to_string())).await?;
logger_ref.stop().await?;
jh.await?;
Ok(())
}
tell_with_timeout
rsActor also provides actor_ref.tell_with_timeout(message, timeout).
- Similar to tell, but allows specifying a Duration as a timeout for enqueuing the message.
- Returns a Timeout error if the message cannot be enqueued within the given duration.
#![allow(unused)]
fn main() {
use std::time::Duration;
match logger_ref.tell_with_timeout(
LogMessage("Critical event".to_string()),
Duration::from_millis(100)
).await {
Ok(_) => info!("Log message sent."),
Err(e) => info!("Failed to send log message: {:?}", e),
}
}
blocking_tell
For sending messages from non-async contexts (e.g., spawn_blocking tasks):
#![allow(unused)]
fn main() {
// Without timeout (most efficient)
actor_ref.blocking_tell(LogMessage("from blocking".into()), None)?;
// With timeout
actor_ref.blocking_tell(LogMessage("from blocking".into()), Some(Duration::from_secs(1)))?;
}
When to Use tell:
- Notifications/Events: When an actor needs to notify another without needing an immediate response.
- Commands: Issuing commands where the sender doesn’t need to wait for completion.
- Decoupling: When you want to minimize coupling between actors.
- Avoiding Deadlocks: In complex actor interactions, tell can break circular dependency cycles that ask might create.
tell is a fundamental communication pattern for building reactive and event-driven systems with actors.
Ask (Request-Response)
The “ask” pattern, accessed via actor_ref.ask(message), is used to send a message to an actor and asynchronously await a reply. This is a form of two-way communication, suitable for request-response interactions.
Characteristics of ask:
- Asynchronous Request, Asynchronous Reply: actor_ref.ask(message) returns a Future that completes when the target actor has processed the message and produced a reply.
- Direct Reply: The sender receives a typed value back from the actor, defined by the handler’s return type.
- Error Handling: Returns Result<Reply, rsactor::Error>. Errors can occur if:
  - The actor is not alive (Error::Send)
  - The reply channel was dropped (Error::Receive)
  - A timeout occurs (Error::Timeout)
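To make the three failure modes concrete, here is a minimal std-only sketch of the request-response idea: each request carries its own reply channel, and the caller waits on it with a deadline. This is an analogy built on threads and std::sync::mpsc, not rsActor's implementation; AskError, Add, spawn_adder, and ask_add are hypothetical names chosen to mirror Error::Send, Error::Receive, and Error::Timeout.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

/// Hypothetical error type mirroring the three cases listed above.
#[derive(Debug, PartialEq)]
pub enum AskError {
    Send,    // The receiving side is gone
    Receive, // The reply channel was dropped without an answer
    Timeout, // No reply arrived in time
}

/// A request that carries the channel on which the reply must be sent.
pub struct Add {
    pub a: i32,
    pub b: i32,
    pub reply: Sender<i32>,
}

/// Spawns a thread that answers `Add` requests one at a time.
pub fn spawn_adder() -> Sender<Add> {
    let (tx, rx) = mpsc::channel::<Add>();
    thread::spawn(move || {
        for req in rx {
            // The asker may already have given up, so a failed reply is ignored.
            let _ = req.reply.send(req.a + req.b);
        }
    });
    tx
}

/// Sends a request and waits up to `timeout` for the reply.
pub fn ask_add(actor: &Sender<Add>, a: i32, b: i32, timeout: Duration) -> Result<i32, AskError> {
    let (reply_tx, reply_rx) = mpsc::channel();
    actor
        .send(Add { a, b, reply: reply_tx })
        .map_err(|_| AskError::Send)?;
    match reply_rx.recv_timeout(timeout) {
        Ok(sum) => Ok(sum),
        Err(RecvTimeoutError::Timeout) => Err(AskError::Timeout),
        Err(RecvTimeoutError::Disconnected) => Err(AskError::Receive),
    }
}
```

In this sketch the timeout covers only the wait for the reply, because an unbounded channel never blocks on send.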
Usage Example:
use rsactor::{Actor, ActorRef, message_handlers, spawn};
use anyhow::Result;
use tracing::info;
#[derive(Actor)]
struct CalculatorActor {
last_result: i32,
}
struct AddMsg(i32, i32);
struct GetLastResult;
#[message_handlers]
impl CalculatorActor {
#[handler]
async fn handle_add(&mut self, msg: AddMsg, _: &ActorRef<Self>) -> i32 {
let sum = msg.0 + msg.1;
self.last_result = sum;
sum
}
#[handler]
async fn handle_get_last(&mut self, _: GetLastResult, _: &ActorRef<Self>) -> i32 {
self.last_result
}
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let (calc_ref, jh) = spawn::<CalculatorActor>(CalculatorActor { last_result: 0 });
// Send an AddMsg using ask and await the reply
let sum: i32 = calc_ref.ask(AddMsg(10, 25)).await?;
info!("Sum: {}", sum); // Sum: 35
let last: i32 = calc_ref.ask(GetLastResult).await?;
assert_eq!(sum, last);
calc_ref.stop().await?;
jh.await?;
Ok(())
}
ask_with_timeout
Specify a timeout for the entire request-response cycle:
#![allow(unused)]
fn main() {
use std::time::Duration;
match calc_ref.ask_with_timeout(AddMsg(5, 7), Duration::from_secs(1)).await {
Ok(sum) => info!("Sum: {}", sum),
Err(e) => info!("Failed: {:?}", e),
}
}
This is crucial for building resilient systems where you cannot wait indefinitely.
ask_join
For handlers that return a JoinHandle<R> (spawning long-running tasks), ask_join automatically awaits the task completion:
#![allow(unused)]
fn main() {
// Handler returns JoinHandle<String>
let result: String = actor_ref.ask_join(HeavyTask { data: "input".into() }).await?;
}
This avoids manually awaiting the JoinHandle from a regular ask call.
blocking_ask
For sending messages from non-async contexts:
#![allow(unused)]
fn main() {
// Without timeout
let result = actor_ref.blocking_ask(GetLastResult, None)?;
// With timeout
let result = actor_ref.blocking_ask(GetLastResult, Some(Duration::from_secs(5)))?;
}
When to Use ask:
- Requesting Data: When an actor needs to retrieve information from another actor.
- Performing Operations with Results: When the caller needs the result to proceed.
- Synchronizing Operations: To ensure one operation completes before another begins.
Potential Pitfalls:
- Deadlocks: If Actor A asks Actor B, and Actor B concurrently asks Actor A, a deadlock occurs. Design interaction flows carefully, or use tell to break cycles. rsActor provides a runtime deadlock detection feature that catches these cycles before they happen; see Deadlock Detection for details.
- Performance: Excessive ask where tell would suffice leads to performance bottlenecks. If a reply isn’t strictly needed, prefer tell.
ask is a powerful tool for interactions requiring a response, but keep in mind that the caller waits until the reply arrives, with the latency and deadlock risks that implies.
Error Handling
Robust error handling is essential in any concurrent system. rsActor provides comprehensive mechanisms to deal with errors at every level.
Error Categories
Errors in rsActor fall into these categories:
- Actor Lifecycle Errors: Errors in on_start, on_run, or on_stop (your custom Actor::Error type)
- Communication Errors: Errors during message sending (rsactor::Error)
- Message Handling Errors: Errors within handler methods (your custom reply types)
- Panics: If an actor task panics (caught by JoinHandle)
rsactor::Error
The framework’s built-in error type for communication failures:
| Variant | Description | Retryable? |
|---|---|---|
| Error::Send | Actor’s mailbox is closed (actor stopped) | No |
| Error::Receive | Reply channel was dropped before response | No |
| Error::Timeout | Operation exceeded the specified timeout | Yes |
| Error::Downcast | Reply type downcast failed (programming error) | No |
| Error::Runtime | Actor lifecycle runtime failure | No |
| Error::MailboxCapacity | Mailbox capacity configuration error | No |
| Error::Join | Spawned task panicked or was cancelled | No |
Error Utilities
#![allow(unused)]
fn main() {
match actor_ref.tell(msg).await {
Ok(()) => { /* delivered */ }
Err(e) => {
// Check if retrying might help
if e.is_retryable() {
// Only Timeout errors are retryable
}
// Get actionable debugging suggestions
for tip in e.debugging_tips() {
println!(" - {}", tip);
}
}
}
}
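The "Retryable?" column suggests a simple retry policy: try again only on timeouts, and stop immediately on everything else. The following is a small synchronous sketch of that policy under stated assumptions: CommError is a simplified stand-in for the variants in the table (it is not rsactor::Error), and retry is a hypothetical helper.

```rust
use std::thread;
use std::time::Duration;

/// Simplified stand-in for the variants in the table above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommError {
    Send,
    Receive,
    Timeout,
    Downcast,
    Runtime,
    MailboxCapacity,
    Join,
}

impl CommError {
    /// Mirrors the "Retryable?" column: only a timeout may succeed later.
    pub fn is_retryable(&self) -> bool {
        matches!(self, CommError::Timeout)
    }
}

/// Calls `op` at least once and at most `max_attempts` times.
/// Sleeps `base_delay * attempt` between tries (linear backoff).
pub fn retry<T>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: impl FnMut() -> Result<T, CommError>,
) -> Result<T, CommError> {
    let mut attempt = 0u32;
    loop {
        attempt += 1;
        match op() {
            Ok(value) => return Ok(value),
            // Stop when the budget is spent or a retry cannot help.
            Err(e) if attempt >= max_attempts || !e.is_retryable() => return Err(e),
            Err(_) => thread::sleep(base_delay * attempt),
        }
    }
}
```

With the real API the same shape would apply to an async loop around ask_with_timeout, using tokio::time::sleep instead of thread::sleep.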
Custom Actor Errors
Define your own error type for lifecycle methods. Any type implementing Send + Debug + 'static works:
#![allow(unused)]
fn main() {
#[derive(Debug, thiserror::Error)]
enum MyActorError {
#[error("Database error: {0}")]
DbError(#[from] sqlx::Error),
#[error("Network timeout")]
NetworkTimeout,
}
impl Actor for MyActor {
type Error = MyActorError;
// ...
}
}
on_tell_result Hook
For fire-and-forget (tell) messages, no caller receives the handler’s return value, so an Err returned by the handler would otherwise go unnoticed. The on_tell_result hook provides observability:
#![allow(unused)]
fn main() {
#[message_handlers]
impl MyActor {
#[handler]
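async fn handle_command(&mut self, msg: Command, _: &ActorRef<Self>) -> Result<(), CommandError> {
    // Returning the Result (instead of using `?` in a unit-returning fn) lets on_tell_result see it
    self.execute(msg)
}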
}
// In the Message<Command> trait implementation (auto-generated by #[handler]):
// on_tell_result is called with the handler's return value when sent via tell()
}
When implementing Message<T> manually, override on_tell_result:
#![allow(unused)]
fn main() {
impl Message<Command> for MyActor {
type Reply = Result<(), CommandError>;
async fn handle(&mut self, msg: Command, _: &ActorRef<Self>) -> Self::Reply {
self.execute(msg)
}
fn on_tell_result(result: &Self::Reply, _actor_ref: &ActorRef<Self>) {
if let Err(e) = result {
tracing::error!("Command failed (fire-and-forget): {e:?}");
}
}
}
}
Use #[handler(no_log)] to suppress the default warning log for tell errors.
Dead Letters
When a message cannot be delivered, rsActor automatically records a dead letter with structured tracing. See Dead Letter Tracking for details.
ActorResult
The ActorResult<T: Actor> enum is returned when an actor’s lifecycle completes. This is typically what you get when you .await the JoinHandle returned by rsactor::spawn().
It provides detailed information about how the actor terminated.
#![allow(unused)]
fn main() {
/// Represents the phase during which an actor failure occurred.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailurePhase {
/// Actor failed during the `on_start` lifecycle hook.
OnStart,
/// Actor failed during execution in the `on_run` lifecycle hook.
OnRun,
/// Actor failed during the `on_stop` lifecycle hook.
OnStop,
}
/// Result type returned when an actor's lifecycle completes.
#[derive(Debug)]
pub enum ActorResult<T: Actor> {
/// Actor completed successfully and can be recovered.
Completed {
/// The successfully completed actor instance
actor: T,
/// Whether the actor was killed (`true`) or stopped gracefully (`false`)
killed: bool,
},
/// Actor failed during one of its lifecycle phases.
Failed {
/// The actor instance (if recoverable), or None if not recoverable.
/// This will be `None` specifically when the failure occurred during `on_start`,
/// as the actor wasn't fully initialized.
actor: Option<T>,
/// The error that caused the failure
error: T::Error,
/// The lifecycle phase during which the failure occurred
phase: FailurePhase,
/// Whether the actor was killed (`true`) or was attempting to stop gracefully (`false`)
killed: bool,
},
}
}
Variants
- ActorResult::Completed { actor: T, killed: bool }
  - Indicates that the actor finished its lifecycle without any errors defined by T::Error.
  - actor: The instance of the actor. You can retrieve it if you need to inspect its final state.
  - killed: A boolean indicating whether the actor was terminated via a kill() signal (true) or stopped gracefully, e.g., via stop() or by naturally ending its run loop (false).
- ActorResult::Failed { actor: Option<T>, error: T::Error, phase: FailurePhase, killed: bool }
  - Indicates that the actor encountered an error (T::Error) during its operation.
  - actor: An Option<T> containing the actor instance if it was recoverable. This will be None if the failure occurred during the OnStart phase, as the actor instance wouldn’t have been fully created.
  - error: The specific error of type T::Error (defined in your impl Actor for YourActor) that caused the failure.
  - phase: A FailurePhase enum (OnStart, OnRun, OnStop) indicating when the error occurred in the actor’s lifecycle.
  - killed: A boolean indicating whether the actor was attempting to stop due to a kill() signal when the failure occurred.
Utility Methods
ActorResult provides several helpful methods:
- is_completed(): Returns true if the actor completed successfully.
- is_failed(): Returns true if the actor failed.
- was_killed(): Returns true if the actor was killed, regardless of completion or failure.
- stopped_normally(): Returns true if Completed and not killed.
- is_startup_failed(): Returns true if failed during OnStart.
- is_runtime_failed(): Returns true if failed during OnRun.
- is_stop_failed(): Returns true if failed during OnStop.
- actor(): Returns Option<&T> to the actor instance if available.
- into_actor(): Consumes self and returns Option<T>.
- error(): Returns Option<&T::Error> if failed.
- into_error(): Consumes self and returns Option<T::Error>.
- has_actor(): Returns true if the result contains an actor instance.
- to_result(): Converts ActorResult<T> into std::result::Result<T, T::Error>.
Example Usage
use rsactor::{spawn, Actor, ActorRef, ActorWeak, ActorResult, FailurePhase, message_handlers};
use anyhow::{Result, anyhow};
#[derive(Debug)]
struct MyErrActor { should_fail_on_start: bool, should_fail_on_run: bool }
impl Actor for MyErrActor {
type Args = (bool, bool); // (should_fail_on_start, should_fail_on_run)
type Error = anyhow::Error;
async fn on_start(args: Self::Args, ar: &ActorRef<Self>) -> Result<Self, Self::Error> {
tracing::info!("MyErrActor (id: {}) on_start called", ar.identity());
if args.0 {
return Err(anyhow!("Deliberate failure in on_start"));
}
Ok(MyErrActor { should_fail_on_start: args.0, should_fail_on_run: args.1 })
}
async fn on_run(&mut self, ar: &ActorWeak<Self>) -> Result<bool, Self::Error> {
tracing::info!("MyErrActor (id: {}) on_run called", ar.identity());
if self.should_fail_on_run {
return Err(anyhow!("Deliberate failure in on_run"));
}
Ok(true) // Return true to indicate run loop should stop
}
}
// Message type
struct Ping;
#[message_handlers]
impl MyErrActor {
#[handler]
async fn handle_ping(&mut self, _msg: Ping, _: &ActorRef<Self>) {}
}
async fn check_actor_termination(fail_on_start: bool, fail_on_run: bool) {
let (actor_ref, join_handle) = spawn::<MyErrActor>((fail_on_start, fail_on_run));
let id = actor_ref.identity();
match join_handle.await {
Ok(actor_result) => match actor_result {
ActorResult::Completed { actor, killed } => {
tracing::info!(
"Actor (id: {}) COMPLETED. Killed: {}. Final state: {:?}",
id, killed, actor
);
}
ActorResult::Failed { actor, error, phase, killed } => {
tracing::info!(
"Actor (id: {}) FAILED. Phase: {:?}, Killed: {}. Error: {}. State: {:?}",
id, phase, killed, error, actor
);
if fail_on_start {
assert!(matches!(phase, FailurePhase::OnStart));
assert!(actor.is_none());
}
if fail_on_run && !fail_on_start {
assert!(matches!(phase, FailurePhase::OnRun));
assert!(actor.is_some());
}
}
},
Err(join_error) => {
tracing::info!("Actor (id: {}) task JOIN ERROR: {:?}", id, join_error);
}
}
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
tracing::info!("--- Checking normal completion ---");
check_actor_termination(false, false).await;
tracing::info!("--- Checking failure in on_start ---");
check_actor_termination(true, false).await;
tracing::info!("--- Checking failure in on_run ---");
check_actor_termination(false, true).await;
}
This ActorResult provides a comprehensive way to understand the termination reason and final state of an actor, which is crucial for supervision strategies and debugging.
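As an illustration of how a supervisor might act on this information, the sketch below maps a termination outcome to a restart decision. It is only one possible policy, and it uses a simplified stand-in (Outcome, Phase) with the same shape as ActorResult and FailurePhase so that it compiles without the crate; Decision and decide are hypothetical names.

```rust
/// Stand-in for FailurePhase.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    OnStart,
    OnRun,
    OnStop,
}

/// Stand-in with the same shape as ActorResult, generic over actor and error types.
#[derive(Debug)]
pub enum Outcome<A, E> {
    Completed { actor: A, killed: bool },
    Failed { actor: Option<A>, error: E, phase: Phase, killed: bool },
}

/// What the (hypothetical) supervisor does next.
#[derive(Debug, PartialEq, Eq)]
pub enum Decision {
    Done,             // Nothing to do
    Restart,          // Start a fresh instance
    RestartWithState, // Restart, reusing the recovered actor state
    GiveUp,           // Escalate or stop supervising
}

pub fn decide<A, E>(outcome: &Outcome<A, E>, restarts_so_far: u32, max_restarts: u32) -> Decision {
    match outcome {
        Outcome::Completed { .. } => Decision::Done,
        // A kill was requested explicitly, so do not bring the actor back.
        Outcome::Failed { killed: true, .. } => Decision::GiveUp,
        Outcome::Failed { .. } if restarts_so_far >= max_restarts => Decision::GiveUp,
        // No instance exists after a startup failure, so start from scratch.
        Outcome::Failed { phase: Phase::OnStart, .. } => Decision::Restart,
        Outcome::Failed { actor: Some(_), .. } => Decision::RestartWithState,
        Outcome::Failed { actor: None, .. } => Decision::Restart,
    }
}
```

The arm order matters: the kill check and the restart budget are evaluated before the phase-specific arms.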
Timeouts
Timeouts are essential for building resilient actor systems. rsActor provides timeout functionality for both message sending (tell) and request-response patterns (ask). This prevents your application from hanging indefinitely when actors become unresponsive or slow.
Overview
rsActor provides two timeout-enabled communication methods:
- ask_with_timeout: Sends a message and waits for a reply within a specified timeout
- tell_with_timeout: Sends a fire-and-forget message with a timeout for the send operation
Both methods take a std::time::Duration (the same type that Tokio re-exports as tokio::time::Duration) and return a Result that contains a Timeout error if the operation exceeds the specified duration.
ask_with_timeout
The ask_with_timeout method is similar to the regular ask method, but allows you to specify a timeout for the entire request-response cycle.
Syntax
#![allow(unused)]
fn main() {
pub async fn ask_with_timeout<M>(&self, msg: M, timeout: Duration) -> Result<T::Reply>
where
T: Message<M>,
M: Send + 'static,
T::Reply: Send + 'static,
}
Usage
use rsactor::{spawn, Actor, ActorRef, Message, impl_message_handler};
use std::time::Duration;
use anyhow::Result;
#[derive(Debug)]
struct MathActor;
impl Actor for MathActor {
type Args = ();
type Error = anyhow::Error;
async fn on_start(_args: Self::Args, _ar: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(MathActor)
}
}
struct MultiplyMsg(i32, i32);
impl Message<MultiplyMsg> for MathActor {
type Reply = i32;
async fn handle(&mut self, msg: MultiplyMsg, _ar: &ActorRef<Self>) -> Self::Reply {
msg.0 * msg.1
}
}
impl_message_handler!(MathActor, [MultiplyMsg]);
#[tokio::main]
async fn main() -> Result<()> {
let (math_ref, _join_handle) = spawn::<MathActor>(());
// Ask with timeout - will succeed if the actor responds within 1 second
match math_ref.ask_with_timeout(MultiplyMsg(6, 7), Duration::from_secs(1)).await {
Ok(product) => println!("Product: {}", product),
Err(e) => println!("Failed to get product: {:?}", e),
}
Ok(())
}
When ask_with_timeout Times Out
If the actor doesn’t respond within the specified timeout, you’ll receive a Timeout error:
#![allow(unused)]
fn main() {
// This will likely timeout if the actor takes longer than 10ms to respond
match math_ref.ask_with_timeout(MultiplyMsg(1, 2), Duration::from_millis(10)).await {
Ok(result) => println!("Result: {}", result),
Err(e) => {
if matches!(e, rsactor::Error::Timeout { .. }) { // Match the variant instead of the message text
println!("Request timed out!");
} else {
println!("Other error: {}", e);
}
}
}
}
tell_with_timeout
The tell_with_timeout method is similar to the regular tell method, but allows you to specify a timeout for the send operation itself.
Syntax
#![allow(unused)]
fn main() {
pub async fn tell_with_timeout<M>(&self, msg: M, timeout: Duration) -> Result<()>
where
T: Message<M>,
M: Send + 'static,
}
Usage
use rsactor::{spawn, Actor, ActorRef, Message, impl_message_handler};
use std::time::Duration;
use anyhow::Result;
use log::info;
#[derive(Debug)]
struct EventActor;
impl Actor for EventActor {
type Args = ();
type Error = anyhow::Error;
async fn on_start(_args: Self::Args, _ar: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(EventActor)
}
}
struct EventMessage(String);
impl Message<EventMessage> for EventActor {
type Reply = ();
async fn handle(&mut self, msg: EventMessage, _ar: &ActorRef<Self>) -> Self::Reply {
println!("EVENT: {}", msg.0);
}
}
impl_message_handler!(EventActor, [EventMessage]);
#[tokio::main]
async fn main() -> Result<()> {
let (event_ref, _join_handle) = spawn::<EventActor>(());
// Tell with timeout - will fail if the message can't be enqueued within 100ms
match event_ref.tell_with_timeout(
EventMessage("System started".to_string()),
Duration::from_millis(100)
).await {
Ok(_) => info!("Event message sent successfully"),
Err(e) => info!("Failed to send event message: {:?}", e),
}
Ok(())
}
When tell_with_timeout Times Out
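tell_with_timeout typically fails only if:
- The actor’s mailbox is full and stays full for the whole timeout (reported as Error::Timeout)
- The actor has terminated and can no longer receive messages (per the error table, a closed mailbox is reported as Error::Send rather than as a timeout)
- There are system-level issues preventing message delivery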
In most cases, tell_with_timeout completes quickly since sending a message is usually a fast operation.
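The full-mailbox case can be reproduced with a bounded std channel. The sketch below is a std-only analogy, not how rsActor implements tell_with_timeout: it polls try_send until a deadline and distinguishes a full queue from a closed one. SendError and send_with_timeout are hypothetical names, and the 1 ms polling interval is an arbitrary choice.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical error type: the queue stayed full, or the receiver is gone.
#[derive(Debug, PartialEq)]
pub enum SendError {
    Timeout,
    Closed,
}

/// Tries to enqueue `msg` into a bounded channel, giving up after `timeout`.
pub fn send_with_timeout<T>(
    tx: &SyncSender<T>,
    mut msg: T,
    timeout: Duration,
) -> Result<(), SendError> {
    let deadline = Instant::now() + timeout;
    loop {
        match tx.try_send(msg) {
            Ok(()) => return Ok(()),
            Err(TrySendError::Disconnected(_)) => return Err(SendError::Closed),
            Err(TrySendError::Full(returned)) => {
                if Instant::now() >= deadline {
                    return Err(SendError::Timeout);
                }
                // try_send hands the message back, so it can be retried.
                msg = returned;
                thread::sleep(Duration::from_millis(1));
            }
        }
    }
}
```

With a queue of capacity 1 that nobody drains, the first send succeeds and the second one times out; once the receiver is dropped, the call fails immediately instead of waiting.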
Practical Example: Slow Actor with Timeouts
Here’s a comprehensive example demonstrating timeout behavior with actors that simulate different response times:
use rsactor::{message_handlers, spawn, Actor, ActorRef};
use std::time::Duration;
use anyhow::Result;
use tracing::info;
// Define an actor that can process requests with varying response times
struct TimeoutDemoActor {
name: String,
}
impl Actor for TimeoutDemoActor {
type Args = String;
type Error = anyhow::Error;
async fn on_start(name: Self::Args, _ar: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(Self { name })
}
}
struct FastQuery(String);
struct SlowQuery(String);
#[message_handlers]
impl TimeoutDemoActor {
#[handler]
async fn handle_fast_query(&mut self, msg: FastQuery, _: &ActorRef<Self>) -> String {
// Fast response - completes immediately
format!("{}: Fast response to: {}", self.name, msg.0)
}
#[handler]
async fn handle_slow_query(&mut self, msg: SlowQuery, _: &ActorRef<Self>) -> String {
// Slow response - takes 500ms to complete
tokio::time::sleep(Duration::from_millis(500)).await;
format!("{}: Slow response to: {}", self.name, msg.0)
}
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let (actor_ref, _join_handle) = spawn::<TimeoutDemoActor>("Demo".to_string());
// Fast query with sufficient timeout - should succeed
info!("=== Fast query with long timeout ===");
match actor_ref.ask_with_timeout(
FastQuery("What is your name?".to_string()),
Duration::from_millis(100)
).await {
Ok(response) => info!("Success: {}", response),
Err(e) => info!("Failed: {}", e),
}
// Slow query with insufficient timeout - should fail
info!("=== Slow query with short timeout ===");
match actor_ref.ask_with_timeout(
SlowQuery("Complex calculation".to_string()),
Duration::from_millis(100) // Less than 500ms needed
).await {
Ok(response) => info!("Success: {}", response),
Err(e) => info!("Failed: {}", e),
}
// Slow query with sufficient timeout - should succeed
info!("=== Slow query with sufficient timeout ===");
match actor_ref.ask_with_timeout(
SlowQuery("Another calculation".to_string()),
Duration::from_millis(1000) // More than 500ms needed
).await {
Ok(response) => info!("Success: {}", response),
Err(e) => info!("Failed: {}", e),
}
Ok(())
}
Best Practices
1. Choose Appropriate Timeout Values
- Too short: May cause unnecessary failures due to normal processing delays
- Too long: May not protect against truly unresponsive actors
- Consider your use case: Interactive UI vs batch processing will have different timeout requirements
#![allow(unused)]
fn main() {
// For interactive applications
let timeout = Duration::from_millis(100);
// For backend processing
let timeout = Duration::from_secs(30);
// For critical real-time systems
let timeout = Duration::from_millis(10);
}
2. Handle Timeout Errors Gracefully
Always handle timeout errors appropriately for your application:
#![allow(unused)]
fn main() {
match actor_ref.ask_with_timeout(msg, timeout).await {
Ok(response) => {
// Process successful response
handle_response(response);
}
Err(rsactor::Error::Timeout { .. }) => {
// Handle timeout specifically
log::warn!("Actor response timed out, using fallback");
use_fallback_value();
}
Err(e) => {
// Handle other errors
log::error!("Actor communication failed: {}", e);
return Err(e);
}
}
}
3. Consider Retries with Backoff
For important operations, consider implementing retry logic:
#![allow(unused)]
fn main() {
use tokio::time::{sleep, Duration};
async fn ask_with_retries<T, M>(
actor_ref: &ActorRef<T>,
msg: M,
timeout: Duration,
max_retries: u32
) -> Result<T::Reply>
where
T: Actor + Message<M>,
M: Send + Clone + 'static,
T::Reply: Send + 'static,
{
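    let mut attempt: u32 = 0;
    loop {
        attempt += 1;
        match actor_ref.ask_with_timeout(msg.clone(), timeout).await {
            Ok(response) => return Ok(response),
            // Give up once the retry budget is spent or the error is not retryable.
            // At least one attempt is always made, even when max_retries is 0.
            Err(e) if attempt >= max_retries || !e.is_retryable() => return Err(e),
            Err(e) => {
                log::warn!("Attempt {} failed: {}", attempt, e);
                // Linear backoff: 100ms, 200ms, 300ms, ...
                sleep(Duration::from_millis(100 * attempt as u64)).await;
            }
        }
    }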
}
}
4. Use Timeouts in Supervision Strategies
Timeouts are particularly useful in supervision scenarios where you need to detect and handle unresponsive child actors:
#![allow(unused)]
fn main() {
// Supervisor checking child actor health
match child_ref.ask_with_timeout(HealthCheck, Duration::from_secs(5)).await {
Ok(_) => {
// Child is responsive
}
Err(_) => {
// Child is unresponsive, consider restarting
log::warn!("Child actor unresponsive, restarting...");
restart_child_actor().await?;
}
}
}
Error Types
When a timeout occurs, you’ll receive an error that contains information about:
- The actor’s identity
- The timeout duration that was exceeded
- The operation that timed out (“ask” or “tell”)
This information helps with debugging and monitoring actor system health.
Performance Considerations
- Timeouts add minimal overhead to message passing
- Very short timeouts (< 1ms) may be unreliable due to system scheduling
- Consider your system’s typical response times when setting timeouts
- Monitor timeout rates to detect system performance issues
Timeouts are a crucial tool for building resilient actor systems that can handle failures gracefully and maintain responsive behavior even when some actors become slow or unresponsive.
Macros
rsActor provides several macros to reduce boilerplate code and make actor development more ergonomic. These macros handle the repetitive aspects of actor implementation while maintaining type safety and performance.
Available Macros
1. #[derive(Actor)] - Actor Derive Macro
Automatically implements the Actor trait for simple structs that don’t require complex initialization logic.
2. impl_message_handler! - Message Handler Macro
Generates the necessary boilerplate to wire up message handlers for an actor, supporting both regular and generic actors.
When to Use Macros
Use #[derive(Actor)] when:
- Your actor doesn’t need complex initialization logic
- The actor can be created directly from its field values
- You want to minimize boilerplate for simple actors
- Error handling can use std::convert::Infallible (never fails)
Use impl_message_handler! when:
- You have multiple message types to handle
- You want to avoid manual implementation of the MessageHandler trait
- You’re working with generic actors
- You need consistent message routing logic
Basic Usage Example
use rsactor::{Actor, ActorRef, Message, impl_message_handler, spawn};
// Simple actor using derive macro - no custom initialization needed
#[derive(Actor, Debug)]
struct UserAccount {
username: String,
balance: u64,
}
// Define messages
struct Deposit(u64);
struct GetBalance;
// Implement message handlers
impl Message<Deposit> for UserAccount {
type Reply = u64; // Returns new balance
async fn handle(&mut self, msg: Deposit, _: &ActorRef<Self>) -> Self::Reply {
self.balance += msg.0;
self.balance
}
}
impl Message<GetBalance> for UserAccount {
type Reply = u64;
async fn handle(&mut self, _: GetBalance, _: &ActorRef<Self>) -> Self::Reply {
self.balance
}
}
// Wire up message handlers
impl_message_handler!(UserAccount, [Deposit, GetBalance]);
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create actor instance directly - no complex initialization
let account = UserAccount {
username: "alice".to_string(),
balance: 100
};
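// The actor type is named explicitly because it cannot be inferred from the Args value alone
let (actor_ref, _join_handle) = spawn::<UserAccount>(account);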
let new_balance = actor_ref.ask(Deposit(50)).await?;
println!("New balance: {}", new_balance); // 150
Ok(())
}
Key Benefits
- Reduced Boilerplate: Automatically generate repetitive code
- Type Safety: Compile-time verification of actor implementations
- Zero Runtime Overhead: All code generation happens at compile time
- Easy Maintenance: Simple to add new message types and actors
Derive Macro Features
The #[derive(Actor)] macro is perfect for simple actors:
#![allow(unused)]
fn main() {
#[derive(Actor)]
struct SimpleActor {
name: String,
value: i32,
}
// Automatically implements Actor trait with:
// - Args = Self (takes the struct instance)
// - Error = std::convert::Infallible (never fails)
// - on_start simply returns the provided instance
}
For detailed information about each macro, see the individual sections:
- Unified Macro - Details about the impl_message_handler! macro
Message Handlers Macro
The #[message_handlers] attribute macro combined with #[handler] method attributes is the recommended approach for defining message handlers in rsActor. It automatically generates the necessary Message trait implementations from annotated methods.
Purpose
When an actor needs to handle multiple message types, manually implementing the Message trait for each one is verbose. The #[message_handlers] macro automates this by generating all the boilerplate from simple method signatures.
Basic Example
#![allow(unused)]
fn main() {
use rsactor::{Actor, ActorRef, message_handlers, spawn};
#[derive(Actor)]
struct CounterActor {
value: i32,
}
// Define message types
struct Add(i32);
struct GetValue;
// Use message_handlers macro with handler attributes
#[message_handlers]
impl CounterActor {
#[handler]
async fn handle_add(&mut self, msg: Add, _: &ActorRef<Self>) -> i32 {
self.value += msg.0;
self.value
}
#[handler]
async fn handle_get(&mut self, _: GetValue, _: &ActorRef<Self>) -> i32 {
self.value
}
// Regular methods can coexist without the #[handler] attribute
fn reset(&mut self) {
self.value = 0;
}
}
}
How It Works
Each #[handler] method is transformed into a Message<T> trait implementation:
- The first parameter after &mut self determines the message type T
- The return type becomes Message<T>::Reply
- The method body becomes the handle() implementation
The macro generates the equivalent of:
#![allow(unused)]
fn main() {
impl Message<Add> for CounterActor {
type Reply = i32;
async fn handle(&mut self, msg: Add, actor_ref: &ActorRef<Self>) -> Self::Reply {
self.value += msg.0;
self.value
}
}
}
Generic Actor Support
The macro fully supports generic actors:
#![allow(unused)]
fn main() {
use rsactor::{Actor, ActorRef, message_handlers, spawn};
use std::fmt::Debug;
#[derive(Debug)]
struct GenericActor<T: Send + Debug + Clone + 'static> {
value: Option<T>,
}
impl<T: Send + Debug + Clone + 'static> Actor for GenericActor<T> {
type Error = String;
type Args = ();
async fn on_start(_: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(GenericActor { value: None })
}
}
struct SetValue<T: Send + Debug + 'static>(T);
struct GetValue;
#[message_handlers]
impl<T: Send + Debug + Clone + 'static> GenericActor<T> {
#[handler]
async fn handle_set(&mut self, msg: SetValue<T>, _: &ActorRef<Self>) -> () {
self.value = Some(msg.0);
}
#[handler]
async fn handle_get(&mut self, _: GetValue, _: &ActorRef<Self>) -> Option<T> {
self.value.clone()
}
}
}
Error Handling with on_tell_result
When a handler returns Result<T, E>, the macro automatically generates an on_tell_result implementation that logs errors for tell() calls:
#![allow(unused)]
fn main() {
#[message_handlers]
impl MyActor {
// Returning Result<T, E> auto-generates error logging for tell() calls
#[handler]
async fn handle_process(&mut self, msg: ProcessData, _: &ActorRef<Self>) -> Result<(), MyError> {
self.process(msg.data)?;
Ok(())
}
// Use #[handler(no_log)] to suppress automatic error logging
#[handler(no_log)]
async fn handle_silent(&mut self, msg: SilentOp, _: &ActorRef<Self>) -> Result<(), MyError> {
// Errors won't be automatically logged for tell() calls
Ok(())
}
}
}
#[derive(Actor)] Macro
For simple actors that don’t need custom initialization logic, use #[derive(Actor)]:
#![allow(unused)]
fn main() {
#[derive(Actor)]
struct SimpleActor {
name: String,
count: u32,
}
// spawn takes the struct instance directly as Args
let actor = SimpleActor { name: "test".into(), count: 0 };
let (actor_ref, handle) = spawn::<SimpleActor>(actor);
}
This generates an Actor impl where:
- type Args = Self (the struct itself)
- type Error = std::convert::Infallible (never fails)
- on_start simply returns the provided instance
Benefits
- Selective Processing: Only methods with #[handler] become message handlers
- Clean Separation: Regular methods coexist with handlers in the same impl block
- Automatic on_tell_result: Error logging for Result return types (suppressed via #[handler(no_log)])
- Generic Support: Full support for generic actors and message types
- Compile-Time Safety: Message handler signatures are verified at compile time
- Reduced Boilerplate: No manual Message trait implementations needed
Running the Example
cargo run --example derive_macro_demo
cargo run --example unified_macro_demo
Advanced Features
This section covers advanced features of rsActor that enable more sophisticated actor system designs:
- Handler Traits — Type-erased message sending for heterogeneous actor collections
- Actor Control — Type-erased lifecycle management
- Weak References — Preventing memory leaks in actor graphs
- Metrics — Per-actor performance monitoring
- Dead Letter Tracking — Observability for failed message delivery
- Tracing & Observability — Structured logging and instrumentation spans
- Deadlock Detection — Runtime detection of circular ask dependencies
Handler Traits
Handler traits enable type-erased message sending, allowing different actor types that handle the same message to be stored in a unified collection. This is essential for building systems where you need to broadcast messages to heterogeneous actors.
Overview
rsActor provides four handler traits:
| Trait | Reference | Message Pattern |
|---|---|---|
| TellHandler<M> | Strong | Fire-and-forget |
| AskHandler<M, R> | Strong | Request-response |
| WeakTellHandler<M> | Weak | Fire-and-forget |
| WeakAskHandler<M, R> | Weak | Request-response |
Strong handlers keep actors alive. Weak handlers do not prevent actors from being dropped.
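To make the idea of type erasure concrete, here is a minimal std-only sketch. It does not use rsActor: the Tell trait, Ping, Counter, and Logger are hypothetical stand-ins, and the real handler traits are asynchronous. It only illustrates how two unrelated types that accept the same message can sit behind one trait object and be driven through a single collection.

```rust
/// Hypothetical message type shared by both handlers.
pub struct Ping;

/// Hypothetical stand-in for a type-erased handler keyed by message type `M`.
pub trait Tell<M> {
    fn tell(&mut self, msg: M);
    /// Number of messages handled so far (present only to make the sketch observable).
    fn handled(&self) -> usize;
}

pub struct Counter {
    pub pings: usize,
}

pub struct Logger {
    pub lines: Vec<String>,
}

impl Tell<Ping> for Counter {
    fn tell(&mut self, _msg: Ping) {
        self.pings += 1;
    }
    fn handled(&self) -> usize {
        self.pings
    }
}

impl Tell<Ping> for Logger {
    fn tell(&mut self, _msg: Ping) {
        self.lines.push("ping".to_string());
    }
    fn handled(&self) -> usize {
        self.lines.len()
    }
}

/// Sends one `Ping` to every handler, regardless of its concrete type.
pub fn broadcast(handlers: &mut [Box<dyn Tell<Ping>>]) {
    for handler in handlers.iter_mut() {
        handler.tell(Ping);
    }
}
```

The caller only needs to know the message type; the concrete actor type is hidden behind the boxed trait object, which is what allows a mixed collection.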
TellHandler — Fire-and-Forget
Store different actor types that handle the same message in one collection:
#![allow(unused)]
fn main() {
use rsactor::{TellHandler, ActorRef, spawn, message_handlers, Actor};
// Two different actors that both handle PingMsg
#[derive(Actor)]
struct ActorA;
#[derive(Actor)]
struct ActorB;
struct PingMsg;
#[message_handlers]
impl ActorA {
#[handler]
async fn handle_ping(&mut self, _: PingMsg, _: &ActorRef<Self>) -> () {}
}
#[message_handlers]
impl ActorB {
#[handler]
async fn handle_ping(&mut self, _: PingMsg, _: &ActorRef<Self>) -> () {}
}
// Store both in a single collection
let (ref_a, _) = spawn::<ActorA>(ActorA);
let (ref_b, _) = spawn::<ActorB>(ActorB);
let handlers: Vec<Box<dyn TellHandler<PingMsg>>> = vec![
(&ref_a).into(), // From<&ActorRef<T>> — clones the reference
ref_b.into(), // From<ActorRef<T>> — moves ownership
];
// Broadcast to all handlers
for handler in &handlers {
handler.tell(PingMsg).await?;
}
}
AskHandler — Request-Response
For actors that return the same reply type for a message:
#![allow(unused)]
fn main() {
use rsactor::AskHandler;
struct GetStatus;
// Both actors return String for GetStatus
let handlers: Vec<Box<dyn AskHandler<GetStatus, String>>> = vec![
(&actor_a).into(),
(&actor_b).into(),
];
for handler in &handlers {
let status = handler.ask(GetStatus).await?;
println!("Status: {}", status);
}
}
Weak Handlers
Weak handlers do not keep actors alive. You must upgrade() before sending messages:
#![allow(unused)]
fn main() {
use rsactor::{WeakTellHandler, ActorRef};
let weak_handlers: Vec<Box<dyn WeakTellHandler<PingMsg>>> = vec![
ActorRef::downgrade(&actor_a).into(),
ActorRef::downgrade(&actor_b).into(),
];
for handler in &weak_handlers {
if let Some(strong) = handler.upgrade() {
strong.tell(PingMsg).await?;
}
// else: actor was dropped, skip it
}
}
Available Methods
The strong handler traits provide these methods (weak handlers must be upgraded first):
| Method | TellHandler | AskHandler |
|---|---|---|
| tell(msg) / ask(msg) | Async send | Async send + reply |
| tell_with_timeout / ask_with_timeout | With timeout | With timeout |
| blocking_tell / blocking_ask | Blocking send | Blocking send + reply |
| clone_boxed() | Clone handler | Clone handler |
| downgrade() | To weak handler | To weak handler |
| as_control() | Access lifecycle | Access lifecycle |
Lifecycle Access via as_control()
Handler traits provide access to ActorControl for lifecycle management:
#![allow(unused)]
fn main() {
for handler in &handlers {
let control = handler.as_control();
println!("Actor {} alive: {}", control.identity(), control.is_alive());
}
}
Running the Example
cargo run --example handler_demo
Actor Control
The ActorControl trait provides type-erased lifecycle management for actors. It allows you to manage different actor types through a unified interface without knowing their message types.
Use Case
When you need to manage a collection of actors of different types (e.g., stopping all actors during shutdown), ActorControl lets you store them in a single Vec and control their lifecycle uniformly.
ActorControl — Strong Reference
#![allow(unused)]
fn main() {
use rsactor::{ActorControl, ActorRef, spawn, Actor};
// Store different actor types in a single collection
let controls: Vec<Box<dyn ActorControl>> = vec![
(&worker_ref).into(), // ActorRef<WorkerActor>
(&logger_ref).into(), // ActorRef<LoggerActor>
(&cache_ref).into(), // ActorRef<CacheActor>
];
// Check status of all actors
for control in &controls {
println!("Actor {} alive: {}", control.identity(), control.is_alive());
}
// Stop all actors gracefully
for control in &controls {
control.stop().await?;
}
}
Available Methods
| Method | Description |
|---|---|
| identity() | Returns the actor’s unique Identity |
| is_alive() | Checks if the actor is still running |
| stop() | Gracefully stops the actor (async) |
| kill() | Immediately terminates the actor |
| downgrade() | Creates a Box<dyn WeakActorControl> |
| clone_boxed() | Clones the control reference |
WeakActorControl — Weak Reference
WeakActorControl does not keep actors alive and requires upgrading before performing operations:
#![allow(unused)]
fn main() {
use rsactor::{WeakActorControl, ActorRef};
let weak_controls: Vec<Box<dyn WeakActorControl>> = vec![
ActorRef::downgrade(&worker_ref).into(),
ActorRef::downgrade(&logger_ref).into(),
];
for control in &weak_controls {
println!("Actor {} might be alive: {}", control.identity(), control.is_alive());
if let Some(strong) = control.upgrade() {
strong.stop().await?;
}
}
}
Conversion
ActorControl is automatically implemented for all ActorRef<T> types:
#![allow(unused)]
fn main() {
// From ownership transfer
let control: Box<dyn ActorControl> = actor_ref.into();
// From reference (clones)
let control: Box<dyn ActorControl> = (&actor_ref).into();
// Similarly for weak references
let weak: Box<dyn WeakActorControl> = ActorRef::downgrade(&actor_ref).into();
}
Combining with Handler Traits
Handler traits (TellHandler, AskHandler) also provide access to ActorControl via as_control():
#![allow(unused)]
fn main() {
let handler: Box<dyn TellHandler<PingMsg>> = (&actor_ref).into();
// Access lifecycle through handler
let control = handler.as_control();
if control.is_alive() {
handler.tell(PingMsg).await?;
}
}
Weak References
ActorWeak<T> is a weak reference to an actor that does not prevent the actor from being dropped. It is used to avoid circular references and memory leaks in complex actor graphs.
Creating Weak References
Weak references are created by calling ActorRef::downgrade():
#![allow(unused)]
fn main() {
use rsactor::{ActorRef, ActorWeak};
let weak_ref: ActorWeak<MyActor> = ActorRef::downgrade(&actor_ref);
}
Upgrading to Strong References
A weak reference can be upgraded back to a strong ActorRef if the actor is still alive:
#![allow(unused)]
fn main() {
if let Some(strong_ref) = weak_ref.upgrade() {
// Actor is still alive, send a message
strong_ref.tell(MyMessage).await?;
} else {
// Actor has been dropped
println!("Actor is no longer alive");
}
}
Checking Liveness
#![allow(unused)]
fn main() {
// Heuristic check — may return true even if upgrade would fail (race condition)
if weak_ref.is_alive() {
// Actor might be alive, but always use upgrade() for certainty
}
// The identity is always available, even after the actor is dropped
println!("Actor ID: {}", weak_ref.identity());
}
Use in Lifecycle Methods
on_run and on_stop receive ActorWeak instead of ActorRef. This prevents the actor from holding a strong reference to itself, which would prevent graceful shutdown:
#![allow(unused)]
fn main() {
impl Actor for MyActor {
// ...
async fn on_run(&mut self, actor_weak: &ActorWeak<Self>) -> Result<bool, Self::Error> {
// Use weak reference for identity/logging
println!("Actor {} processing", actor_weak.identity());
// Upgrade if you need to send messages to self
if let Some(strong) = actor_weak.upgrade() {
// Use `strong` for operations that need an ActorRef
}
Ok(false)
}
async fn on_stop(&mut self, actor_weak: &ActorWeak<Self>, killed: bool) -> Result<(), Self::Error> {
println!("Actor {} stopping (killed: {})", actor_weak.identity(), killed);
Ok(())
}
}
}
Common Patterns
Avoiding Circular References
When actors reference each other, use weak references to break cycles:
#![allow(unused)]
fn main() {
struct ParentActor {
children: Vec<ActorRef<ChildActor>>, // Strong refs — parent owns children
}
struct ChildActor {
parent: ActorWeak<ParentActor>, // Weak ref — child doesn't keep parent alive
}
}
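The ownership shape above can be tried out with the standard library alone. The following sketch is an analogy, not rsActor code: it assumes Arc and Weak behave similarly enough to ActorRef and ActorWeak for the purpose of illustration, and the Parent, Child, and build_family names are hypothetical.

```rust
use std::sync::{Arc, Mutex, Weak};

/// Hypothetical parent that owns its children through strong references.
pub struct Parent {
    pub children: Mutex<Vec<Arc<Child>>>,
}

/// Hypothetical child that only observes its parent through a weak reference.
pub struct Child {
    pub parent: Weak<Parent>,
}

/// Builds a parent with `child_count` children that point back weakly.
pub fn build_family(child_count: usize) -> Arc<Parent> {
    let parent = Arc::new(Parent {
        children: Mutex::new(Vec::new()),
    });
    for _ in 0..child_count {
        let child = Arc::new(Child {
            parent: Arc::downgrade(&parent),
        });
        parent.children.lock().unwrap().push(child);
    }
    parent
}
```

Because the back-pointer is weak, the parent's strong count stays at one. Dropping the last strong reference frees the parent even while a child is still held elsewhere, and the child's upgrade() then returns None.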
Observer Pattern
Hold weak references to observers so the subject does not keep them alive after everyone else has dropped them:
#![allow(unused)]
fn main() {
struct EventBus {
listeners: Vec<ActorWeak<ListenerActor>>,
}
impl EventBus {
async fn notify_all(&mut self, event: Event) {
// Retain only alive listeners
self.listeners.retain(|weak| weak.is_alive());
for weak in &self.listeners {
if let Some(strong) = weak.upgrade() {
let _ = strong.tell(event.clone()).await;
}
}
}
}
}
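Since is_alive() is only a heuristic, pruning and delivery can also be combined into a single upgrade() per listener. The sketch below shows that variant with std-only types; it is an analogy rather than rsActor code, and Listener, EventBus, subscribe, and listener_count are hypothetical names. Delivery here is a synchronous push into a Vec, which stands in for an asynchronous tell.

```rust
use std::sync::{Arc, Mutex, Weak};

/// Hypothetical listener; stands in for an actor that receives events.
pub struct Listener {
    pub received: Mutex<Vec<String>>,
}

/// Hypothetical event bus holding only weak references to its listeners.
pub struct EventBus {
    listeners: Vec<Weak<Listener>>,
}

impl EventBus {
    pub fn new() -> Self {
        Self { listeners: Vec::new() }
    }

    pub fn subscribe(&mut self, listener: &Arc<Listener>) {
        self.listeners.push(Arc::downgrade(listener));
    }

    /// Delivers `event` to every live listener and drops dead entries
    /// in the same pass. Returns how many listeners were reached.
    pub fn notify_all(&mut self, event: &str) -> usize {
        let mut delivered = 0;
        self.listeners.retain(|weak| match weak.upgrade() {
            Some(listener) => {
                listener.received.lock().unwrap().push(event.to_string());
                delivered += 1;
                true // keep: the listener is still alive
            }
            None => false, // prune: the listener has been dropped
        });
        delivered
    }

    pub fn listener_count(&self) -> usize {
        self.listeners.len()
    }
}
```

With an asynchronous send, awaiting inside retain is not possible, so an async version would first collect the upgraded references and then await each send in a loop.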
Running the Example
cargo run --example weak_reference_demo
Metrics
rsActor provides optional per-actor performance metrics. Enable with the metrics feature flag:
[dependencies]
rsactor = { version = "0.14", features = ["metrics"] }
Tracked Metrics
| Metric | Type | Description |
|---|---|---|
| message_count | u64 | Total messages processed |
| avg_processing_time | Duration | Average message processing time |
| max_processing_time | Duration | Maximum processing time observed |
| error_count | u64 | Total errors during message handling |
| uptime | Duration | Time since actor was started |
| last_activity | Option<SystemTime> | Timestamp of last message processing |
Usage
Snapshot
Get all metrics at once:
#![allow(unused)]
fn main() {
let metrics = actor_ref.metrics();
println!("Messages: {}", metrics.message_count);
println!("Avg time: {:?}", metrics.avg_processing_time);
println!("Max time: {:?}", metrics.max_processing_time);
println!("Errors: {}", metrics.error_count);
println!("Uptime: {:?}", metrics.uptime);
println!("Last activity: {:?}", metrics.last_activity);
}
Individual Accessors
#![allow(unused)]
fn main() {
let count = actor_ref.message_count();
let avg = actor_ref.avg_processing_time();
let max = actor_ref.max_processing_time();
let errors = actor_ref.error_count();
let uptime = actor_ref.uptime();
let last = actor_ref.last_activity();
}
Design
- Lock-free: Uses AtomicU64 counters for zero-contention updates
- Zero overhead when disabled: When the metrics feature is not enabled, no metrics code is compiled
- Automatic collection: Metrics are collected transparently during message processing
- Post-mortem analysis: Metrics survive actor drop via ActorWeak (strong reference to metrics collector)
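As a rough illustration of the lock-free design, the sketch below keeps counters in AtomicU64 fields and derives the average on read. It is not rsActor's implementation: the MetricsCollector struct, its field names, and the record method are assumptions made for this example, and uptime and last_activity are left out.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Hypothetical collector; every field is an atomic, so no lock is needed.
#[derive(Default)]
pub struct MetricsCollector {
    message_count: AtomicU64,
    error_count: AtomicU64,
    total_nanos: AtomicU64,
    max_nanos: AtomicU64,
}

impl MetricsCollector {
    /// Records one processed message and how long it took.
    pub fn record(&self, elapsed: Duration, failed: bool) {
        let nanos = elapsed.as_nanos() as u64;
        self.message_count.fetch_add(1, Ordering::Relaxed);
        self.total_nanos.fetch_add(nanos, Ordering::Relaxed);
        self.max_nanos.fetch_max(nanos, Ordering::Relaxed);
        if failed {
            self.error_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    pub fn message_count(&self) -> u64 {
        self.message_count.load(Ordering::Relaxed)
    }

    pub fn error_count(&self) -> u64 {
        self.error_count.load(Ordering::Relaxed)
    }

    /// Average is computed on read from the running total.
    pub fn avg_processing_time(&self) -> Duration {
        let count = self.message_count.load(Ordering::Relaxed);
        if count == 0 {
            return Duration::ZERO;
        }
        Duration::from_nanos(self.total_nanos.load(Ordering::Relaxed) / count)
    }

    pub fn max_processing_time(&self) -> Duration {
        Duration::from_nanos(self.max_nanos.load(Ordering::Relaxed))
    }
}
```

Relaxed ordering is enough in this sketch because each counter is read independently. A snapshot taken while messages are in flight may therefore combine values from slightly different moments.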
Philosophy
rsActor exposes raw metrics data. How you monitor, alert, or visualize this data is your responsibility. You can integrate with any monitoring system (Prometheus, Grafana, custom dashboards, etc.).
Running the Example
cargo run --example metrics_demo --features metrics
Dead Letter Tracking
Dead letters are messages that could not be delivered to their intended recipients. rsActor automatically tracks and logs all dead letters via structured tracing.
When Dead Letters Occur
| Reason | Description | Example |
|---|---|---|
| ActorStopped | Actor’s mailbox channel closed | Sending to a stopped actor |
| Timeout | Send or ask operation exceeded timeout | tell_with_timeout expired |
| ReplyDropped | Reply channel dropped before response | Actor crashed during ask processing |
Observability
Dead letters are always logged as structured tracing::warn! events:
WARN dead_letter: Dead letter: message could not be delivered
actor.id=42
actor.type_name="MyActor"
message.type_name="PingMessage"
dead_letter.reason="actor stopped"
dead_letter.operation="tell"
To see these logs, initialize a tracing subscriber:
#![allow(unused)]
fn main() {
tracing_subscriber::fmt()
.with_env_filter("rsactor=warn")
.init();
}
Performance
Dead letter recording has minimal overhead:
| Scenario | Overhead |
|---|---|
| Successful message delivery (hot path) | Zero — no code executes |
| Dead letter, no tracing subscriber | ~5-50 ns |
| Dead letter, subscriber active | ~1-10 us |
The record function is marked #[cold] to optimize the hot path.
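The effect of #[cold] can be sketched without rsActor. In the example below, all names (DEAD_LETTERS, record_dead_letter, try_deliver) are hypothetical, and the boolean mailbox_open stands in for the real channel state. It only illustrates the structure: the success path returns early and never touches the recording code.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical global counter, similar in spirit to a test-only dead letter count.
static DEAD_LETTERS: AtomicU64 = AtomicU64::new(0);

/// Cold path: the attribute hints that this function is rarely called,
/// so the optimizer can favor the surrounding hot path.
#[cold]
#[inline(never)]
fn record_dead_letter(reason: &str, operation: &str) {
    DEAD_LETTERS.fetch_add(1, Ordering::Relaxed);
    eprintln!("dead letter: reason={} operation={}", reason, operation);
}

pub fn dead_letter_count() -> u64 {
    DEAD_LETTERS.load(Ordering::Relaxed)
}

/// Hypothetical send: returns early on success, records only on failure.
pub fn try_deliver(mailbox_open: bool) -> Result<(), &'static str> {
    if mailbox_open {
        return Ok(()); // hot path: no recording code runs
    }
    record_dead_letter("actor stopped", "tell");
    Err("actor stopped")
}
```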
Testing with test-utils
Enable the test-utils feature to count dead letters in tests:
[dev-dependencies]
rsactor = { version = "0.14", features = ["test-utils"] }
#![allow(unused)]
fn main() {
use rsactor::{dead_letter_count, reset_dead_letter_count};
#[tokio::test]
async fn test_dead_letters() {
reset_dead_letter_count();
let (actor_ref, handle) = spawn::<MyActor>(args);
actor_ref.stop().await.unwrap();
handle.await.unwrap();
// Sending to a stopped actor creates a dead letter
let _ = actor_ref.tell(MyMessage).await;
assert_eq!(dead_letter_count(), 1);
}
}
Warning: Never enable test-utils in production builds. It exposes internal metrics that could be misused.
Error Integration
Dead letter tracking works seamlessly with rsActor’s error types:
#![allow(unused)]
fn main() {
match actor_ref.tell(msg).await {
Ok(_) => { /* delivered successfully */ }
Err(rsactor::Error::Send { .. }) => {
// Dead letter was automatically logged
// Error contains actor identity and details
}
Err(e @ rsactor::Error::Timeout { .. }) => {
// Dead letter recorded with Timeout reason
// Check if error is retryable
if e.is_retryable() {
// Retry logic
}
}
_ => {}
}
}
Tracing & Observability
rsActor provides comprehensive observability through the tracing crate.
Architecture
Tracing in rsActor has two layers:
- Core logging (always available): tracing::warn!, tracing::error!, tracing::debug! for lifecycle events, dead letters, and errors. The tracing crate is a required dependency.
- Instrumentation spans (opt-in via tracing feature): #[tracing::instrument] attributes that create structured spans with timing and context for actor lifecycle and message processing.
Enabling Instrumentation
[dependencies]
rsactor = { version = "0.14", features = ["tracing"] }
tracing = "0.1"
tracing-subscriber = "0.3"
What Gets Traced
Always Available (Core Logging)
- Dead letter warnings with structured fields
- Actor on_start success/failure
- Actor on_stop errors
- on_run errors
- Reply channel failures
With tracing Feature (Instrumentation Spans)
| Span | Fields | Description |
|---|---|---|
| actor_lifecycle | actor_id, actor_type | Entire actor lifecycle |
| actor_on_start | — | Initialization phase |
| actor_on_run | — | Each idle handler invocation |
| actor_on_stop | killed | Shutdown phase |
| actor_process_message | — | Each message processing |
| actor_tell | actor_id, message_type | Tell operation |
| actor_ask | actor_id, message_type, reply_type | Ask operation |
| actor_tell_with_timeout | actor_id, message_type, timeout_ms | Tell with timeout |
| actor_ask_with_timeout | actor_id, message_type, reply_type, timeout_ms | Ask with timeout |
| actor_kill | actor_id | Kill signal |
| actor_stop | actor_id | Stop signal |
Setup Example
use tracing_subscriber;
#[tokio::main]
async fn main() {
// Simple setup
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_target(false)
.init();
// Your actor code here...
}
Running with Tracing
# Basic logging (always works)
RUST_LOG=debug cargo run --example basic
# With instrumentation spans
RUST_LOG=debug cargo run --example tracing_demo --features tracing
Feature Flag Behavior
| Configuration | Core Logging | Instrumentation Spans |
|---|---|---|
| Default | Available | Disabled |
| features = ["tracing"] | Available | Enabled |
The tracing feature only controls #[tracing::instrument] attributes. Core logging via tracing::warn!, tracing::error!, etc. is always available regardless of feature flags.
Running the Example
RUST_LOG=debug cargo run --example tracing_demo --features tracing
Deadlock Detection
rsActor provides runtime deadlock detection that catches circular ask dependencies before they cause your application to hang.
Why Deadlocks Happen
Each actor processes messages sequentially in a single loop. When an actor calls ask, it pauses its loop to wait for the reply. If Actor A asks Actor B, and B simultaneously asks A, both loops are paused — neither can process the other’s request.
Actor A handler: actor_ref_b.ask(msg).await ← A's loop paused, waiting for B
Actor B handler: actor_ref_a.ask(msg).await ← B's loop paused, waiting for A
→ Both wait forever = deadlock
This can also happen with:
- Self-ask: An actor asking itself
- Indirect chains: A → B → C → A
Enabling Detection
Add the deadlock-detection feature:
[dependencies]
rsactor = { version = "0.14", features = ["deadlock-detection"] }
When disabled, all detection code is removed at compile time — zero overhead in production.
Auto-Enabling in Debug Builds
Cargo does not support enabling features based on build profile (debug vs release). Here are practical ways to keep detection active during development while excluding it from release builds.
Cargo Aliases (Recommended)
Define aliases in .cargo/config.toml:
# .cargo/config.toml
[alias]
dev-run = "run --features deadlock-detection"
dev-test = "test --features deadlock-detection"
cargo dev-run # debug + deadlock detection
cargo dev-test # test + deadlock detection
cargo run --release # release, no overhead
Makefile / justfile
Centralize dev commands with the feature flag included:
# Makefile
run:
cargo run --features deadlock-detection
test:
cargo test --features deadlock-detection
release:
cargo build --release
build.rs Guardrail
For a compile-time reminder, add a build.rs that detects debug profile and a compile_error! that fires when the feature is missing:
# Cargo.toml
[features]
deadlock-detection = ["rsactor/deadlock-detection"]
// build.rs
fn main() {
let profile = std::env::var("PROFILE").unwrap();
println!("cargo::rustc-check-cfg=cfg(debug_build)");
if profile == "debug" {
println!("cargo:rustc-cfg=debug_build");
}
}
#![allow(unused)]
fn main() {
// main.rs — reminds you to enable the feature in debug
#[cfg(all(debug_build, not(feature = "deadlock-detection")))]
compile_error!(
"Enable deadlock detection in debug builds: \
cargo run --features deadlock-detection"
);
}
Note: build.rs cannot activate Cargo features; it can only set cfg flags. The compile_error! approach acts as a guardrail that reminds developers to pass the feature flag.
For full details, see the Deadlock Detection Guide.
How It Works
The framework maintains a global wait-for graph:
- Before each ask, an edge caller → callee is registered
- The graph is checked for cycles
- If a cycle exists, the framework panics immediately with a descriptive message
- When ask completes, the edge is automatically removed
1. A asks B → graph: {A → B} → no cycle → proceed
2. B asks A → graph: {A → B, B → A} → cycle detected! → panic!
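The register, check, and remove steps can be modeled in a few lines of std-only Rust. This is a sketch under stated assumptions, not the framework's code: WaitForGraph, ActorId, register, and complete are hypothetical names, actor identities are reduced to plain integers, and a detected cycle is returned as an error value where the framework would panic.

```rust
use std::collections::HashMap;

/// Hypothetical actor identifier; the real identity type is not modeled here.
pub type ActorId = u64;

/// Wait-for graph holding one outgoing edge per caller.
#[derive(Default)]
pub struct WaitForGraph {
    edges: HashMap<ActorId, ActorId>,
}

impl WaitForGraph {
    /// Registers `caller -> callee`, then walks the edges starting at `callee`.
    /// Returns the cycle path if the walk leads back to `caller`.
    pub fn register(&mut self, caller: ActorId, callee: ActorId) -> Result<(), Vec<ActorId>> {
        self.edges.insert(caller, callee);
        let mut path = vec![caller];
        let mut current = callee;
        // The walk is bounded by the number of edges, so it always terminates.
        for _ in 0..=self.edges.len() {
            path.push(current);
            if current == caller {
                self.edges.remove(&caller); // roll back the offending edge
                return Err(path);
            }
            match self.edges.get(&current) {
                Some(&next) => current = next,
                None => return Ok(()), // chain ends: no cycle
            }
        }
        Ok(())
    }

    /// Removes the caller's edge once its ask has completed.
    pub fn complete(&mut self, caller: ActorId) {
        self.edges.remove(&caller);
    }
}
```

Registering 1 → 2 succeeds. A following 2 → 1 returns the path [2, 1, 2], which corresponds to the A → B → A cycle in the walkthrough above. A self-ask such as 3 → 3 is caught on the first step.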
Panic Message
Deadlocks are design errors, so the framework panics (like an index out of bounds) rather than returning an error:
Deadlock detected: ask cycle MyActorA(#1) -> MyActorB(#2) -> MyActorA(#1)
This is a design error. Use `tell` to break the cycle,
or restructure actor dependencies.
The message includes actor type names and IDs, making it easy to identify the problematic interaction.
Fixing Deadlocks
Option 1: Replace ask with tell
If one direction doesn’t need a synchronous reply, use tell (fire-and-forget):
#![allow(unused)]
fn main() {
// Before: A and B ask each other (deadlock risk)
#[handler]
async fn handle_request(&mut self, msg: Request, _: &ActorRef<Self>) -> Result<Response, rsactor::Error> {
// `?` needs a Result return type, so the handler replies with Result<Response, _>
let result = self.other_actor.ask(Query).await?;
Ok(Response(result))
}
// After: A tells B, no waiting (deadlock-free)
#[handler]
async fn handle_request(&mut self, msg: Request, _: &ActorRef<Self>) -> Result<(), rsactor::Error> {
self.other_actor.tell(Notify { data: msg.data }).await?;
Ok(())
}
}
Option 2: Restructure Dependencies
Eliminate cycles by introducing a mediator:
Before: A ↔ B (bidirectional ask = cycle risk)
After: A → Mediator ← B (no cycles possible)
Option 3: Use Callbacks
Pass a oneshot::Sender via tell instead of using ask:
#![allow(unused)]
fn main() {
use tokio::sync::oneshot;
struct RequestWithCallback {
data: String,
reply_tx: oneshot::Sender<Response>,
}
// Send via tell with a reply channel
let (tx, rx) = oneshot::channel();
actor_b.tell(RequestWithCallback { data, reply_tx: tx }).await?;
let response = rx.await?;
}
What Gets Detected
| Scenario | Detected? |
|---|---|
| Self-ask (A → A) | Yes |
| 2-actor cycle (A → B → A) | Yes |
| N-actor chain (A → B → C → A) | Yes |
| Sequential asks (A → B, then A → C) | No false positive |
| Non-actor caller (main → A) | Skipped (safe) |
Limitations
- blocking_ask: Not tracked, as it’s designed for non-actor contexts that can’t form cycles.
- Concurrent asks: Using tokio::join! to send multiple ask calls from one handler may miss some cycles, since the graph stores one edge per caller.
- tokio::spawn in handlers: Spawned tasks don’t inherit the actor identity, so their ask calls aren’t tracked.
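The concurrent-asks limitation follows directly from keeping one edge per caller. The std-only model below is an assumption about the shape of the graph based on the description above, not the framework's actual data structure; reaches and concurrent_ask_cycle_visible are hypothetical names. It shows how a second concurrent ask can overwrite the edge that would have revealed a cycle.

```rust
use std::collections::HashMap;

/// Follows single outgoing edges from `start`; true if `target` is reached.
pub fn reaches(edges: &HashMap<u64, u64>, start: u64, target: u64) -> bool {
    let mut current = start;
    // Bound the walk so a malformed graph cannot loop forever.
    for _ in 0..=edges.len() {
        if current == target {
            return true;
        }
        match edges.get(&current) {
            Some(&next) => current = next,
            None => return false,
        }
    }
    false
}

/// Models actor 1 issuing two concurrent asks (to 2, then 3) while actor 2 asks 1.
/// Returns whether the 1 -> 2 -> 1 cycle is visible to the check.
pub fn concurrent_ask_cycle_visible() -> bool {
    let mut edges: HashMap<u64, u64> = HashMap::new();
    edges.insert(1, 2); // first ask from actor 1
    edges.insert(1, 3); // second concurrent ask overwrites the first edge
    edges.insert(2, 1); // actor 2 asks actor 1, closing a real cycle
    // Check for the new edge 2 -> 1: can actor 1 reach actor 2?
    reaches(&edges, 1, 2)
}
```

In this model the function returns false even though actors 1 and 2 are waiting on each other, because the stored edge for actor 1 now points at actor 3.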
Performance
| Condition | Overhead |
|---|---|
| Feature disabled | Zero (compile-time removal) |
| Feature enabled, normal ask | ~100-500 ns per call |
The wait-for graph only contains in-flight ask calls, so it stays small and traversal is fast.
Examples
This section provides practical examples demonstrating various features and patterns of the rsActor framework.
Available Examples
Core Patterns
- Basic Usage — Simple counter actor with tell and ask
- Async Worker — Actor performing periodic async work
- Blocking Task — CPU-intensive work in blocking contexts
- Actor with Timeout — Timeout handling in actor communication
Advanced Patterns
- Kill Demo — Graceful stop vs immediate kill, ActorResult inspection
- Ask Join — ask_join pattern with JoinHandle
- Handler Demo — Handler traits for type-erased heterogeneous collections
- Weak References — ActorWeak for breaking circular references
- Metrics — Per-actor performance metrics
- Tracing — Structured tracing and observability
Classic Problems
- Dining Philosophers — Classic concurrency problem solved with actors
Running Examples
All examples are in the examples/ directory:
# Core examples
cargo run --example basic
cargo run --example actor_async_worker
cargo run --example actor_blocking_task
# Advanced examples
cargo run --example kill_demo
cargo run --example ask_join_demo
cargo run --example handler_demo
cargo run --example weak_reference_demo
# Feature-gated examples
cargo run --example metrics_demo --features metrics
cargo run --example tracing_demo --features tracing
Example Structure
Each example follows this pattern:
- Actor Definition: Define the actor struct and implement the Actor trait (or use #[derive(Actor)])
- Message Types: Define messages that the actor can handle
- Message Handlers: Use #[message_handlers] with #[handler] attributes
- Usage: Demonstrate spawning, messaging, and lifecycle management
Basic Usage
This example demonstrates the fundamental concepts of rsActor through a simple counter actor that maintains internal state and processes messages.
Overview
The basic example shows:
- How to define an actor with internal state
- Implementing the Actor trait with lifecycle methods
- Creating and handling different message types
- Using the #[message_handlers] macro
- Spawning actors and managing their lifecycle
- Periodic tasks within actors using on_run
Code Example
use anyhow::Result;
use rsactor::{message_handlers, Actor, ActorRef, ActorWeak};
use tokio::time::{interval, Duration};
use tracing::info;
// Message types
struct Increment; // Message to increment the actor's counter
struct Decrement; // Message to decrement the actor's counter
// Define the actor struct
struct MyActor {
count: u32, // Internal state of the actor
start_up: std::time::Instant, // Track the start time
tick_300ms: tokio::time::Interval, // Interval for 300ms ticks
tick_1s: tokio::time::Interval, // Interval for 1s ticks
}
// Implement the Actor trait for MyActor
impl Actor for MyActor {
type Args = Self;
type Error = anyhow::Error;
// Called when the actor is started
async fn on_start(args: Self::Args, _actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
info!("MyActor started. Initial count: {}.", args.count);
Ok(args)
}
// Called repeatedly when the message queue is empty (idle handler).
// Returns Ok(true) to continue calling on_run, Ok(false) to stop idle processing.
// Note: receives &ActorWeak<Self>, not &ActorRef<Self>.
async fn on_run(&mut self, _actor_weak: &ActorWeak<Self>) -> Result<bool, Self::Error> {
// Use tokio::select! to handle multiple async operations
tokio::select! {
_ = self.tick_300ms.tick() => {
println!("300ms tick. Elapsed: {:?}", self.start_up.elapsed());
}
_ = self.tick_1s.tick() => {
println!("1s tick. Elapsed: {:?}", self.start_up.elapsed());
}
}
Ok(true) // Continue calling on_run
}
}
// Message handling using the #[message_handlers] macro with #[handler] attributes
#[message_handlers]
impl MyActor {
#[handler]
async fn handle_increment(&mut self, _msg: Increment, _: &ActorRef<Self>) -> u32 {
self.count += 1;
println!("MyActor handled Increment. Count is now {}.", self.count);
self.count
}
#[handler]
async fn handle_decrement(&mut self, _msg: Decrement, _: &ActorRef<Self>) -> u32 {
self.count -= 1;
println!("MyActor handled Decrement. Count is now {}.", self.count);
self.count
}
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_target(false)
.init();
// Create actor instance with initial state
let my_actor = MyActor {
count: 100,
start_up: std::time::Instant::now(),
tick_300ms: interval(Duration::from_millis(300)),
tick_1s: interval(Duration::from_secs(1)),
};
// Spawn the actor
let (actor_ref, join_handle) = rsactor::spawn::<MyActor>(my_actor);
println!("MyActor spawned with ID: {}", actor_ref.identity());
// Wait a bit to see some ticks
tokio::time::sleep(Duration::from_millis(700)).await;
// Send messages and await replies
println!("Sending Increment message...");
let count_after_inc: u32 = actor_ref.ask(Increment).await?;
println!("Reply after Increment: {}", count_after_inc);
println!("Sending Decrement message...");
let count_after_dec: u32 = actor_ref.ask(Decrement).await?;
println!("Reply after Decrement: {}", count_after_dec);
// Wait for more ticks
tokio::time::sleep(Duration::from_millis(700)).await;
// Stop the actor gracefully
println!("Stopping actor...");
actor_ref.stop().await?;
// Wait for completion and check result
let result = join_handle.await?;
match result {
rsactor::ActorResult::Completed { actor, killed } => {
println!("Actor completed. Final count: {}. Killed: {}",
actor.count, killed);
}
rsactor::ActorResult::Failed { actor, error, phase, killed } => {
println!("Actor failed: {}. Phase: {:?}, Killed: {}",
error, phase, killed);
if let Some(actor) = actor {
println!("Final count: {}", actor.count);
}
}
}
Ok(())
}
Key Concepts Demonstrated
1. Actor State Management
The MyActor struct encapsulates:
- count: The main business state
- start_up: Timing information
- tick_300ms and tick_1s: Periodic timers
2. Lifecycle Methods
- on_start: Initializes the actor when spawned
- on_run: Runs continuously, handling periodic tasks with tokio::select!
3. Message Handling
- Increment and Decrement messages modify the actor’s state
- Each message handler returns the new count value
- Messages are processed sequentially, ensuring thread safety
4. Actor Communication
- ask: Send a message and wait for a reply
- tell: Send a message without waiting (fire-and-forget)
- stop: Gracefully shut down the actor
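For readers new to the actor model, the difference between tell and ask can be modeled with a thread and standard channels. This sketch is not how rsActor is implemented (rsActor runs on Tokio tasks). CounterMsg, spawn_counter, and ask_count are hypothetical names. It shows a mailbox processed one message at a time, a tell-style message with no reply, and an ask-style message that carries its own reply channel.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical mailbox messages for a counter.
pub enum CounterMsg {
    /// Tell-style: no reply expected.
    Increment,
    /// Ask-style: carries the channel on which the reply is sent.
    Get(Sender<u32>),
}

/// Spawns the counter loop and returns its mailbox plus a join handle
/// that yields the final count once every sender has been dropped.
pub fn spawn_counter(initial: u32) -> (Sender<CounterMsg>, thread::JoinHandle<u32>) {
    let (tx, rx) = channel::<CounterMsg>();
    let handle = thread::spawn(move || {
        let mut count = initial;
        // One message at a time: `count` needs no lock.
        for msg in rx {
            match msg {
                CounterMsg::Increment => count += 1,
                CounterMsg::Get(reply) => {
                    let _ = reply.send(count);
                }
            }
        }
        count
    });
    (tx, handle)
}

/// Ask-style helper: sends a request and blocks until the reply arrives.
pub fn ask_count(mailbox: &Sender<CounterMsg>) -> Option<u32> {
    let (reply_tx, reply_rx) = channel();
    mailbox.send(CounterMsg::Get(reply_tx)).ok()?;
    reply_rx.recv().ok()
}
```

Because the loop owns the state and handles messages in arrival order, two Increment messages sent before a Get are always reflected in the reply.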
5. Error Handling
The example demonstrates proper error handling throughout the actor lifecycle and shows how to access the final actor state after completion.
Running the Example
cargo run --example basic
You’ll see output showing the periodic ticks, message processing, and graceful shutdown.
Next Steps
This basic example forms the foundation for more complex actor patterns. Next, explore:
- Async Worker for I/O-intensive tasks
- Blocking Task for CPU-intensive operations
- Actor with Timeout for timeout handling
Async Worker
This example demonstrates a more complex actor interaction pattern where one actor (Requester) delegates asynchronous work to another actor (Worker). The Worker actor spawns async tasks to process requests and sends results back to the Requester.
Overview
The async worker pattern is useful when you need:
- Work delegation between actors
- Parallel processing of multiple requests
- Async task management within actors
- Bidirectional communication between actors
Architecture
RequesterActor ──[RequestWork]──> WorkerActor
      ↑                                │
      │                                ↓
      └──[WorkResult]──────── [spawns async task]
Key Components
- RequesterActor: Initiates work requests and handles results
- WorkerActor: Receives requests, spawns async tasks, and sends back results
- Async Tasks: Background tasks that perform the actual work
Implementation
Message Types
#![allow(unused)]
fn main() {
// Message to request work from Worker
struct RequestWork {
task_id: usize,
data: String,
}
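// Message containing work results
struct WorkResult {
task_id: usize,
result: String,
}
// Message forwarded from RequesterActor to WorkerActor; `callback` tells the
// spawned task where to deliver the WorkResult
struct ProcessTask {
task_id: usize,
data: String,
callback: ActorRef<RequesterActor>,
}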
}
RequesterActor
#![allow(unused)]
fn main() {
struct RequesterActor {
worker_ref: ActorRef<WorkerActor>,
received_results: Vec<String>,
}
impl Actor for RequesterActor {
type Args = ActorRef<WorkerActor>;
type Error = anyhow::Error;
async fn on_start(args: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
println!("RequesterActor started");
Ok(RequesterActor {
worker_ref: args,
received_results: Vec::new(),
})
}
}
impl Message<RequestWork> for RequesterActor {
type Reply = ();
async fn handle(&mut self, msg: RequestWork, actor_ref: &ActorRef<Self>) -> Self::Reply {
println!("RequesterActor sending work request for task {}", msg.task_id);
// Send request to the worker actor
let requester = actor_ref.clone();
let work_msg = ProcessTask {
task_id: msg.task_id,
data: msg.data,
callback: requester,
};
if let Err(e) = self.worker_ref.tell(work_msg).await {
eprintln!("Failed to send work to worker: {}", e);
}
}
}
impl Message<WorkResult> for RequesterActor {
type Reply = ();
async fn handle(&mut self, msg: WorkResult, _: &ActorRef<Self>) -> Self::Reply {
println!("RequesterActor received result for task {}: {}",
msg.task_id, msg.result);
self.received_results.push(msg.result);
}
}
}
WorkerActor
#![allow(unused)]
fn main() {
struct WorkerActor {
active_tasks: usize,
}
impl Actor for WorkerActor {
type Args = ();
type Error = anyhow::Error;
async fn on_start(_: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
println!("WorkerActor started");
Ok(WorkerActor { active_tasks: 0 })
}
}
impl Message<ProcessTask> for WorkerActor {
type Reply = ();
async fn handle(&mut self, msg: ProcessTask, _: &ActorRef<Self>) -> Self::Reply {
self.active_tasks += 1;
println!("WorkerActor processing task {} (active: {})",
msg.task_id, self.active_tasks);
let task_id = msg.task_id;
let data = msg.data;
let callback = msg.callback;
// Spawn an async task to do the work
tokio::spawn(async move {
// Simulate some async work (e.g., network request, database query)
tokio::time::sleep(Duration::from_millis(100 + task_id as u64 * 50)).await;
let result = format!("Processed: {}", data);
println!("Task {} completed with result: {}", task_id, result);
// Send result back to requester
let work_result = WorkResult { task_id, result };
if let Err(e) = callback.tell(work_result).await {
eprintln!("Failed to send result back: {}", e);
}
});
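// Note: this runs as soon as the task is spawned, not when it finishes, so
// active_tasks does not count tasks that are still in flight. To track them,
// have the spawned task report completion to the worker and decrement there.
self.active_tasks -= 1;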
}
}
}
Key Patterns Demonstrated
1. Actor-to-Actor Communication
- Requester sends work requests to Worker
- Worker sends results back to Requester
- Bidirectional message flow
2. Async Task Spawning
- Worker spawns tasks with tokio::spawn for parallel processing
- Tasks run independently of the actor’s message processing loop
- Results are sent back via actor references
3. Callback Pattern
- Work requests include a callback reference (ActorRef)
- Spawned tasks use the callback to return results
- Enables loose coupling between work requesters and processors
4. State Management
- Each actor maintains its own state independently
- Worker tracks active tasks
- Requester accumulates results
Benefits
Scalability
- Multiple work requests can be processed in parallel
- Worker actor doesn’t block while tasks are running
- Easy to scale by adding more worker actors
Fault Isolation
- Failed tasks don’t crash the actors
- Actor supervision can restart failed workers
- Error handling is isolated per task
Resource Management
- Tasks are managed by Tokio’s runtime
- Automatic cleanup when tasks complete
- Can implement backpressure by limiting concurrent tasks
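The backpressure idea in the last bullet can be sketched with the standard library alone. This is a minimal illustration, not part of rsActor: InFlight, Permit, and try_acquire are hypothetical names. A permit frees its slot when it is dropped, so moving it into a spawned task keeps the counter accurate until the task actually finishes.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical helper: counts tasks that are currently running.
#[derive(Clone)]
pub struct InFlight {
    count: Arc<AtomicUsize>,
    limit: usize,
}

/// Held for the lifetime of one task; dropping it frees the slot.
pub struct Permit {
    count: Arc<AtomicUsize>,
}

impl InFlight {
    pub fn new(limit: usize) -> Self {
        Self { count: Arc::new(AtomicUsize::new(0)), limit }
    }

    /// Returns a permit if fewer than `limit` tasks are running.
    pub fn try_acquire(&self) -> Option<Permit> {
        let mut current = self.count.load(Ordering::Acquire);
        loop {
            if current >= self.limit {
                return None; // At capacity: the caller should reject or queue.
            }
            match self.count.compare_exchange(
                current,
                current + 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(Permit { count: Arc::clone(&self.count) }),
                Err(actual) => current = actual, // Lost a race; retry with the new value.
            }
        }
    }

    /// Number of permits currently held.
    pub fn active(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.count.fetch_sub(1, Ordering::AcqRel);
    }
}
```

In a handler like the WorkerActor one, a permit acquired before tokio::spawn and moved into the async block would be released only when that task completes, which is one way to keep an accurate count of in-flight work.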
Usage Example
#[tokio::main]
async fn main() -> Result<()> {
// Spawn worker actor
let (worker_ref, worker_handle) = rsactor::spawn::<WorkerActor>(());
// Spawn requester actor with a clone of the worker reference, so worker_ref stays usable for shutdown
let (requester_ref, requester_handle) = rsactor::spawn::<RequesterActor>(worker_ref.clone());
// Send multiple work requests
for i in 0..5 {
let request = RequestWork {
task_id: i,
data: format!("Task data {}", i),
};
requester_ref.tell(request).await?;
// Small delay between requests
tokio::time::sleep(Duration::from_millis(50)).await;
}
// Let the work complete
tokio::time::sleep(Duration::from_secs(2)).await;
// Cleanup
requester_ref.stop().await?;
worker_ref.stop().await?;
Ok(())
}
Running the Example
cargo run --example actor_async_worker
You’ll see output showing:
- Work requests being sent
- Tasks being processed in parallel
- Results being received back
- Task completion timing
When to Use This Pattern
- API Gateways: Distribute requests to worker services
- Data Processing: Parallel processing of data batches
- I/O Operations: Handle multiple network/database requests
- Background Jobs: Queue and process background tasks
- Microservices: Inter-service communication patterns
This pattern demonstrates how actors can coordinate complex asynchronous workflows while maintaining clean separation of concerns and fault tolerance.
Blocking Task
This example demonstrates how actors can interact with CPU-intensive or blocking operations without blocking the async runtime. It shows the proper way to integrate blocking tasks with the actor system using tokio::task::spawn_blocking and the blocking communication APIs.
Overview
The blocking task pattern is essential when you need to:
- Run CPU-intensive computations that would block the async executor
- Perform synchronous I/O operations with legacy APIs
- Bridge synchronous and asynchronous code
- Avoid blocking the main actor runtime
Key Concepts
Why Use Blocking Tasks?
Tokio’s async runtime is designed for I/O-bound operations. CPU-intensive or truly blocking operations can:
- Block the entire async executor
- Prevent other actors from processing messages
- Degrade overall system performance
Solution: spawn_blocking
Tokio provides spawn_blocking to run blocking operations on a dedicated thread pool, keeping the main async runtime responsive.
Architecture
Actor ←→ [tokio channels] ←→ Blocking Task
  ↑                              ↓
  └──[blocking API]──────────────┘
     (ask_blocking/tell_blocking)
Implementation
Message Types
#![allow(unused)]
fn main() {
// Messages for actor communication
struct GetState;
struct SetFactor(f64);
struct ProcessedData {
value: f64,
timestamp: std::time::Instant,
}
// Commands for the blocking task
enum TaskCommand {
ChangeInterval(Duration),
Stop,
}
}
Actor with Blocking Task
#![allow(unused)]
fn main() {
struct SyncDataProcessorActor {
factor: f64,
latest_value: Option<f64>,
task_sender: Option<mpsc::UnboundedSender<TaskCommand>>,
task_handle: Option<task::JoinHandle<()>>,
}
impl Actor for SyncDataProcessorActor {
type Args = f64; // Initial factor
type Error = anyhow::Error;
async fn on_start(factor: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
info!("SyncDataProcessorActor started with factor: {}", factor);
// Create communication channels
let (task_sender, task_receiver) = mpsc::unbounded_channel();
// Spawn the blocking task
let actor_ref_clone = actor_ref.clone();
let task_handle = task::spawn_blocking(move || {
sync_background_task(factor, task_receiver, actor_ref_clone)
});
Ok(Self {
factor,
latest_value: None,
task_sender: Some(task_sender),
task_handle: Some(task_handle),
})
}
async fn on_stop(&mut self, _: &ActorWeak<Self>, _killed: bool) -> Result<(), Self::Error> {
info!("SyncDataProcessorActor stopping - sending stop command to background task");
// Signal the background task to stop
if let Some(sender) = &self.task_sender {
if sender.send(TaskCommand::Stop).is_err() {
debug!("Background task already stopped or receiver dropped");
}
}
// Wait for the background task to complete
if let Some(handle) = self.task_handle.take() {
match handle.await {
Ok(()) => info!("Background task completed successfully"),
Err(e) => info!("Background task completed with error: {}", e),
}
}
Ok(())
}
}
}
Blocking Task Implementation
#![allow(unused)]
fn main() {
fn sync_background_task(
mut factor: f64,
mut task_receiver: mpsc::UnboundedReceiver<TaskCommand>,
actor_ref: ActorRef<SyncDataProcessorActor>,
) {
info!("Sync background task started");
let mut interval = Duration::from_millis(500);
let mut counter = 0.0;
loop {
// Check for commands from the actor (non-blocking)
match task_receiver.try_recv() {
Ok(TaskCommand::ChangeInterval(new_interval)) => {
info!("Background task: changing interval to {:?}", new_interval);
interval = new_interval;
}
Ok(TaskCommand::Stop) => {
info!("Background task: received stop command");
break;
}
Err(mpsc::error::TryRecvError::Empty) => {
// No command available, continue with normal processing
}
Err(mpsc::error::TryRecvError::Disconnected) => {
info!("Background task: actor disconnected, stopping");
break;
}
}
// Simulate CPU-intensive work
counter += 1.0;
let processed_value = expensive_calculation(counter, factor);
// Send result back to actor using blocking API
let message = ProcessedData {
value: processed_value,
timestamp: std::time::Instant::now(),
};
if let Err(e) = actor_ref.tell_blocking(message) {
info!("Background task: failed to send data to actor: {}", e);
break;
}
// Sleep (blocking operation)
thread::sleep(interval);
}
info!("Sync background task finished");
}
fn expensive_calculation(input: f64, factor: f64) -> f64 {
// Simulate CPU-intensive calculation
let mut result = input * factor;
for _ in 0..1000000 {
result = result.sin().cos().tan();
}
result
}
}
Message Handlers
#![allow(unused)]
fn main() {
impl Message<ProcessedData> for SyncDataProcessorActor {
type Reply = ();
async fn handle(&mut self, msg: ProcessedData, _: &ActorRef<Self>) -> Self::Reply {
debug!("Actor received processed data: value={:.6}, timestamp={:?}",
msg.value, msg.timestamp);
self.latest_value = Some(msg.value);
}
}
impl Message<SetFactor> for SyncDataProcessorActor {
type Reply = f64; // Return the new factor
async fn handle(&mut self, msg: SetFactor, _: &ActorRef<Self>) -> Self::Reply {
info!("Actor: changing factor from {} to {}", self.factor, msg.0);
self.factor = msg.0;
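// The background task keeps using the factor it was started with. To forward
// the change, add a TaskCommand variant (the UpdateFactor below is hypothetical):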
// if let Some(sender) = &self.task_sender {
// sender.send(TaskCommand::UpdateFactor(self.factor)).ok();
// }
self.factor
}
}
impl Message<GetState> for SyncDataProcessorActor {
type Reply = (f64, Option<f64>); // (factor, latest_value)
async fn handle(&mut self, _: GetState, _: &ActorRef<Self>) -> Self::Reply {
(self.factor, self.latest_value)
}
}
}
Key Patterns
1. Proper Blocking API Usage
- Use tell_blocking and ask_blocking only within spawn_blocking tasks
- These APIs are designed for Tokio’s blocking thread pool
- NOT for use in std::thread::spawn or general sync code
2. Communication Channels
- Use tokio::sync::mpsc for actor → blocking task communication
- Use actor messages for blocking task → actor communication
- Separate concerns: commands vs. data
3. Lifecycle Management
- Spawn blocking tasks in on_start
- Clean up in on_stop by sending stop commands
- Await task completion to ensure proper cleanup
4. Error Handling
- Handle channel disconnections gracefully
- Propagate errors appropriately between sync and async contexts
- Use try_recv for non-blocking command checking
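The non-blocking command check can be isolated into a small helper. The sketch below uses std::sync::mpsc instead of tokio::sync::mpsc so that it runs without a runtime; Command and poll_commands are hypothetical names that mirror TaskCommand above. Unlike the loop in the example, which handles one command per iteration, this version drains everything that is queued before returning.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};
use std::time::Duration;

/// Hypothetical command set, mirroring `TaskCommand` above.
pub enum Command {
    ChangeInterval(Duration),
    Stop,
}

/// Applies every queued command without blocking.
/// Returns `false` when the worker loop should exit.
pub fn poll_commands(rx: &Receiver<Command>, interval: &mut Duration) -> bool {
    loop {
        match rx.try_recv() {
            Ok(Command::ChangeInterval(new_interval)) => *interval = new_interval,
            Ok(Command::Stop) => return false,
            // Nothing queued: keep working.
            Err(TryRecvError::Empty) => return true,
            // Every sender is gone, so no stop command can arrive later: exit now.
            Err(TryRecvError::Disconnected) => return false,
        }
    }
}
```

Treating a disconnected channel as a stop signal means the blocking task cannot outlive the actor that owns the sender, even if the actor terminates without sending Stop.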
Benefits
Runtime Protection
- Blocking operations don’t block the async executor
- Other actors continue processing messages
- Maintains system responsiveness
Resource Management
- Tokio manages the blocking thread pool
- Automatic scaling based on workload
- Proper cleanup when actors stop
Flexibility
- Integrate legacy synchronous code
- Handle CPU-intensive algorithms
- Bridge different execution models
Usage Example
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
// Spawn the actor with initial factor
let (actor_ref, join_handle) = rsactor::spawn::<SyncDataProcessorActor>(2.0);
// Let it run for a bit
tokio::time::sleep(Duration::from_secs(2)).await;
// Get current state
let (factor, latest) = actor_ref.ask(GetState).await?;
println!("Current state: factor={}, latest_value={:?}", factor, latest);
// Change the factor
let new_factor = actor_ref.ask(SetFactor(3.5)).await?;
println!("Changed factor to: {}", new_factor);
// Let it run with new factor
tokio::time::sleep(Duration::from_secs(2)).await;
// Stop the actor
actor_ref.stop().await?;
let result = join_handle.await?;
println!("Actor stopped: {:?}", result);
Ok(())
}
Running the Example
cargo run --example actor_blocking_task
Output shows:
- Background task processing data continuously
- Actor receiving processed values
- Factor changes affecting calculations
- Proper cleanup on shutdown
When to Use This Pattern
- Mathematical Computations: Heavy algorithms, simulations
- Legacy Integration: Wrapping synchronous libraries
- File Processing: Large file operations, compression
- Database Operations: Synchronous database drivers
- System Calls: Direct OS interactions
This pattern enables seamless integration of blocking operations while maintaining the benefits of the actor model and async runtime efficiency.
Actor with Timeout
This example demonstrates how to handle timeouts in actor communication using ask_with_timeout and tell_with_timeout. Proper timeout handling is crucial for building resilient actor systems that can gracefully handle slow or unresponsive actors.
Overview
Timeout handling addresses several important scenarios:
- Slow operations that might take longer than expected
- Unresponsive actors due to blocking or infinite loops
- Network delays in distributed systems
- Resource contention causing processing delays
- Graceful degradation when services are overloaded
Key Methods
rsActor provides timeout-enabled communication methods:
#![allow(unused)]
fn main() {
// Request with timeout - fails if no response within duration
let result = actor_ref.ask_with_timeout(message, Duration::from_secs(5)).await;
// Fire-and-forget with timeout - fails if message can't be sent within duration
actor_ref.tell_with_timeout(message, Duration::from_millis(100)).await;
}
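The caller-side semantics of a request with a timeout can be illustrated without rsActor. The following is a standard-library analogy, not the library's implementation: request_with_timeout is a hypothetical function in which a worker thread stands in for the actor and recv_timeout stands in for ask_with_timeout.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Sends a "request" to a worker thread that replies after `work`,
/// and waits at most `timeout` for the reply.
pub fn request_with_timeout(
    work: Duration,
    timeout: Duration,
) -> Result<String, RecvTimeoutError> {
    let (reply_tx, reply_rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(work); // Simulated processing time.
        // The send fails if the caller already gave up; that is fine here.
        let _ = reply_tx.send(format!("done after {:?}", work));
    });
    reply_rx.recv_timeout(timeout)
}
```

In this sketch a timeout only stops the caller from waiting; the worker still finishes its work and its reply is discarded. Callers that retry after a timeout should therefore assume the original request may still have been processed.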
Implementation
Actor Definition
#![allow(unused)]
fn main() {
struct TimeoutDemoActor {
name: String,
}
impl Actor for TimeoutDemoActor {
type Args = String;
type Error = anyhow::Error;
async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
info!("{} actor (id: {}) started", args, actor_ref.identity());
Ok(Self { name: args })
}
}
}
Message Types with Different Response Times
#![allow(unused)]
fn main() {
// Fast response message
struct FastQuery(String);
// Slow response message (simulates long processing)
struct SlowQuery(String);
// Configurable delay message
struct ConfigurableQuery {
question: String,
delay_ms: u64,
}
}
Message Handlers
#![allow(unused)]
fn main() {
impl Message<FastQuery> for TimeoutDemoActor {
type Reply = String;
async fn handle(&mut self, msg: FastQuery, _: &ActorRef<Self>) -> Self::Reply {
debug!("{} handling a FastQuery: {}", self.name, msg.0);
// Immediate response
format!("Fast response to: {}", msg.0)
}
}
impl Message<SlowQuery> for TimeoutDemoActor {
type Reply = String;
async fn handle(&mut self, msg: SlowQuery, _: &ActorRef<Self>) -> Self::Reply {
debug!("{} handling a SlowQuery: {}", self.name, msg.0);
// Simulate slow processing
tokio::time::sleep(Duration::from_secs(3)).await;
format!("Slow response to: {}", msg.0)
}
}
impl Message<ConfigurableQuery> for TimeoutDemoActor {
type Reply = String;
async fn handle(&mut self, msg: ConfigurableQuery, _: &ActorRef<Self>) -> Self::Reply {
debug!("{} handling ConfigurableQuery with {}ms delay",
self.name, msg.delay_ms);
// Configurable delay
tokio::time::sleep(Duration::from_millis(msg.delay_ms)).await;
format!("Response after {}ms to: {}", msg.delay_ms, msg.question)
}
}
// Wire up message handlers
impl_message_handler!(TimeoutDemoActor, [FastQuery, SlowQuery, ConfigurableQuery]);
}
Timeout Scenarios
1. Successful Fast Operation
#![allow(unused)]
fn main() {
// This should succeed quickly
let fast_result = actor_ref
.ask_with_timeout(FastQuery("Quick question".to_string()), Duration::from_secs(1))
.await;
match fast_result {
Ok(response) => println!("Fast query succeeded: {}", response),
Err(e) => println!("Fast query failed: {}", e),
}
}
2. Timeout on Slow Operation
#![allow(unused)]
fn main() {
// This will timeout because SlowQuery takes 3 seconds but we only wait 1 second
let slow_result = actor_ref
.ask_with_timeout(SlowQuery("Slow question".to_string()), Duration::from_secs(1))
.await;
match slow_result {
Ok(response) => println!("Slow query succeeded: {}", response),
Err(e) => println!("Slow query timed out: {}", e), // This will be printed
}
}
3. Configurable Timeout Testing
#![allow(unused)]
fn main() {
async fn test_configurable_timeouts(actor_ref: &ActorRef<TimeoutDemoActor>) -> Result<()> {
let test_cases = vec![
(100, 500), // 100ms delay, 500ms timeout - should succeed
(800, 500), // 800ms delay, 500ms timeout - should timeout
(200, 1000), // 200ms delay, 1000ms timeout - should succeed
];
for (delay_ms, timeout_ms) in test_cases {
let query = ConfigurableQuery {
question: format!("Test with {}ms delay", delay_ms),
delay_ms,
};
let result = actor_ref
.ask_with_timeout(query, Duration::from_millis(timeout_ms))
.await;
match result {
Ok(response) => {
println!("✅ Success ({}ms delay, {}ms timeout): {}",
delay_ms, timeout_ms, response);
}
Err(e) => {
println!("❌ Timeout ({}ms delay, {}ms timeout): {}",
delay_ms, timeout_ms, e);
}
}
}
Ok(())
}
}
Error Handling Patterns
1. Timeout vs Other Errors
#![allow(unused)]
fn main() {
match actor_ref.ask_with_timeout(message, timeout).await {
Ok(response) => {
// Handle successful response
println!("Received: {}", response);
}
Err(rsactor::Error::Timeout { identity, duration }) => {
// Handle timeout specifically
println!("Actor {} timed out after {:?}", identity, duration);
// Could implement retry logic, fallback, or circuit breaker
}
Err(other_error) => {
// Handle other errors (send failures, actor crashes, etc.)
println!("Communication error: {}", other_error);
}
}
}
2. Retry with Exponential Backoff
#![allow(unused)]
fn main() {
async fn retry_with_backoff<T>(
actor_ref: &ActorRef<TimeoutDemoActor>,
message: T,
max_retries: usize,
) -> Result<String>
where
T: Clone + Send + 'static,
TimeoutDemoActor: Message<T, Reply = String>,
{
let mut delay = Duration::from_millis(100);
for attempt in 0..max_retries {
match actor_ref.ask_with_timeout(message.clone(), delay * 2).await {
Ok(response) => return Ok(response),
Err(rsactor::Error::Timeout { .. }) if attempt < max_retries - 1 => {
println!("Attempt {} timed out, retrying...", attempt + 1);
tokio::time::sleep(delay).await;
delay *= 2; // Exponential backoff
}
Err(e) => return Err(e.into()),
}
}
anyhow::bail!("All retry attempts failed")
}
}
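Unbounded doubling quickly produces very long waits, so a backoff schedule is often capped. The helper below is a sketch of that idea; backoff_schedule and its cap parameter are assumptions and not part of rsActor or the example above.

```rust
use std::time::Duration;

/// Delays to wait before each retry: `base`, `2 * base`, `4 * base`, ...,
/// never exceeding `cap`.
pub fn backoff_schedule(base: Duration, retries: u32, cap: Duration) -> Vec<Duration> {
    (0..retries)
        .map(|attempt| {
            // `checked_pow` and `checked_mul` guard against overflow on large
            // inputs; an overflowing delay is replaced by the cap.
            2u32.checked_pow(attempt)
                .and_then(|factor| base.checked_mul(factor))
                .map_or(cap, |delay| delay.min(cap))
        })
        .collect()
}
```

With a 100 ms base, five retries, and a 1 s cap, the schedule is 100 ms, 200 ms, 400 ms, 800 ms, 1 s. Precomputing the schedule also makes the retry policy easy to test separately from the actor calls.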
3. Circuit Breaker Pattern
#![allow(unused)]
fn main() {
struct CircuitBreaker {
failure_count: usize,
failure_threshold: usize,
last_failure: Option<std::time::Instant>,
reset_timeout: Duration,
}
impl CircuitBreaker {
fn new(failure_threshold: usize, reset_timeout: Duration) -> Self {
Self {
failure_count: 0,
failure_threshold,
last_failure: None,
reset_timeout,
}
}
fn should_attempt(&self) -> bool {
if self.failure_count < self.failure_threshold {
return true;
}
if let Some(last_failure) = self.last_failure {
last_failure.elapsed() > self.reset_timeout
} else {
true
}
}
fn on_success(&mut self) {
self.failure_count = 0;
self.last_failure = None;
}
fn on_failure(&mut self) {
self.failure_count += 1;
self.last_failure = Some(std::time::Instant::now());
}
}
}
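The breaker above only keeps state; something still has to consult it before each request and record the outcome afterwards. One way to wire that together is sketched below. The call method and the BreakerError type are hypothetical additions, and the synchronous closure stands in for an awaited ask_with_timeout call.

```rust
use std::time::{Duration, Instant};

/// Compact restatement of the breaker above, plus a hypothetical `call` wrapper.
pub struct CircuitBreaker {
    failure_count: usize,
    failure_threshold: usize,
    last_failure: Option<Instant>,
    reset_timeout: Duration,
}

#[derive(Debug, PartialEq)]
pub enum BreakerError<E> {
    /// The breaker is open; the operation was not attempted.
    Open,
    /// The operation ran and failed.
    Inner(E),
}

impl CircuitBreaker {
    pub fn new(failure_threshold: usize, reset_timeout: Duration) -> Self {
        Self { failure_count: 0, failure_threshold, last_failure: None, reset_timeout }
    }

    /// True while below the threshold, or once `reset_timeout` has passed
    /// since the most recent failure.
    pub fn should_attempt(&self) -> bool {
        self.failure_count < self.failure_threshold
            || self.last_failure.map_or(true, |t| t.elapsed() > self.reset_timeout)
    }

    /// Runs `op` only if the breaker allows it, then records the outcome.
    pub fn call<T, E>(
        &mut self,
        op: impl FnOnce() -> Result<T, E>,
    ) -> Result<T, BreakerError<E>> {
        if !self.should_attempt() {
            return Err(BreakerError::Open);
        }
        match op() {
            Ok(value) => {
                self.failure_count = 0;
                self.last_failure = None;
                Ok(value)
            }
            Err(e) => {
                self.failure_count += 1;
                self.last_failure = Some(Instant::now());
                Err(BreakerError::Inner(e))
            }
        }
    }
}
```

Returning a distinct Open variant lets callers tell "the request was never sent" apart from "the request was sent and failed", which matters when choosing between a fallback and a retry.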
Best Practices
1. Choose Appropriate Timeouts
- Fast operations: 100ms - 1s
- Network operations: 5s - 30s
- Database queries: 1s - 10s
- Heavy computations: 30s - 5min
2. Graceful Degradation
#![allow(unused)]
fn main() {
async fn get_user_data_with_fallback(
actor_ref: &ActorRef<UserActor>,
user_id: u64,
) -> UserData {
match actor_ref
.ask_with_timeout(GetUser(user_id), Duration::from_secs(2))
.await
{
Ok(user_data) => user_data,
Err(_) => {
// Return cached or default data
UserData::default_for_user(user_id)
}
}
}
}
3. Monitoring and Metrics
#![allow(unused)]
fn main() {
async fn monitored_ask<T, M, R>(
actor_ref: &ActorRef<T>,
message: M,
timeout: Duration,
operation_name: &str,
) -> Result<R>
where
// M is the message type; T must handle it and reply with R
T: Actor + Message<M, Reply = R>,
M: Send + 'static,
R: Send + 'static,
{
let start = std::time::Instant::now();
let result = actor_ref.ask_with_timeout(message, timeout).await;
let duration = start.elapsed();
match &result {
Ok(_) => {
metrics::histogram!("actor_request_duration", duration, "operation" => operation_name, "status" => "success");
}
Err(rsactor::Error::Timeout { .. }) => {
metrics::histogram!("actor_request_duration", duration, "operation" => operation_name, "status" => "timeout");
}
Err(_) => {
metrics::histogram!("actor_request_duration", duration, "operation" => operation_name, "status" => "error");
}
}
result.map_err(Into::into)
}
}
Running the Example
cargo run --example actor_with_timeout
You’ll see output demonstrating:
- Fast queries completing successfully
- Slow queries timing out
- Different timeout scenarios
- Error handling patterns
When to Use Timeouts
- User-facing operations that need responsive feedback
- Dependent services that might become unavailable
- Resource-intensive operations that could hang
- Network communications subject to delays
- Any operation where infinite waiting is unacceptable
Proper timeout handling is essential for building robust, responsive actor systems that gracefully handle real-world operational challenges.
Kill Demo
This example demonstrates the difference between graceful stop and immediate kill, and how to inspect ActorResult after termination.
Key Concepts
- stop(): Graceful shutdown. Allows current message processing to finish, then calls on_stop(killed: false)
- kill(): Immediate termination. Calls on_stop(killed: true) without waiting for pending messages
- ActorResult: Inspect how the actor terminated (completed vs failed, stopped vs killed)
Code Walkthrough
Actor with on_stop
#![allow(unused)]
fn main() {
use rsactor::{message_handlers, Actor, ActorRef, ActorWeak};
#[derive(Debug)]
struct DemoActor {
name: String,
}
impl Actor for DemoActor {
type Args = String;
type Error = anyhow::Error;
async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
tracing::info!("DemoActor '{}' (id: {}) started!", args, actor_ref.identity());
Ok(DemoActor { name: args })
}
async fn on_stop(&mut self, actor_ref: &ActorWeak<Self>, killed: bool) -> Result<(), Self::Error> {
let status = if killed { "killed" } else { "stopped" };
tracing::info!("DemoActor '{}' (id: {}) {}", self.name, actor_ref.identity(), status);
Ok(())
}
}
}
Inspecting ActorResult
#![allow(unused)]
fn main() {
let (actor_ref, join_handle) = rsactor::spawn::<DemoActor>("TestActor".to_string());
// Kill the actor
actor_ref.kill()?;
// Inspect the result
match join_handle.await? {
rsactor::ActorResult::Completed { actor, killed } => {
println!("Actor '{}' completed. Killed: {}", actor.name, killed);
}
rsactor::ActorResult::Failed { error, phase, killed, .. } => {
println!("Actor failed: {}. Phase: {:?}, Killed: {}", error, phase, killed);
}
}
}
Running
cargo run --example kill_demo
Ask Join Demo
This example demonstrates the ask_join pattern where message handlers spawn long-running tasks and return JoinHandle<T>, while callers use ask_join to automatically await task completion.
Key Concepts
- ask_join: Sends a message, receives a JoinHandle<T>, and automatically awaits it
- Non-blocking handlers: Handlers spawn tasks with tokio::spawn instead of blocking the actor
- Concurrency: The actor remains responsive while spawned tasks run in the background
Code Walkthrough
Handler returning JoinHandle
#![allow(unused)]
fn main() {
#[derive(Actor)]
struct WorkerActor {
task_counter: u32,
}
struct HeavyComputationTask {
id: u32,
duration_secs: u64,
multiplier: u32,
}
#[message_handlers]
impl WorkerActor {
#[handler]
async fn handle_heavy_computation(
&mut self,
msg: HeavyComputationTask,
_: &ActorRef<Self>,
) -> JoinHandle<u64> {
self.task_counter += 1;
let task_id = msg.id;
let multiplier = msg.multiplier;
let duration = Duration::from_secs(msg.duration_secs);
// Spawn task — actor remains free to process other messages
tokio::spawn(async move {
tokio::time::sleep(duration).await;
(task_id as u64) * (multiplier as u64)
})
}
}
}
ask vs ask_join
#![allow(unused)]
fn main() {
// ask() returns JoinHandle — you await it manually
let join_handle: JoinHandle<u64> = worker_ref.ask(task).await?;
let result = join_handle.await?;
// ask_join() does both steps automatically
let result: u64 = worker_ref.ask_join(task).await?;
}
Error Handling
ask_join returns Error::Join when the spawned task panics:
#![allow(unused)]
fn main() {
match worker_ref.ask_join(PanicTask).await {
Ok(result) => println!("Success: {}", result),
Err(rsactor::Error::Join { identity, source }) => {
println!("Task panicked on actor {}: {}", identity, source);
}
Err(e) => println!("Other error: {}", e),
}
}
Running
cargo run --example ask_join_demo
Handler Demo
This example demonstrates how to use handler traits (TellHandler, AskHandler, WeakTellHandler, WeakAskHandler) to manage different actor types that handle the same message through a unified interface.
Key Concepts
- TellHandler<M>: Type-erased fire-and-forget for any actor handling message M
- AskHandler<M, R>: Type-erased request-response for any actor returning R for message M
- WeakTellHandler<M> / WeakAskHandler<M, R>: Weak reference variants that don’t keep actors alive
- Heterogeneous collections: Store different actor types in a single Vec
Code Walkthrough
Different actors, same messages
#![allow(unused)]
fn main() {
#[derive(Actor)]
struct CounterActor { name: String, count: u32 }
#[derive(Actor)]
struct LoggerActor { prefix: String, log_count: u32 }
// Both actors handle the same Ping and GetStatus messages
#[message_handlers]
impl CounterActor {
#[handler]
async fn handle_ping(&mut self, msg: Ping, _: &ActorRef<Self>) { ... }
#[handler]
async fn handle_get_status(&mut self, _: GetStatus, _: &ActorRef<Self>) -> Status { ... }
}
#[message_handlers]
impl LoggerActor {
#[handler]
async fn handle_ping(&mut self, msg: Ping, _: &ActorRef<Self>) { ... }
#[handler]
async fn handle_get_status(&mut self, _: GetStatus, _: &ActorRef<Self>) -> Status { ... }
}
}
TellHandler — fire-and-forget
#![allow(unused)]
fn main() {
let tell_handlers: Vec<Box<dyn TellHandler<Ping>>> = vec![
(&counter_actor).into(),
(&logger_actor).into(),
];
for handler in &tell_handlers {
handler.tell(Ping { timestamp: 1000 }).await?;
}
}
AskHandler — request-response
#![allow(unused)]
fn main() {
let ask_handlers: Vec<Box<dyn AskHandler<GetStatus, Status>>> = vec![
(&counter_actor).into(),
(&logger_actor).into(),
];
for handler in &ask_handlers {
let status = handler.ask(GetStatus).await?;
println!("{}: {} messages", status.name, status.message_count);
}
}
WeakTellHandler — upgrade pattern
#![allow(unused)]
fn main() {
let weak_handlers: Vec<Box<dyn WeakTellHandler<Ping>>> = vec![
ActorRef::downgrade(&counter_actor).into(),
ActorRef::downgrade(&logger_actor).into(),
];
for weak in &weak_handlers {
if let Some(strong) = weak.upgrade() {
strong.tell(Ping { timestamp: 3000 }).await?;
}
}
}
Downgrade strong to weak
#![allow(unused)]
fn main() {
let strong: Box<dyn TellHandler<Ping>> = (&counter_actor).into();
let weak: Box<dyn WeakTellHandler<Ping>> = strong.downgrade();
}
Running
cargo run --example handler_demo
Weak Reference Demo
This example demonstrates ActorWeak for weak references to actors — references that don’t prevent the actor from being dropped.
Key Concepts
- ActorRef::downgrade(): Create a weak reference from a strong reference
- weak_ref.upgrade(): Try to get a strong reference back (returns None if actor is dead)
- weak_ref.is_alive(): Heuristic check if the actor might still be alive
- weak_ref.identity(): Always available, even after the actor is dropped
Code Walkthrough
Creating and using weak references
#![allow(unused)]
fn main() {
let (actor_ref, join_handle) = spawn::<PingActor>("TestActor".to_string());
// Create a weak reference
let weak_ref = ActorRef::downgrade(&actor_ref);
println!("Weak ref is_alive: {}", weak_ref.is_alive());
// Upgrade and use
if let Some(strong_ref) = weak_ref.upgrade() {
let response: String = strong_ref.ask(Ping).await?;
println!("Response: {response}");
}
}
Behavior after dropping strong reference
#![allow(unused)]
fn main() {
drop(actor_ref);
tokio::time::sleep(Duration::from_millis(100)).await;
// Weak reference reflects the actor's state
println!("is_alive: {}", weak_ref.is_alive());
if let Some(strong) = weak_ref.upgrade() {
// Actor might still be running if other strong refs exist
let _ = strong.ask(Ping).await;
} else {
println!("Actor is no longer available");
}
}
Identity survives actor death
#![allow(unused)]
fn main() {
// Identity is always available, even after actor termination
println!("Actor ID: {}", weak_ref.identity());
}
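The upgrade, is_alive, and identity behavior described in this section follows the same pattern as std::sync::Weak. The sketch below is an analogy built on Arc and Weak, not ActorWeak's actual implementation; Mailbox and WeakHandle are hypothetical types. Storing the id by value is what allows it to outlive the target.

```rust
use std::sync::{Arc, Weak};

/// Hypothetical stand-in for an actor's shared state.
pub struct Mailbox {
    pub name: String,
}

/// Weak handle that stores the id by value, so the id outlives the target.
pub struct WeakHandle {
    id: u64,
    target: Weak<Mailbox>,
}

impl WeakHandle {
    pub fn new(id: u64, strong: &Arc<Mailbox>) -> Self {
        Self { id, target: Arc::downgrade(strong) }
    }

    /// Always available, even after the target is dropped.
    pub fn identity(&self) -> u64 {
        self.id
    }

    /// `Some` only while at least one strong reference exists.
    pub fn upgrade(&self) -> Option<Arc<Mailbox>> {
        self.target.upgrade()
    }

    /// A snapshot: the answer can be stale by the time the caller acts on it.
    pub fn is_alive(&self) -> bool {
        self.target.strong_count() > 0
    }
}
```

Because is_alive is only a snapshot, code that needs to use the target should call upgrade and handle None, as the walkthrough above does, and should not branch on is_alive first.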
Running
cargo run --example weak_reference_demo
Metrics Demo
This example demonstrates the per-actor metrics system for monitoring performance. Requires the metrics feature flag.
Key Concepts
- Lock-free counters: Metrics use AtomicU64 for zero-contention updates
- Zero overhead when disabled: No metrics code compiles without the feature flag
- Automatic collection: Metrics are gathered transparently during message processing
- Snapshot API: Get all metrics at once or query individual values
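To make the lock-free counter idea concrete, here is a minimal sketch of how such metrics could be kept with AtomicU64. It is an assumption about the general technique, not rsActor's internal layout; Counters and record are hypothetical names.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Hypothetical metrics cell; not rsActor's internal layout.
#[derive(Default)]
pub struct Counters {
    message_count: AtomicU64,
    total_nanos: AtomicU64,
    max_nanos: AtomicU64,
}

impl Counters {
    /// Records one processed message without taking a lock.
    pub fn record(&self, elapsed: Duration) {
        // Saturate instead of wrapping if the duration exceeds u64 nanoseconds.
        let nanos = elapsed.as_nanos().min(u64::MAX as u128) as u64;
        self.message_count.fetch_add(1, Ordering::Relaxed);
        self.total_nanos.fetch_add(nanos, Ordering::Relaxed);
        self.max_nanos.fetch_max(nanos, Ordering::Relaxed);
    }

    pub fn message_count(&self) -> u64 {
        self.message_count.load(Ordering::Relaxed)
    }

    /// Mean processing time, or zero if nothing has been recorded yet.
    pub fn avg_processing_time(&self) -> Duration {
        let count = self.message_count();
        if count == 0 {
            return Duration::ZERO;
        }
        Duration::from_nanos(self.total_nanos.load(Ordering::Relaxed) / count)
    }

    pub fn max_processing_time(&self) -> Duration {
        Duration::from_nanos(self.max_nanos.load(Ordering::Relaxed))
    }
}
```

Each field is updated atomically on its own, so a reader that loads several fields while a writer is mid-update may see values that are slightly out of step with each other. For monitoring purposes that is usually an acceptable trade for avoiding a lock.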
Code Walkthrough
Getting a metrics snapshot
#![allow(unused)]
fn main() {
let (actor_ref, handle) = spawn::<WorkerActor>(actor);
// Send some messages
for _ in 0..10 {
actor_ref.tell(FastTask).await?;
}
// Get all metrics at once
let metrics = actor_ref.metrics();
println!("Message count: {}", metrics.message_count);
println!("Avg processing time: {:?}", metrics.avg_processing_time);
println!("Max processing time: {:?}", metrics.max_processing_time);
println!("Error count: {}", metrics.error_count);
println!("Uptime: {:?}", metrics.uptime);
println!("Last activity: {:?}", metrics.last_activity);
}
Individual accessors
#![allow(unused)]
fn main() {
let count = actor_ref.message_count();
let avg = actor_ref.avg_processing_time();
let max = actor_ref.max_processing_time();
let errors = actor_ref.error_count();
let uptime = actor_ref.uptime();
let last = actor_ref.last_activity();
}
Running
cargo run --example metrics_demo --features metrics
Without the metrics feature, the example still runs but skips metrics output.
Tracing Demo
This example demonstrates rsActor’s structured tracing and observability features.
Key Concepts
- Core logging: Always available via tracing::warn!, tracing::error!, etc.
- Instrumentation spans (opt-in): #[tracing::instrument] attributes for timing and context
- Structured fields: Actor IDs, message types, processing duration in every span
Code Walkthrough
Setup
#![allow(unused)]
fn main() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_target(false)
.init();
}
What gets traced
The example exercises multiple traced operations:
#![allow(unused)]
fn main() {
// ask — creates actor_ask span with actor_id and message_type fields
let response: String = actor_ref.ask(Ping).await?;
// tell — creates actor_tell span
actor_ref.tell(Increment).await?;
// ask_with_timeout — creates actor_ask_with_timeout span with timeout_ms
actor_ref.ask_with_timeout(SlowOperation(200), Duration::from_millis(150)).await;
// tell_with_timeout — creates actor_tell_with_timeout span
actor_ref.tell_with_timeout(Increment, Duration::from_millis(100)).await?;
// stop — creates actor_stop span
actor_ref.stop().await?;
}
Sample output
With RUST_LOG=debug:
DEBUG actor_lifecycle{actor_id=1 actor_type="TracingDemoActor"}: rsactor: Actor started
DEBUG actor_ask{actor_id=1 message_type="Ping" reply_type="String"}: rsactor: ask
DEBUG actor_process_message: rsactor: Processing message
DEBUG actor_stop{actor_id=1}: rsactor: stop
DEBUG actor_on_stop{killed=false}: rsactor: on_stop
Running
# With instrumentation spans
RUST_LOG=debug cargo run --example tracing_demo --features tracing
# Without instrumentation (core logging only)
RUST_LOG=debug cargo run --example tracing_demo
Dining Philosophers
The Dining Philosophers problem is a classic concurrency challenge that demonstrates resource sharing, deadlock prevention, and coordination between multiple actors. This example solves it with rsActor by giving the forks to a single Table actor and running each philosopher as its own actor.
Problem Description
Five philosophers sit around a circular table with five forks. Each philosopher alternates between thinking and eating. To eat, a philosopher needs both the fork to their left and the fork to their right. The challenge is to design a solution that:
- Prevents deadlock (all philosophers waiting forever)
- Prevents starvation (ensuring each philosopher gets to eat)
- Maximizes concurrency (allows multiple philosophers to eat simultaneously when possible)
Solution Architecture
Our solution uses two types of actors:
1. Table Actor
- Role: Centralized resource manager for forks
- Responsibilities:
- Track which forks are available
- Grant/deny fork requests
- Handle fork releases
- Maintain philosopher registry
2. Philosopher Actors
- Role: Individual philosophers with their own thinking/eating logic
- Responsibilities:
- Think for random periods
- Request forks from the table when hungry
- Eat when both forks are acquired
- Release forks after eating
- Repeat the cycle
Key Components
Messages
#![allow(unused)]
fn main() {
// Fork management
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ForkSide {
Left,
Right,
}
// Messages to Table Actor
struct RegisterPhilosopher {
logical_id: usize,
philosopher_ref: ActorRef<Philosopher>,
}
struct RequestFork {
logical_id: usize,
side: ForkSide,
}
struct ReleaseFork {
logical_id: usize,
side: ForkSide,
}
// Messages to Philosopher Actor
struct StartThinking;
struct StartEating;
}
Philosopher Actor
#![allow(unused)]
fn main() {
#[derive(Debug)]
struct Philosopher {
id: usize,
name: String,
table_ref: ActorRef<Table>,
eat_count: u32,
has_left_fork: bool,
has_right_fork: bool,
}
impl Actor for Philosopher {
type Args = (usize, String, ActorRef<Table>);
type Error = anyhow::Error;
async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
let (id, name, table_ref) = args;
println!("Philosopher {} ({}) is joining the table.", id, name);
let philosopher = Self {
id, name: name.clone(), table_ref: table_ref.clone(),
eat_count: 0, has_left_fork: false, has_right_fork: false,
};
// Register with the table
table_ref.tell(RegisterPhilosopher {
logical_id: id,
philosopher_ref: actor_ref.clone(),
}).await?;
// Start thinking
actor_ref.tell(StartThinking).await?;
Ok(philosopher)
}
}
}
Table Actor
#![allow(unused)]
fn main() {
#[derive(Debug)]
struct Table {
philosophers: HashMap<usize, ActorRef<Philosopher>>,
fork_availability: Vec<bool>, // true = available, false = taken
}
impl Actor for Table {
type Args = usize; // Number of philosophers
type Error = anyhow::Error;
async fn on_start(num_philosophers: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(Self {
philosophers: HashMap::new(),
fork_availability: vec![true; num_philosophers],
})
}
}
}
Deadlock Prevention Strategy
The solution prevents deadlock through:
- Centralized Fork Management: The Table actor manages all forks, preventing race conditions
- Sequential Fork Acquisition: Philosophers request one fork at a time
- Fork Release on Failure: If a philosopher can’t get both forks, they release any held fork
- Random Timing: Randomized thinking/eating times reduce contention patterns
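The release-on-failure rule can be checked with a small simulation. The sketch below is plain Rust without rsActor; `ForkTable` and `worst_case_round` are hypothetical names, and requests are applied one at a time, in the way a single Table actor would work through its mailbox. It replays the worst case, where every philosopher picks up the left fork first.

```rust
/// Plain-data stand-in for the Table actor's fork state (hypothetical).
struct ForkTable {
    available: Vec<bool>, // true = available, false = taken
}

impl ForkTable {
    fn new(n: usize) -> Self {
        Self { available: vec![true; n] }
    }

    /// Grant the fork if it is free; mirrors the grant/deny reply.
    fn request(&mut self, fork: usize) -> bool {
        let free = self.available[fork];
        if free {
            self.available[fork] = false;
        }
        free
    }

    fn release(&mut self, fork: usize) {
        self.available[fork] = true;
    }
}

/// Worst case: every philosopher takes the left fork before anyone asks for
/// the right one. Returns how many philosophers got to eat in this round.
fn worst_case_round(table: &mut ForkTable) -> usize {
    let n = table.available.len();
    // Step 1: everyone requests their left fork (index = philosopher id).
    let holds_left: Vec<bool> = (0..n).map(|id| table.request(id)).collect();
    let mut ate = 0;
    // Step 2: everyone requests their right fork in turn.
    for id in 0..n {
        let right = (id + 1) % n;
        if holds_left[id] && table.request(right) {
            ate += 1; // both forks held: eat, then put the right fork back
            table.release(right);
        }
        if holds_left[id] {
            table.release(id); // the left fork is returned either way
        }
    }
    ate
}
```

With five philosophers the round ends with every fork back on the table, so nobody is left holding one fork while waiting for another. Most philosophers still have to retry, which is where the randomized timing helps.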
Key Implementation Details
Fork Request Logic
#![allow(unused)]
fn main() {
impl Message<RequestFork> for Table {
type Reply = bool; // true if fork granted, false if unavailable
async fn handle(&mut self, msg: RequestFork, _: &ActorRef<Self>) -> Self::Reply {
let fork_index = match msg.side {
ForkSide::Left => msg.logical_id,
ForkSide::Right => (msg.logical_id + 1) % self.fork_availability.len(),
};
if self.fork_availability[fork_index] {
self.fork_availability[fork_index] = false;
println!("Table: Fork {} granted to Philosopher {}", fork_index, msg.logical_id);
true
} else {
println!("Table: Fork {} denied to Philosopher {} (in use)", fork_index, msg.logical_id);
false
}
}
}
}
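As a quick check of the index arithmetic, the mapping can be pulled out into a plain function. This is a sketch, not code from the example: `fork_index` and `release_fork` are hypothetical helpers, and `release_fork` only guesses at what a `ReleaseFork` handler might do.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ForkSide {
    Left,
    Right,
}

/// Same mapping as the handler above: the left fork shares the philosopher's
/// index and the right fork is the next index, wrapping around the table.
fn fork_index(logical_id: usize, side: ForkSide, fork_count: usize) -> usize {
    match side {
        ForkSide::Left => logical_id,
        ForkSide::Right => (logical_id + 1) % fork_count,
    }
}

/// Hypothetical release step: mark the fork available again.
/// Returns false if the fork was not actually taken.
fn release_fork(forks: &mut [bool], logical_id: usize, side: ForkSide) -> bool {
    let index = fork_index(logical_id, side, forks.len());
    let was_taken = !forks[index];
    forks[index] = true;
    was_taken
}
```

The wrap-around is what makes the last philosopher's right fork the same fork as the first philosopher's left fork.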
Philosopher State Machine
- Thinking State: Random duration, then try to acquire forks
- Fork Acquisition:
- Request left fork first
- If successful, request right fork
- If both acquired, start eating
- If either fails, release held forks and return to thinking
- Eating State: Random duration, then release both forks
- Repeat: Return to thinking state
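The transitions above can be written down as a small pure function. This sketch assumes a simplified three-state model (`State` and `next_state` are hypothetical names) and leaves out timing and the actual messages to the table.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Thinking,
    HoldingLeft,
    Eating,
}

/// One transition of the cycle. `granted` is the table's reply to the fork
/// request made in the current state; it is ignored while eating.
fn next_state(state: State, granted: bool) -> State {
    match (state, granted) {
        (State::Thinking, true) => State::HoldingLeft,  // left fork acquired
        (State::Thinking, false) => State::Thinking,    // left fork denied
        (State::HoldingLeft, true) => State::Eating,    // right fork acquired
        (State::HoldingLeft, false) => State::Thinking, // release left, retry later
        (State::Eating, _) => State::Thinking,          // release both forks
    }
}
```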
Benefits of the Actor Model Solution
1. Encapsulation
- Each philosopher manages its own state independently
- The table encapsulates fork management logic
- No shared mutable state between actors
2. Message-Based Coordination
- All communication happens through well-defined messages
- No need for locks, mutexes, or other synchronization primitives
- Clear separation of concerns
3. Fault Tolerance
- Actor isolation means one philosopher’s failure doesn’t affect others
- The table actor can detect and handle philosopher failures
- System can continue operating with fewer philosophers
4. Scalability
- Easy to add more philosophers by spawning new actors
- Table actor can handle arbitrary numbers of philosophers
- Natural load distribution across async tasks
Running the Example
cargo run --example dining_philosophers
You’ll see output like:
Philosopher 0 (Socrates) is thinking.
Philosopher 1 (Plato) attempts to acquire Left fork.
Table: Fork 1 granted to Philosopher 1
Philosopher 1 (Plato) acquired Left fork. Attempting Right fork.
Table: Fork 2 granted to Philosopher 1
Philosopher 1 (Plato) acquired both forks and is eating.
...
Learning Outcomes
This example demonstrates:
- Actor coordination patterns for resource management
- Deadlock prevention strategies in concurrent systems
- Message-driven state machines for complex logic
- Centralized vs. distributed resource management approaches
- Practical concurrency problem-solving with actors
The Dining Philosophers problem showcases how the actor model can elegantly solve complex concurrency challenges while maintaining code clarity and system reliability.
Frequently Asked Questions
This FAQ provides answers to common questions about the rsActor framework, covering basic concepts, practical usage patterns, and advanced scenarios.
Getting Started
What is rsActor?
rsActor is a lightweight, async-first Rust actor framework that offers a simple, type-safe way to build concurrent applications with the actor model. It runs on the Tokio async runtime and provides both compile-time and runtime type safety for message passing.
How do I create my first actor?
To create an actor, you need to implement the Actor trait and define message handlers:
use rsactor::{Actor, ActorRef, message_handlers, spawn};
// Define your actor struct
#[derive(Debug)]
struct CounterActor {
count: u32,
}
// Implement the Actor trait
impl Actor for CounterActor {
type Args = u32; // Initial count value
type Error = anyhow::Error;
async fn on_start(initial_count: Self::Args, _actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(CounterActor { count: initial_count })
}
}
// Define a message
#[derive(Debug)]
struct Increment(u32);
// Use message_handlers macro with handler attributes
#[message_handlers]
impl CounterActor {
#[handler]
async fn handle_increment(&mut self, msg: Increment, _: &ActorRef<Self>) -> u32 {
self.count += msg.0;
self.count
}
}
// Usage
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (actor_ref, _handle) = spawn::<CounterActor>(0);
let new_count = actor_ref.ask(Increment(5)).await?;
println!("Count: {}", new_count); // Prints: Count: 5
Ok(())
}
What is the difference between ask and tell?
- ask: Sends a message and waits for a response. Returns the reply value.
- tell: Sends a message without waiting for a response. Fire-and-forget style.
#![allow(unused)]
fn main() {
// Ask - waits for response
let result = actor_ref.ask(GetValue).await?;
// Tell - no response expected
actor_ref.tell(LogMessage("Hello".to_string())).await?;
}
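The difference can be illustrated without rsActor by using standard-library channels. In this sketch all names are hypothetical: a tell-style message carries only data, while an ask-style message also carries a channel for the reply.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical mailbox protocol for a counter; not rsActor's message types.
enum CounterMsg {
    Add(u32),         // tell-style: no reply channel
    Get(Sender<u32>), // ask-style: carries a channel for the reply
}

/// Start a thread that owns the counter and drains its mailbox.
fn spawn_counter() -> Sender<CounterMsg> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        let mut count = 0u32;
        for msg in rx {
            match msg {
                CounterMsg::Add(n) => count += n,
                CounterMsg::Get(reply) => {
                    // The asker may have gone away; ignore a failed reply.
                    let _ = reply.send(count);
                }
            }
        }
    });
    tx
}

/// tell: enqueue the message and return immediately.
fn tell_add(counter: &Sender<CounterMsg>, n: u32) {
    counter.send(CounterMsg::Add(n)).expect("counter stopped");
}

/// ask: enqueue the message, then block until the reply arrives.
fn ask_get(counter: &Sender<CounterMsg>) -> u32 {
    let (reply_tx, reply_rx) = channel();
    counter.send(CounterMsg::Get(reply_tx)).expect("counter stopped");
    reply_rx.recv().expect("counter dropped the reply")
}
```

Because the mailbox is processed in order, an ask sent after two tells from the same sender observes the effect of both.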
How do I handle actor failures?
Actors can fail during their lifecycle. The JoinHandle returns an ActorResult that indicates success or failure:
#![allow(unused)]
fn main() {
let (actor_ref, handle) = spawn::<MyActor>(args);
match handle.await {
Ok(ActorResult::Completed { actor, killed }) => {
println!("Actor completed successfully");
}
Ok(ActorResult::Failed { error, phase, .. }) => {
println!("Actor failed during {:?}: {}", phase, error);
}
Err(join_error) => {
println!("Actor panicked: {}", join_error);
}
}
}
Actor System
How do I spawn actors?
Use the spawn function to create and start actors:
#![allow(unused)]
fn main() {
use rsactor::{spawn, spawn_with_mailbox_capacity};
// Spawn with default mailbox size
let (actor_ref, handle) = spawn::<MyActor>(actor_args);
// Spawn with custom mailbox size
let (actor_ref, handle) = spawn_with_mailbox_capacity::<MyActor>(actor_args, 1000);
}
How do I stop an actor?
Actors can be stopped gracefully or killed immediately:
#![allow(unused)]
fn main() {
// Graceful stop - allows ongoing operations to complete
actor_ref.stop().await?;
// Immediate kill - stops the actor immediately
actor_ref.kill()?;
}
Can I send messages to stopped actors?
No. Sending a message to a stopped actor returns Error::Send. You can check whether the actor is alive before sending, but it may stop between the check and the send, so handle the potential error as well:
#![allow(unused)]
fn main() {
if actor_ref.is_alive() {
actor_ref.tell(msg).await?;
}
}
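Handling the send error directly looks like this in a std-only sketch, where a channel with a dropped receiver stands in for a stopped actor. `try_deliver` is a hypothetical helper, not an rsActor API.

```rust
use std::sync::mpsc::{channel, Sender};

/// Try to deliver `msg`; report failure instead of panicking when the
/// receiving side (standing in for a stopped actor) is gone.
fn try_deliver(mailbox: &Sender<String>, msg: &str) -> Result<(), String> {
    mailbox
        .send(msg.to_string())
        // SendError hands the undelivered message back in its first field.
        .map_err(|e| format!("undelivered: {}", e.0))
}
```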
Message Handling
How do I define message types?
Messages are regular Rust structs:
#![allow(unused)]
fn main() {
#[derive(Debug, Clone)]
struct CreateUser {
name: String,
email: String,
}
#[derive(Debug)]
struct GetUser { id: u64 }
#[derive(Debug)]
struct DeleteUser { id: u64 }
}
How do I implement message handlers?
Use the #[message_handlers] macro with #[handler] attributes:
#![allow(unused)]
fn main() {
#[message_handlers]
impl UserManagerActor {
#[handler]
async fn handle_create_user(&mut self, msg: CreateUser, _: &ActorRef<Self>) -> Result<User, UserError> {
let user = User { id: self.next_id(), name: msg.name, email: msg.email };
self.users.insert(user.id, user.clone());
Ok(user)
}
#[handler]
async fn handle_get_user(&mut self, msg: GetUser, _: &ActorRef<Self>) -> Option<User> {
self.users.get(&msg.id).cloned()
}
#[handler]
async fn handle_delete_user(&mut self, msg: DeleteUser, _: &ActorRef<Self>) -> bool {
self.users.remove(&msg.id).is_some()
}
}
}
What happens if I forget to add a handler?
If you forget to add a #[handler] attribute to a method that should handle messages, you’ll get a compile-time error when trying to send that message type to the actor.
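The compile-time check follows from ordinary trait bounds. The sketch below mimics the idea with a hypothetical `Handles<M>` trait and `send` function; it does not reproduce what the `#[message_handlers]` macro generates.

```rust
/// Hypothetical stand-in for a per-message handler trait.
trait Handles<M> {
    type Reply;
    fn handle(&mut self, msg: M) -> Self::Reply;
}

struct Counter {
    count: u32,
}

struct Increment(u32);
struct Reset; // no handler is implemented for this message

impl Handles<Increment> for Counter {
    type Reply = u32;
    fn handle(&mut self, msg: Increment) -> u32 {
        self.count += msg.0;
        self.count
    }
}

/// Only compiles when `A` has a handler for `M`.
fn send<A: Handles<M>, M>(actor: &mut A, msg: M) -> <A as Handles<M>>::Reply {
    actor.handle(msg)
}
```

Calling `send(&mut counter, Reset)` would be rejected by the compiler because `Counter` does not implement `Handles<Reset>`.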
Actor Lifecycle
What is the lifecycle of an actor?
Actors follow a three-stage lifecycle:
- Creation and Initialization:
  - Actor is spawned with spawn::<Actor>(args)
  - Framework calls on_start(args, &ActorRef<Self>) to create the actor instance
  - If successful, actor enters the running state
- Running:
  - Framework repeatedly calls on_run(&mut self, &ActorWeak<Self>) for continuous processing
  - Actor concurrently processes messages from its mailbox
  - Continues until stopped, killed, or error occurs
- Termination:
  - Framework calls on_stop(&mut self, &ActorWeak<Self>, killed: bool)
  - Actor is destroyed and JoinHandle resolves with ActorResult
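The ordering of the three stages can be sketched with a synchronous, std-only model. The `Lifecycle` trait and `run` function are hypothetical simplifications: the real hooks are async, `on_run` is omitted, and error handling is reduced to a `String`.

```rust
/// Hypothetical, synchronous stand-in for the lifecycle hooks.
trait Lifecycle: Sized {
    type Args;
    fn on_start(args: Self::Args) -> Result<Self, String>;
    fn on_message(&mut self, msg: &str);
    fn on_stop(&mut self, killed: bool);
}

struct Recorder {
    log: Vec<String>,
}

impl Lifecycle for Recorder {
    type Args = &'static str;

    fn on_start(name: Self::Args) -> Result<Self, String> {
        Ok(Recorder { log: vec![format!("start:{name}")] })
    }
    fn on_message(&mut self, msg: &str) {
        self.log.push(format!("msg:{msg}"));
    }
    fn on_stop(&mut self, killed: bool) {
        self.log.push(format!("stop:killed={killed}"));
    }
}

/// Drive one actor through creation, running, and termination.
/// In this model a failed `on_start` returns early, so no other hook runs.
fn run<A: Lifecycle>(args: A::Args, mailbox: &[&str], killed: bool) -> Result<A, String> {
    let mut actor = A::on_start(args)?;
    for msg in mailbox {
        actor.on_message(msg);
    }
    actor.on_stop(killed);
    Ok(actor)
}
```

Returning the actor from `run` loosely mirrors how the finished actor is handed back through the result once it has terminated.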
What are the lifecycle methods?
#![allow(unused)]
fn main() {
impl Actor for MyActor {
type Args = MyArgs;
type Error = MyError;
// Called during initialization — creates and returns the actor instance
async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(MyActor::new(args))
}
// Called repeatedly when idle — returns Ok(true) to continue, Ok(false) to stop idle processing
async fn on_run(&mut self, actor_weak: &ActorWeak<Self>) -> Result<bool, Self::Error> {
// Main processing logic
Ok(false)
}
// Called during termination — cleanup and finalization
async fn on_stop(&mut self, actor_weak: &ActorWeak<Self>, killed: bool) -> Result<(), Self::Error> {
// Cleanup logic
Ok(())
}
}
}
Note: on_run and on_stop receive &ActorWeak<Self> (not &ActorRef<Self>) to prevent the actor from holding a strong reference to itself.
What is the ActorResult enum?
ActorResult represents the outcome of an actor’s lifecycle:
- ActorResult::Completed { actor, killed }: Actor completed successfully
- ActorResult::Failed { actor, error, phase, killed }: Actor failed with error details including the failure phase (OnStart, OnRun, or OnStop)
Error Handling
How do I handle errors in actors?
Error handling occurs at multiple levels:
#![allow(unused)]
fn main() {
impl Actor for MyActor {
type Error = MyError;
async fn on_start(args: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
// Return Err to prevent actor creation
Ok(MyActor::new(args)?)
}
async fn on_run(&mut self, _: &ActorWeak<Self>) -> Result<bool, Self::Error> {
// Return Err to terminate actor
self.do_work()?;
Ok(false)
}
}
// Message handling errors
#[message_handlers]
impl MyActor {
#[handler]
async fn handle_process(&mut self, msg: ProcessData, _: &ActorRef<Self>) -> Result<Output, ProcessingError> {
self.process(msg.data)
}
}
}
How do I observe tell() errors?
Use the on_tell_result hook. When using #[handler], the macro auto-generates a default implementation that logs errors via tracing::warn!. Use #[handler(no_log)] to suppress this.
For manual Message<T> implementations, override on_tell_result:
#![allow(unused)]
fn main() {
impl Message<Command> for MyActor {
type Reply = Result<(), CommandError>;
async fn handle(&mut self, msg: Command, _: &ActorRef<Self>) -> Self::Reply {
self.execute(msg)
}
fn on_tell_result(result: &Self::Reply, _actor_ref: &ActorRef<Self>) {
if let Err(e) = result {
tracing::error!("Command failed: {e:?}");
}
}
}
}
Can I use custom error types?
Yes, any error type that implements Send + Debug + 'static can be used:
#![allow(unused)]
fn main() {
#[derive(Debug, thiserror::Error)]
enum MyActorError {
#[error("Database error: {0}")]
DbError(#[from] sqlx::Error),
#[error("Network timeout")]
NetworkTimeout,
}
impl Actor for MyActor {
type Error = MyActorError;
// ...
}
}
Advanced Usage
Can I use rsActor with blocking code?
Yes, use tokio::task::spawn_blocking for blocking operations within handlers. For sending messages from blocking contexts, use blocking_ask and blocking_tell:
#![allow(unused)]
fn main() {
tokio::task::spawn_blocking(move || {
// blocking_ask replaces the deprecated ask_blocking (since v0.10.0)
let result = actor_ref.blocking_ask::<Query, QueryResult>(
Query { id: 123 },
Some(Duration::from_secs(5))
);
// blocking_tell replaces the deprecated tell_blocking (since v0.10.0)
actor_ref.blocking_tell(LogEvent("done".into()), None);
});
}
How do I test actors?
#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_counter_actor() {
let (actor_ref, _handle) = spawn::<CounterActor>(0);
let result = actor_ref.ask(Increment(5)).await.unwrap();
assert_eq!(result, 5);
}
}
For dead letter testing, enable the test-utils feature:
#![allow(unused)]
fn main() {
use rsactor::{dead_letter_count, reset_dead_letter_count};
#[tokio::test]
async fn test_dead_letters() {
reset_dead_letter_count();
let (actor_ref, handle) = spawn::<MyActor>(args);
actor_ref.stop().await.unwrap();
handle.await.unwrap();
let _ = actor_ref.tell(MyMessage).await;
assert_eq!(dead_letter_count(), 1);
}
}
How do I implement actor supervision?
While rsActor doesn’t have built-in supervision, you can implement supervision patterns:
#![allow(unused)]
fn main() {
// Clone the arguments so they are still available for a restart (assumes Args: Clone)
let (child_ref, child_handle) = spawn::<ChildActor>(args.clone());
tokio::spawn(async move {
match child_handle.await {
Ok(ActorResult::Completed { .. }) => { /* normal */ }
Ok(ActorResult::Failed { error, .. }) => {
// Restart the child
let (_new_ref, _) = spawn::<ChildActor>(args);
}
Err(_) => { /* panicked */ }
}
});
}
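The restart idea can be tried out with standard-library threads in place of actors. In this sketch `supervise` is a hypothetical helper: it reruns a worker closure after a failure or panic until a restart budget is used up, which is one possible policy rather than anything rsActor prescribes.

```rust
use std::thread;

/// Run `worker` on its own thread, restarting it after a failure or panic,
/// up to `max_restarts` times. Returns the number of restarts that were used,
/// or an error once the budget is exhausted.
fn supervise<F>(worker: F, max_restarts: u32) -> Result<u32, String>
where
    F: Fn(u32) -> Result<(), String> + Send + Clone + 'static,
{
    let mut restarts = 0;
    loop {
        let attempt = restarts;
        let w = worker.clone();
        // join() yields Err if the thread panicked.
        let outcome = thread::spawn(move || w(attempt)).join();
        match outcome {
            Ok(Ok(())) => return Ok(restarts),
            Ok(Err(_)) | Err(_) if restarts < max_restarts => restarts += 1,
            Ok(Err(e)) => return Err(format!("gave up: {e}")),
            Err(_) => return Err("gave up: worker panicked".to_string()),
        }
    }
}
```

A restart budget keeps a child that fails on every start from being respawned forever.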
How do I implement periodic tasks?
Use the on_run method with Tokio’s time utilities:
#![allow(unused)]
fn main() {
struct PeriodicActor {
interval: tokio::time::Interval,
}
impl Actor for PeriodicActor {
type Args = Duration;
type Error = anyhow::Error;
async fn on_start(period: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
let mut interval = tokio::time::interval(period);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
Ok(Self { interval })
}
async fn on_run(&mut self, _: &ActorWeak<Self>) -> Result<bool, Self::Error> {
self.interval.tick().await;
self.perform_periodic_work();
Ok(false)
}
}
}
How do I use handler traits for heterogeneous collections?
Handler traits enable type-erased message sending to different actor types:
#![allow(unused)]
fn main() {
use rsactor::{TellHandler, AskHandler};
// Store different actors that handle the same message type
let handlers: Vec<Box<dyn TellHandler<Ping>>> = vec![
(&counter_ref).into(),
(&logger_ref).into(),
];
// Send to all
for handler in &handlers {
handler.tell(Ping).await?;
}
}
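The underlying mechanism is a trait object keyed by message type. The following std-only sketch uses a hypothetical `Tell<M>` trait, not rsActor's `TellHandler`, to show two unrelated types receiving the same message through one collection.

```rust
struct Ping;

/// Hypothetical object-safe trait keyed by message type.
trait Tell<M> {
    fn tell(&mut self, msg: M);
}

struct Counter {
    pings: u32,
}

struct Logger {
    lines: Vec<String>,
}

impl Tell<Ping> for Counter {
    fn tell(&mut self, _msg: Ping) {
        self.pings += 1;
    }
}

impl Tell<Ping> for Logger {
    fn tell(&mut self, _msg: Ping) {
        self.lines.push("ping".to_string());
    }
}

/// Send one `Ping` to every handler, whatever its concrete type.
fn broadcast(handlers: &mut [&mut dyn Tell<Ping>]) {
    for handler in handlers.iter_mut() {
        handler.tell(Ping);
    }
}
```

The concrete type is erased at the collection boundary, but the message type stays in the trait parameter, so only `Ping` can be sent through this slice.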
See Handler Traits for details.
How do I manage actors of different types?
Use ActorControl for type-erased lifecycle management:
#![allow(unused)]
fn main() {
use rsactor::ActorControl;
let controls: Vec<Box<dyn ActorControl>> = vec![
(&worker_ref).into(),
(&logger_ref).into(),
];
// Stop all actors
for control in &controls {
control.stop().await?;
}
}
See Actor Control for details.
Can I use generics with actors?
Yes, the #[message_handlers] macro supports generic actors:
#![allow(unused)]
fn main() {
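use std::fmt::Debug;
// Messages used by the handlers below
struct SetValue<T>(T);
struct GetValue;
#[derive(Debug)]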
struct GenericActor<T: Send + Debug + Clone + 'static> {
data: Option<T>,
}
impl<T: Send + Debug + Clone + 'static> Actor for GenericActor<T> {
type Args = Option<T>;
type Error = anyhow::Error;
async fn on_start(args: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
Ok(GenericActor { data: args })
}
}
#[message_handlers]
impl<T: Send + Debug + Clone + 'static> GenericActor<T> {
#[handler]
async fn handle_set_value(&mut self, msg: SetValue<T>, _: &ActorRef<Self>) {
self.data = Some(msg.0);
}
#[handler]
async fn handle_get_value(&mut self, _: GetValue, _: &ActorRef<Self>) -> Option<T> {
self.data.clone()
}
}
}
How does rsActor handle type safety?
rsActor provides comprehensive type safety:
- ActorRef<T>: Compile-time type safety, only allowing messages the actor can handle
- Handler traits: Type-erased at runtime while maintaining message type safety
- Automatic inference: Return types are inferred from handler signatures
#![allow(unused)]
fn main() {
// Compile-time safety
let count: u32 = actor_ref.ask(IncrementMsg(5)).await?; // Type-safe
}
For more detailed information, refer to the complete documentation and examples.