Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Introduction

Welcome to the rsActor User Guide!

rsActor is a lightweight, Tokio-based actor framework in Rust focused on providing a simple and efficient actor model for local, in-process systems. It emphasizes clean message-passing semantics and straightforward actor lifecycle management while maintaining high performance for Rust applications.

This guide will walk you through the core concepts, features, and usage patterns of rsActor to help you build robust and concurrent applications.

Note: This project is actively evolving. While core APIs are stable, some features may be refined in future releases.

Core Features

  • Minimalist Actor System: Focuses on core actor model primitives without unnecessary complexity.
  • Message Passing: Comprehensive communication patterns (ask, tell, timeout variants, blocking versions).
  • Clean Lifecycle Management: on_start, on_run, and on_stop hooks provide intuitive actor lifecycle control.
  • Graceful Termination: Actors can be stopped gracefully or killed immediately, with differentiated cleanup via the killed parameter.
  • Rich Result Types: ActorResult enum captures detailed completion states and error information.
  • Macro-Assisted Development: #[message_handlers] and #[derive(Actor)] reduce boilerplate significantly.
  • Type Safety: Compile-time type safety with ActorRef<T> prevents runtime type errors.
  • Handler Traits: TellHandler and AskHandler enable type-erased message sending for heterogeneous actor collections.
  • Actor Control: ActorControl trait provides type-erased lifecycle management.
  • Weak References: ActorWeak<T> prevents circular dependencies without memory leaks.
  • Dead Letter Tracking: Automatic logging of undelivered messages with structured tracing.
  • Optional Metrics: Per-actor performance metrics (message count, processing time, error count) via the metrics feature.
  • Optional Tracing Instrumentation: #[tracing::instrument] spans for observability via the tracing feature.
  • Minimal Constraints: Only Send trait required for actor structs, enabling flexible internal state management.

Getting Started

This section will guide you through setting up rsActor in your project and running a basic example.

1. Add Dependency

To use rsActor in your Rust project, add it as a dependency in your Cargo.toml file:

[dependencies]
rsactor = "0.14" # Check crates.io for the latest version
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"

Make sure to replace "0.14" with the latest version available on crates.io.

2. Basic Usage Example

Let’s create a simple counter actor. This actor will maintain a count and increment it when it receives an IncrementMsg.

use rsactor::{Actor, ActorRef, message_handlers, spawn};
use anyhow::Result;
use tracing::info;

// Define actor struct
#[derive(Debug)] // Added Debug for printing the actor in ActorResult
struct CounterActor {
    count: u32,
}

// Implement Actor trait
impl Actor for CounterActor {
    type Args = u32; // Define an args type for actor creation
    type Error = anyhow::Error;

    // on_start is required and must be implemented.
    // on_run and on_stop are optional and have default implementations.
    async fn on_start(initial_count: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        info!("CounterActor (id: {}) started. Initial count: {}", actor_ref.identity(), initial_count);
        Ok(CounterActor {
            count: initial_count,
        })
    }
}

// Define message types
struct Increment(u32);

// Use message_handlers macro with handler attributes
#[message_handlers]
impl CounterActor {
    #[handler]
    async fn handle_increment(&mut self, msg: Increment, _: &ActorRef<Self>) -> u32 {
        self.count += msg.0;
        self.count
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init(); // Initialize tracing

    info!("Creating CounterActor");

    let (actor_ref, join_handle) = spawn::<CounterActor>(0u32); // Pass initial count
    info!("CounterActor spawned with ID: {}", actor_ref.identity());

    let new_count: u32 = actor_ref.ask(Increment(5)).await?;
    info!("Incremented count: {}", new_count);

    actor_ref.stop().await?;
    info!("Stop signal sent to CounterActor (ID: {})", actor_ref.identity());

    let actor_result = join_handle.await?;
    info!(
        "CounterActor (ID: {}) task completed. Result: {:?}",
        actor_ref.identity(),
        actor_result
    );

    Ok(())
}

Key Points Demonstrated:

  1. Actor State Management: CounterActor encapsulates its state (count) and provides controlled access through messages.
  2. Type-Safe Communication: The Message<T> trait ensures compile-time verification of message handling.
  3. Lifecycle Control: spawn creates the actor, ask communicates with it, and stop terminates it gracefully.
  4. Error Handling: Proper use of Result types and ? operator for clean error propagation.

This example shows the basic actor pattern that forms the foundation for more complex actor systems.

Core Concepts

This section delves into the fundamental concepts of the rsActor framework. Understanding these concepts is key to effectively using the library.

What is an Actor?

What is an Actor?

In rsActor, an actor is an independent computational entity that encapsulates:

  1. State: Actors can have internal state that they manage exclusively. No other part of the system can directly access or modify an actor’s state.
  2. Behavior: Actors define how they react to messages they receive. This behavior is implemented through message handlers.
  3. Mailbox: Each actor has a mailbox where incoming messages are queued. Messages are processed one at a time, ensuring that an actor’s state is modified sequentially and avoiding data races.

Key Characteristics of Actors in rsActor:

  • Isolation: An actor’s internal state is protected from direct external access. Communication happens solely through messages.
  • Concurrency: Actors can run concurrently with each other. rsActor leverages Tokio to manage these concurrent tasks.
  • Asynchronous Operations: Actors primarily operate asynchronously, allowing them to perform non-blocking I/O and other long-running tasks efficiently.
  • Lifecycle: Actors have a defined lifecycle, including startup, execution, and shutdown phases, which developers can hook into.

The Actor Trait

To define an actor in rsActor, you implement the Actor trait for your struct:

#![allow(unused)]
fn main() {
use rsactor::{Actor, ActorRef, ActorWeak};
use anyhow::Result;

#[derive(Debug)]
struct MyActor {
    // ... internal state
}

impl Actor for MyActor {
    type Args = (); // Arguments needed to create the actor
    type Error = anyhow::Error; // Error type for lifecycle methods

    async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        // Initialize and return the actor instance
        println!("MyActor (id: {}) started with args: {:?}", actor_ref.identity(), args);
        Ok(MyActor { /* ... initial state ... */ })
    }

    async fn on_run(&mut self, actor_weak: &ActorWeak<Self>) -> Result<bool, Self::Error> {
        // Optional: Idle handler called when the message queue is empty.
        // Return Ok(true) to continue calling on_run, Ok(false) to stop idle processing.
        println!("MyActor (id: {}) idle processing.", actor_weak.identity());
        Ok(false) // Stop idle processing, only handle messages
    }

    async fn on_stop(&mut self, actor_weak: &ActorWeak<Self>, killed: bool) -> Result<(), Self::Error> {
        // Optional: Called when the actor is stopping.
        // `killed` indicates whether the actor was stopped gracefully (false) or killed (true).
        if killed {
            println!("MyActor (id: {}) was killed.", actor_weak.identity());
        } else {
            println!("MyActor (id: {}) is stopping gracefully.", actor_weak.identity());
        }
        Ok(())
    }
}
}
  • Args: An associated type defining the arguments required by the on_start method to initialize the actor.
  • Error: An associated type for the error type that lifecycle methods can return. Must implement Send + Debug.
  • on_start: A required asynchronous method called when the actor is first created and started. It receives the initialization arguments and an ActorRef to itself. It should return a Result containing the initialized actor instance or an error.
  • on_run: An optional idle handler called when the message queue is empty. It receives an ActorWeak (weak reference). Return Ok(true) to continue calling on_run, or Ok(false) to stop idle processing. Defaults to Ok(false).
  • on_stop: An optional cleanup method. It receives an ActorWeak and a killed boolean indicating whether termination was graceful (false) or forced via kill() (true).

By encapsulating state and behavior, actors provide a powerful model for building concurrent and resilient applications.

Actor References (ActorRef)

Actor References (ActorRef)

An ActorRef<A> is a handle or a reference to an actor of type A. It is the primary way to interact with an actor from outside. You cannot directly access an actor’s state or call its methods. Instead, you send messages to it via its ActorRef.

Key Features of ActorRef:

  • Message Sending: ActorRef provides methods like ask, tell, ask_with_timeout, and tell_with_timeout to send messages to the associated actor.
  • Type Safety: ActorRef<A> is generic over the actor type A. This ensures that you can only send messages that the actor A is defined to handle, providing compile-time safety.
  • Decoupling: It decouples the sender of a message from the actor itself. Senders don’t need to know the actor’s internal implementation details, only the messages it accepts.
  • Location Transparency (Conceptual): While rsActor is currently focused on in-process actors, the ActorRef concept is fundamental to actor systems and can be extended to support remote actors in the future. The reference itself abstracts away the actor’s actual location.
  • Lifecycle Management: ActorRef also provides methods to manage the actor’s lifecycle, such as stop() and kill().
  • Identity: Each ActorRef has a unique identity() that can be used for logging or tracking purposes.

Obtaining an ActorRef

An ActorRef is typically obtained when an actor is spawned:

use rsactor::{spawn, Actor, ActorRef, Message, impl_message_handler};
use anyhow::Result;

#[derive(Debug)]
struct MyActor;
impl Actor for MyActor {
    type Args = ();
    type Error = anyhow::Error;
    async fn on_start(_args: Self::Args, _actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        Ok(MyActor)
    }
}

// Dummy message for demonstration
struct PingMsg;
impl Message<PingMsg> for MyActor {
    type Reply = ();
    async fn handle(&mut self, _msg: PingMsg, _actor_ref: &ActorRef<Self>) -> Self::Reply {}
}
impl_message_handler!(MyActor, [PingMsg]);

#[tokio::main]
async fn main() {
    let (actor_ref, join_handle) = spawn::<MyActor>(());
    // actor_ref is an ActorRef<MyActor>
    // You can now use actor_ref to send messages to MyActor

    actor_ref.tell(PingMsg).await.unwrap();
    actor_ref.stop().await.unwrap();
    join_handle.await.unwrap();
}

Type Safety

rsActor provides compile-time type safety through ActorRef<A>. This ensures that only messages that the actor can handle are sent, and reply types are correctly typed at compile time.

The type-safe approach prevents runtime type errors and provides better IDE support with autocomplete and type checking. All actor communication should use ActorRef<A> for the best development experience and runtime safety.

ActorRef<A> is cloneable and can be safely shared across tasks.

Messages

Messages

Messages are the sole means of communication between actors in rsActor. They are plain Rust structs or enums that carry data from a sender to a recipient actor.

Defining Messages

A message can be any struct or enum. It’s a common practice to define messages specific to the interactions an actor supports.

#![allow(unused)]
fn main() {
// Example message structs
struct GetData { id: u32 }
struct UpdateData { id: u32, value: String }

// Example message enum
enum CounterCommand {
    Increment(u32),
    Decrement(u32),
    GetValue,
}
}

The Message<T> Trait

For an actor to handle a specific message type T, the actor must implement the Message<T> trait. This trait defines how the actor processes the message and what kind of reply (if any) it produces.

#![allow(unused)]
fn main() {
use rsactor::{Actor, ActorRef, Message, impl_message_handler};
use anyhow::Result;

#[derive(Debug)]
struct DataStore { content: String }

impl Actor for DataStore {
    type Args = String;
    type Error = anyhow::Error;
    async fn on_start(initial_content: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
        Ok(DataStore { content: initial_content })
    }
}

// Message to retrieve current content
struct GetContent;

impl Message<GetContent> for DataStore {
    type Reply = String;

    async fn handle(&mut self, _: GetContent, _: &ActorRef<Self>) -> Self::Reply {
        self.content.clone()
    }
}

// Message to update content
struct SetContent(String);

impl Message<SetContent> for DataStore {
    type Reply = (); // No direct reply needed

    async fn handle(&mut self, msg: SetContent, _: &ActorRef<Self>) -> Self::Reply {
        self.content = msg.0;
    }
}

impl_message_handler!(DataStore, [GetContent, SetContent]);
}

Key components of the Message<T> trait implementation:

  • type Reply: An associated type that specifies the type of the value the actor will send back to the sender if the message was sent using ask. If the message doesn’t warrant a direct reply (e.g., for tell operations), () can be used.
  • async fn handle(&mut self, msg: T, actor_ref: &ActorRef<Self>) -> Self::Reply: This asynchronous method contains the logic for processing the message msg of type T.
    • It takes a mutable reference to the actor’s state (&mut self), allowing it to modify the actor’s internal data.
    • It also receives a reference to the actor’s own ActorRef, which can be useful for various purposes, such as spawning child actors or sending messages to itself.
    • It must return a value of type Self::Reply.

Message Immutability

Once a message is sent, it should be considered immutable by the sender. The actor receiving the message effectively takes ownership of the data within the message (or a copy of it).

Message Design Principles:

  • Clarity: Messages should clearly represent the intended operation or event.
  • Granularity: Design messages that are neither too coarse (bundling unrelated operations) nor too fine-grained (leading to chatty communication).
  • Immutability: Prefer messages that are immutable or contain immutable data to avoid shared mutable state issues.
  • Serializability (for future/distributed systems): While rsActor is in-process, if you ever plan to distribute your actors, designing messages that can be serialized (e.g., using Serde) is a good practice.

By using well-defined messages and implementing the Message<T> trait, you create a clear and type-safe communication protocol for your actors.

Actor Lifecycle

Actor Lifecycle

Actors in rsActor go through a well-defined lifecycle. Understanding this lifecycle is crucial for managing actor state, resources, and behavior correctly.

The lifecycle consists of several stages, with specific methods (hooks) in the Actor trait that you can implement to customize behavior at each stage:

  1. Spawning: An actor’s lifecycle begins when it is spawned using rsactor::spawn() or rsactor::spawn_with_mailbox_capacity().

  2. Starting (on_start): Once spawned, the framework calls on_start.

    • Purpose: Initialize the actor’s state. It receives the arguments passed during spawn() and an ActorRef to itself.
    • Required: This method must be implemented.
    • Outcome: Return Ok(Self) with the initialized actor instance or Err if initialization fails. If on_start fails, the ActorResult will be Failed with FailurePhase::OnStart.
    #![allow(unused)]
    fn main() {
    async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        // Initialize state, acquire resources
        Ok(Self { /* ... */ })
    }
    }
  3. Running (on_run): After on_start succeeds, on_run is called as an idle handler when the message queue is empty.

    • Purpose: Background processing or periodic tasks. Messages always have higher priority.
    • Optional: Defaults to Ok(false) (called once, then disabled).
    • Return values:
      • Ok(true): Continue calling on_run
      • Ok(false): Stop calling on_run, only process messages
      • Err(e): Terminate the actor with an error
    • Note: Receives &ActorWeak<Self>, not &ActorRef<Self>.
    #![allow(unused)]
    fn main() {
    async fn on_run(&mut self, actor_weak: &ActorWeak<Self>) -> Result<bool, Self::Error> {
        // Perform idle processing when message queue is empty
        Ok(false)
    }
    }
  4. Message Processing: The actor concurrently handles incoming messages and executes on_run. Messages are always given priority via biased tokio::select!.

  5. Stopping (on_stop): The actor can be stopped in several ways:

    • Graceful Stop: actor_ref.stop().await? — processes remaining messages first.
    • Immediate Kill: actor_ref.kill() — stops immediately.
    • Error in on_run: on_stop is called for cleanup before termination.
    • All references dropped: When all ActorRef instances are dropped.

    The killed parameter distinguishes the termination mode:

    #![allow(unused)]
    fn main() {
    async fn on_stop(&mut self, actor_weak: &ActorWeak<Self>, killed: bool) -> Result<(), Self::Error> {
        if killed {
            println!("Forcefully terminated, performing minimal cleanup");
        } else {
            println!("Gracefully shutting down, performing full cleanup");
        }
        Ok(())
    }
    }
  6. Termination: After on_stop completes, the actor’s Tokio task finishes. The JoinHandle resolves with an ActorResult<T>:

    • ActorResult::Completed { actor, killed } — successful completion
    • ActorResult::Failed { actor, error, phase, killed } — failure with details

Graceful Stop vs. Kill

Featurestop()kill()
Drains mailboxYesNo
on_stop calledYesYes
killed parameterfalsetrue
BlockingAsync (await)Synchronous

Visualizing the Lifecycle

graph TD
    A[Spawn Actor] --> B{on_start};
    B -- Success --> D[Message Processing + on_run Loop];
    B -- Failure --> F[ActorResult::Failed OnStart];
    D -- stop signal --> E{on_stop killed=false};
    D -- kill signal --> E2{on_stop killed=true};
    D -- on_run Error --> E3{on_stop for cleanup};
    D -- All refs dropped --> E;
    E --> G[ActorResult::Completed];
    E2 --> G2[ActorResult::Completed killed=true];
    E3 --> F2[ActorResult::Failed OnRun];

Understanding these lifecycle hooks allows you to build actors that manage their resources and state reliably throughout their existence.

Creating Actors

rsActor provides flexible ways to define and spawn actors. The core of actor creation revolves around implementing the Actor trait and then using spawn functions to bring actors to life.

Actor Creation Methods

There are several approaches to creating actors in rsActor:

1. Manual Implementation

Implement the Actor trait manually for full control over initialization and lifecycle:

#![allow(unused)]
fn main() {
struct DatabaseActor {
    connection: DbConnection,
}

impl Actor for DatabaseActor {
    type Args = String; // connection string
    type Error = anyhow::Error;

    async fn on_start(conn_str: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
        let connection = DbConnection::connect(&conn_str).await?;
        Ok(Self { connection })
    }
}
}

2. Derive Macro

Use #[derive(Actor)] for simple actors that don’t need complex initialization:

#![allow(unused)]
fn main() {
#[derive(Actor)]
struct CacheActor {
    data: HashMap<String, String>,
}
}

Spawning Actors

Once you’ve defined an actor, spawn it using one of these functions:

#![allow(unused)]
fn main() {
// Basic spawning
let (actor_ref, join_handle) = spawn(my_actor_args);

// Spawning with custom mailbox capacity
let (actor_ref, join_handle) = spawn_with_mailbox_capacity(my_actor_args, 1000);
}

Actor Categories

Basic Actors

Simple actors that process messages and maintain state without complex background tasks.

Async Actors

Actors that perform continuous asynchronous work using the on_run lifecycle method, such as:

  • Periodic tasks with timers
  • Network I/O operations
  • Database queries
  • File system operations

Blocking Task Actors

Actors designed to handle CPU-intensive or blocking operations that might block the async runtime:

  • Heavy computations
  • Synchronous I/O
  • Legacy blocking APIs
  • CPU-bound algorithms

Key Concepts

Actor Arguments (Args)

The Args associated type defines what data is needed to initialize your actor:

  • Self - Pass the entire actor struct as initialization data
  • Custom types - Define specific initialization parameters
  • () - No initialization data needed

Error Handling

The Error associated type defines how initialization failures are handled:

  • anyhow::Error - General error handling
  • std::convert::Infallible - For actors that never fail to initialize
  • Custom error types - Domain-specific error handling

Lifecycle Integration

Different actor types leverage lifecycle methods differently:

  • on_start: Always required for initialization
  • on_run: Optional, used for background tasks and periodic work
  • on_stop: Optional, used for cleanup when the actor shuts down

This section covers these different actor patterns and when to use each approach.

Basic Actors

Basic Actors

A basic actor is the foundation of the rsActor system. It encapsulates state and processes messages sequentially, ensuring thread safety through the actor model.

Essential Components

Every basic actor requires:

  1. Actor Struct: Holds the actor’s state
  2. Actor Trait Implementation: Defines initialization and lifecycle
  3. Message Types: Define communication protocol
  4. Message Handlers: Process incoming messages

Complete Example

use rsactor::{Actor, ActorRef, Message, impl_message_handler, spawn};
use anyhow::Result;

// 1. Define the actor struct
#[derive(Debug)]
struct BankAccount {
    account_id: String,
    balance: u64,
}

// 2. Implement the Actor trait
impl Actor for BankAccount {
    type Args = (String, u64); // (account_id, initial_balance)
    type Error = anyhow::Error;

    async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        let (account_id, initial_balance) = args;
        println!("BankAccount {} (ID: {}) opened with balance: ${}",
                 account_id, actor_ref.identity(), initial_balance);

        Ok(BankAccount { account_id, balance: initial_balance })
    }
}

// 3. Define message types
struct Deposit(u64);
struct Withdraw(u64);
struct GetBalance;

// 4. Implement message handlers
impl Message<Deposit> for BankAccount {
    type Reply = u64; // Returns new balance

    async fn handle(&mut self, msg: Deposit, _: &ActorRef<Self>) -> Self::Reply {
        self.balance += msg.0;
        println!("Account {}: Deposited ${}, new balance: ${}",
                 self.account_id, msg.0, self.balance);
        self.balance
    }
}

impl Message<Withdraw> for BankAccount {
    type Reply = Result<u64, String>; // Returns new balance or error

    async fn handle(&mut self, msg: Withdraw, _: &ActorRef<Self>) -> Self::Reply {
        if self.balance >= msg.0 {
            self.balance -= msg.0;
            println!("Account {}: Withdrew ${}, new balance: ${}",
                     self.account_id, msg.0, self.balance);
            Ok(self.balance)
        } else {
            Err(format!("Insufficient funds: requested ${}, available ${}",
                        msg.0, self.balance))
        }
    }
}

impl Message<GetBalance> for BankAccount {
    type Reply = u64;

    async fn handle(&mut self, _: GetBalance, _: &ActorRef<Self>) -> Self::Reply {
        self.balance
    }
}

// 5. Wire up message handlers
impl_message_handler!(BankAccount, [Deposit, Withdraw, GetBalance]);

// Usage example
#[tokio::main]
async fn main() -> Result<()> {
    let (account_ref, _) = spawn::<BankAccount>(
        ("Alice".to_string(), 1000)
    );

    // Perform operations
    let new_balance = account_ref.ask(Deposit(500)).await?;
    println!("After deposit: ${}", new_balance);

    match account_ref.ask(Withdraw(200)).await? {
        Ok(balance) => println!("After withdrawal: ${}", balance),
        Err(error) => println!("Withdrawal failed: {}", error),
    }

    let final_balance = account_ref.ask(GetBalance).await?;
    println!("Final balance: ${}", final_balance);

    Ok(())
}
## Key Design Patterns

**State Management**: Actors encapsulate state privately, ensuring thread safety without locks.

**Error Handling**: Use `Result` types in message replies to handle business logic errors gracefully.

**Message Design**: Keep messages focused on single operations for clarity and maintainability.

**Type Safety**: Leverage Rust's type system to ensure compile-time verification of message handling.

This pattern scales from simple stateful services to complex business logic processors while maintaining the benefits of the actor model.
}

This covers the creation of a standard actor. The actor runs in its own Tokio task, processes messages sequentially, and manages its state privately.

Async Actors

Async Actors

Actors in rsActor are inherently asynchronous, thanks to their integration with Tokio. However, the term “Async Actor” here refers to actors that, within their message handlers or lifecycle methods, perform operations that involve .awaiting other asynchronous tasks. This is a common pattern for I/O-bound work or when an actor needs to coordinate with other asynchronous services.

rsActor fully supports this: message handlers (handle method) and lifecycle hooks (on_start, on_run, on_stop) are already async fn.

Example: Actor Performing Async Work in handle

Consider an actor that, upon receiving a message, needs to perform an asynchronous HTTP request or a database query.

use rsactor::{Actor, ActorRef, Message, impl_message_handler, spawn};
use anyhow::Result;
use log::info;
use tokio::time::{sleep, Duration};

#[derive(Debug)]
struct AsyncWorkerActor {
    worker_id: u32,
}

impl Actor for AsyncWorkerActor {
    type Args = u32; // Worker ID
    type Error = anyhow::Error;

    async fn on_start(worker_id: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        info!("AsyncWorkerActor (id: {}, worker_id: {}) starting", actor_ref.identity(), worker_id);
        Ok(AsyncWorkerActor { worker_id })
    }
}

// Message to perform an async task
struct PerformAsyncTaskMsg {
    task_data: String,
    delay_ms: u64,
}

impl Message<PerformAsyncTaskMsg> for AsyncWorkerActor {
    type Reply = String; // Result of the async task

    async fn handle(&mut self, msg: PerformAsyncTaskMsg, actor_ref: &ActorRef<Self>) -> Self::Reply {
        info!(
            "Actor (id: {}, worker_id: {}) received async task: {}. Will delay for {}ms",
            actor_ref.identity(),
            self.worker_id,
            msg.task_data,
            msg.delay_ms
        );

        // Simulate an asynchronous operation (e.g., an HTTP call, DB query)
        sleep(Duration::from_millis(msg.delay_ms)).await;

        let result = format!(
            "Task '{}' completed by worker {} after {}ms",
            msg.task_data,
            self.worker_id,
            msg.delay_ms
        );
        info!("Actor (id: {}) finished async task: {}", actor_ref.identity(), result);
        result
    }
}

impl_message_handler!(AsyncWorkerActor, [PerformAsyncTaskMsg]);

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let (actor_ref, jh) = spawn::<AsyncWorkerActor>(101); // Spawn with worker_id 101

    let task1 = PerformAsyncTaskMsg { task_data: "Process Payment".to_string(), delay_ms: 100 };
    let task2 = PerformAsyncTaskMsg { task_data: "Fetch User Data".to_string(), delay_ms: 50 };

    // Send messages and await replies
    // Since message handling is sequential for an actor, task2 will only start after task1's handle completes.
    let result1 = actor_ref.ask(task1).await?;
    info!("Main: Result 1: {}", result1);

    let result2 = actor_ref.ask(task2).await?;
    info!("Main: Result 2: {}", result2);

    actor_ref.stop().await?;
    jh.await??;
    Ok(())
}

In this example:

  • The handle method for PerformAsyncTaskMsg is async.
  • It uses tokio::time::sleep(...).await to simulate a non-CPU-bound asynchronous operation.
  • While the actor is awaiting inside handle, its specific Tokio task yields control, allowing other Tokio tasks (including other actors or other asynchronous operations in the system) to run. However, this specific actor instance will not process further messages from its mailbox until the current handle method completes.

Spawning Additional Tokio Tasks from an Actor

Sometimes, an actor might need to initiate multiple asynchronous operations concurrently or offload work to separate Tokio tasks without blocking its own message processing loop for the entire duration of that work. This is a common pattern for achieving higher concurrency within the scope of a single actor’s responsibilities.

The actor_async_worker.rs example in the examples/ directory demonstrates this pattern:

  • A RequesterActor sends tasks to a WorkerActor.
  • The WorkerActor, in its handle method for ProcessTask, uses tokio::spawn to launch a new asynchronous task for the actual work.
  • This allows the WorkerActor’s handle method to return quickly, making the WorkerActor available to process new messages while the spawned Tokio tasks run in the background.
  • The spawned tasks, upon completion, send their results back to the RequesterActor (or another designated actor) using messages.

Key snippet from examples/actor_async_worker.rs (WorkerActor):

#![allow(unused)]
fn main() {
// In WorkerActor's impl Message<ProcessTask>
async fn handle(&mut self, msg: ProcessTask, _actor_ref: &ActorRef<Self>) -> Self::Reply {
    let task_id = msg.task_id;
    let data = msg.data;
    let requester = msg.requester; // ActorRef to send the result back

    println!("WorkerActor received task {}: {}", task_id, data);

    // Spawn a new Tokio task to do the processing asynchronously
    tokio::spawn(async move {
        // Simulate some processing time
        let processing_time = (task_id % 3 + 1) as u64;
        println!("Processing task {} will take {} seconds", task_id, processing_time);
        tokio::time::sleep(Duration::from_secs(processing_time)).await;

        let result = format!("Result of task {} ...", task_id, /* ... */);

        // Send the result back to the requester
        match requester.tell(WorkCompleted { task_id, result }).await {
            Ok(_) => println!("Worker sent back result for task {}", task_id),
            Err(e) => eprintln!("Failed to send result for task {}: {:?}", task_id, e),
        }
    });
    // The handle method returns quickly, allowing WorkerActor to process other messages.
}
}

This pattern is powerful for actors that manage or delegate multiple concurrent operations.

Considerations:

  • Sequential Message Processing: Even if an actor spawns other Tokio tasks, its own messages are still processed one by one from its mailbox. The handle method for a given message must complete before the next message is taken from the mailbox.
  • State Management: If spawned tasks need to interact with the actor’s state, they must do so by sending messages back to the actor. Direct mutable access to self is not possible from a separately spawned tokio::spawn task that outlives the handle method’s scope (unless using Arc<Mutex<...>> or similar, which moves away from the actor model’s state encapsulation benefits for that specific piece of state).
  • Error Handling: Errors in spawned tasks need to be handled appropriately, potentially by sending an error message back to the parent actor or another designated error-handling actor.
  • Graceful Shutdown: If an actor spawns long-running tasks, consider how these tasks should behave when the actor itself is stopped. They might need to be cancelled or allowed to complete. rsActor itself doesn’t automatically manage tasks spawned via tokio::spawn from within an actor; this is the developer’s responsibility.

Async actors are fundamental to building responsive, I/O-bound applications with rsActor.

Blocking Task Actors

Blocking Task Actors

While Tokio and rsActor are designed for asynchronous operations, there are scenarios where you need to integrate with blocking code, such as CPU-bound computations, synchronous I/O libraries, or FFI calls.

Running blocking code directly within an actor’s message handler (which runs on a Tokio worker thread) can stall the Tokio runtime, preventing other asynchronous tasks from making progress. To handle this, Tokio provides tokio::task::spawn_blocking.

rsActor facilitates interaction with tasks running in spawn_blocking by providing blocking message sending methods: blocking_ask and blocking_tell on ActorRef.

When to Use spawn_blocking with Actors

  1. CPU-Bound Work: For lengthy computations that would otherwise block an async task for too long.
  2. Synchronous Libraries: When interacting with libraries that use blocking I/O or do not have an async API.
  3. FFI Calls: If foreign function interface calls are blocking.

Pattern: Actor Managing a Blocking Task

A common pattern is an actor that either:

  • Offloads specific blocking operations to spawn_blocking within its message handlers.
  • Manages a dedicated, long-running blocking task that communicates with the actor.

The examples/actor_blocking_task.rs file demonstrates an actor that spawns a dedicated synchronous background task in its on_start method.

Key elements:

  1. Spawning the Blocking Task (in on_start):

    #![allow(unused)]
    fn main() {
    // Inside SyncDataProcessorActor::on_start
    let task_actor_ref = actor_ref.clone(); // For task -> actor communication
    
    let handle = task::spawn_blocking(move || {
        loop {
            thread::sleep(interval); // Blocking sleep
            let raw_value = rand::random::<f64>() * 100.0;
    
            // Send data back to the actor using blocking_tell
            if let Err(e) = task_actor_ref.blocking_tell(ProcessedData { /* ... */ }, None) {
                break; // Exit task on error
            }
        }
    });
    }
  2. Communication from Blocking Task to Actor (blocking_tell): The blocking task uses task_actor_ref.blocking_tell(...) to send messages back to the actor.

  3. Communication from Actor to Blocking Task: The actor can send commands to the blocking task using standard Tokio MPSC channels.

blocking_tell and blocking_ask

These methods on ActorRef are designed for use from any thread, including non-async contexts:

  • blocking_tell(message, timeout: Option<Duration>): Sends a message and blocks the current thread until enqueued. Fire-and-forget.
  • blocking_ask(message, timeout: Option<Duration>): Sends a message and blocks until the actor processes it and a reply is received.

Timeout Behavior

  • timeout: None: Uses Tokio’s blocking_send directly. Most efficient, but blocks indefinitely if the mailbox is full.
  • timeout: Some(duration): Spawns a separate thread with a temporary Tokio runtime. Has additional overhead (~50-200us for thread creation) but guarantees bounded waiting.

Thread Safety

These methods are safe to call from within an existing Tokio runtime context because the timeout implementation spawns a separate thread with its own runtime, avoiding the “cannot start a runtime from within a runtime” panic.

Deprecated Methods

The older ask_blocking and tell_blocking methods are deprecated since v0.10.0. Use blocking_ask and blocking_tell instead:

#![allow(unused)]
fn main() {
// Old (deprecated):
// actor_ref.ask_blocking(msg, timeout);
// actor_ref.tell_blocking(msg, timeout);

// New:
actor_ref.blocking_ask(msg, timeout);
actor_ref.blocking_tell(msg, timeout);
}

Considerations

  • Thread Pool: spawn_blocking uses a dedicated thread pool in Tokio. Be mindful of pool size if spawning many blocking tasks.
  • Communication: Use ActorRef with blocking_* methods for task-to-actor communication, and Tokio MPSC channels for actor-to-task communication.
  • Shutdown: Ensure graceful shutdown of blocking tasks when the managing actor stops.

By using spawn_blocking and the blocking_* methods, rsActor allows you to integrate synchronous, blocking code into your asynchronous actor system safely and efficiently.

Communicating with Actors

Once an actor is spawned and you have its ActorRef, you can communicate with it by sending messages. rsActor provides two primary modes of message passing: “tell” (fire-and-forget) and “ask” (request-response).

All message sending methods are asynchronous and return a Future that resolves when the operation completes (e.g., message enqueued for tell, or reply received for ask).

Tell (Fire and Forget)

Tell (Fire and Forget)

The “tell” pattern, accessed via actor_ref.tell(message), is used to send a message to an actor without expecting a direct reply. This is a form of asynchronous, one-way communication.

Characteristics of tell:

  • Asynchronous: actor_ref.tell(message) returns a Future that completes once the message is enqueued into the actor’s mailbox. It does not wait for the actor to process the message.
  • No Direct Reply: The sender does not receive a value back from the actor.
  • Reply Type: For messages intended for tell, their handler typically returns ().
  • Error Handling: Returns Result<(), rsactor::Error>. An error occurs if the actor is no longer alive.

Usage Example:

use rsactor::{Actor, ActorRef, message_handlers, spawn};
use anyhow::Result;
use tracing::info;

#[derive(Actor)]
struct LoggerActor;

// Message to log a string
struct LogMessage(String);

#[message_handlers]
impl LoggerActor {
    #[handler]
    async fn handle_log(&mut self, msg: LogMessage, actor_ref: &ActorRef<Self>) -> () {
        info!("LoggerActor (id: {}): {}", actor_ref.identity(), msg.0);
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();
    let (logger_ref, jh) = spawn::<LoggerActor>(LoggerActor);

    // Send a log message using tell
    logger_ref.tell(LogMessage("Application started successfully".to_string())).await?;
    logger_ref.tell(LogMessage("Processing item #123".to_string())).await?;

    logger_ref.stop().await?;
    jh.await?;
    Ok(())
}

tell_with_timeout

rsActor also provides actor_ref.tell_with_timeout(message, timeout).

  • Similar to tell, but allows specifying a Duration as a timeout for enqueuing the message.
  • Returns a Timeout error if the message cannot be enqueued within the given duration.
#![allow(unused)]
fn main() {
use std::time::Duration;

match logger_ref.tell_with_timeout(
    LogMessage("Critical event".to_string()),
    Duration::from_millis(100)
).await {
    Ok(_) => info!("Log message sent."),
    Err(e) => info!("Failed to send log message: {:?}", e),
}
}

blocking_tell

For sending messages from non-async contexts (e.g., spawn_blocking tasks):

#![allow(unused)]
fn main() {
// Without timeout (most efficient)
actor_ref.blocking_tell(LogMessage("from blocking".into()), None)?;

// With timeout
actor_ref.blocking_tell(LogMessage("from blocking".into()), Some(Duration::from_secs(1)))?;
}

When to Use tell:

  • Notifications/Events: When an actor needs to notify another without needing an immediate response.
  • Commands: Issuing commands where the sender doesn’t need to wait for completion.
  • Decoupling: When you want to minimize coupling between actors.
  • Avoiding Deadlocks: In complex actor interactions, tell can break circular dependency cycles that ask might create.

tell is a fundamental communication pattern for building reactive and event-driven systems with actors.

Ask (Request-Response)

Ask (Request-Response)

The “ask” pattern, accessed via actor_ref.ask(message), is used to send a message to an actor and asynchronously await a reply. This is a form of two-way communication, suitable for request-response interactions.

Characteristics of ask:

  • Asynchronous Request, Asynchronous Reply: actor_ref.ask(message) returns a Future that completes when the target actor has processed the message and produced a reply.
  • Direct Reply: The sender receives a typed value back from the actor, defined by the handler’s return type.
  • Error Handling: Returns Result<Reply, rsactor::Error>. Errors can occur if:
    • The actor is not alive (Error::Send)
    • The reply channel was dropped (Error::Receive)
    • A timeout occurs (Error::Timeout)

Usage Example:

use rsactor::{Actor, ActorRef, message_handlers, spawn};
use anyhow::Result;
use tracing::info;

#[derive(Actor)]
struct CalculatorActor {
    last_result: i32,
}

struct AddMsg(i32, i32);
struct GetLastResult;

#[message_handlers]
impl CalculatorActor {
    #[handler]
    async fn handle_add(&mut self, msg: AddMsg, _: &ActorRef<Self>) -> i32 {
        let sum = msg.0 + msg.1;
        self.last_result = sum;
        sum
    }

    #[handler]
    async fn handle_get_last(&mut self, _: GetLastResult, _: &ActorRef<Self>) -> i32 {
        self.last_result
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();
    let (calc_ref, jh) = spawn::<CalculatorActor>(CalculatorActor { last_result: 0 });

    // Send an AddMsg using ask and await the reply
    let sum: i32 = calc_ref.ask(AddMsg(10, 25)).await?;
    info!("Sum: {}", sum); // Sum: 35

    let last: i32 = calc_ref.ask(GetLastResult).await?;
    assert_eq!(sum, last);

    calc_ref.stop().await?;
    jh.await?;
    Ok(())
}

ask_with_timeout

Specify a timeout for the entire request-response cycle:

#![allow(unused)]
fn main() {
use std::time::Duration;

match calc_ref.ask_with_timeout(AddMsg(5, 7), Duration::from_secs(1)).await {
    Ok(sum) => info!("Sum: {}", sum),
    Err(e) => info!("Failed: {:?}", e),
}
}

This is crucial for building resilient systems where you cannot wait indefinitely.

ask_join

For handlers that return a JoinHandle<R> (spawning long-running tasks), ask_join automatically awaits the task completion:

#![allow(unused)]
fn main() {
// Handler returns JoinHandle<String>
let result: String = actor_ref.ask_join(HeavyTask { data: "input".into() }).await?;
}

This avoids manually awaiting the JoinHandle from a regular ask call.

blocking_ask

For sending messages from non-async contexts:

#![allow(unused)]
fn main() {
// Without timeout
let result = actor_ref.blocking_ask(GetLastResult, None)?;

// With timeout
let result = actor_ref.blocking_ask(GetLastResult, Some(Duration::from_secs(5)))?;
}

When to Use ask:

  • Requesting Data: When an actor needs to retrieve information from another actor.
  • Performing Operations with Results: When the caller needs the result to proceed.
  • Synchronizing Operations: To ensure one operation completes before another begins.

Potential Pitfalls:

  • Deadlocks: If Actor A asks Actor B, and Actor B concurrently asks Actor A, a deadlock occurs. Design interaction flows carefully, or use tell to break cycles. rsActor provides a runtime deadlock detection feature that catches these cycles before they happen — see Deadlock Detection for details.
  • Performance: Excessive ask where tell would suffice leads to performance bottlenecks. If a reply isn’t strictly needed, prefer tell.

ask is a powerful tool for interactions requiring a response, but should be used with awareness of its blocking nature and potential complexities.

Error Handling

Robust error handling is essential in any concurrent system. rsActor provides comprehensive mechanisms to deal with errors at every level.

Error Categories

Errors in rsActor fall into these categories:

  1. Actor Lifecycle Errors: Errors in on_start, on_run, or on_stop (your custom Actor::Error type)
  2. Communication Errors: Errors during message sending (rsactor::Error)
  3. Message Handling Errors: Errors within handler methods (your custom reply types)
  4. Panics: If an actor task panics (caught by JoinHandle)

rsactor::Error

The framework’s built-in error type for communication failures:

VariantDescriptionRetryable?
Error::SendActor’s mailbox is closed (actor stopped)No
Error::ReceiveReply channel was dropped before responseNo
Error::TimeoutOperation exceeded the specified timeoutYes
Error::DowncastReply type downcast failed (programming error)No
Error::RuntimeActor lifecycle runtime failureNo
Error::MailboxCapacityMailbox capacity configuration errorNo
Error::JoinSpawned task panicked or was cancelledNo

Error Utilities

#![allow(unused)]
fn main() {
match actor_ref.tell(msg).await {
    Ok(()) => { /* delivered */ }
    Err(e) => {
        // Check if retrying might help
        if e.is_retryable() {
            // Only Timeout errors are retryable
        }

        // Get actionable debugging suggestions
        for tip in e.debugging_tips() {
            println!("  - {}", tip);
        }
    }
}
}

Custom Actor Errors

Define your own error type for lifecycle methods. Any type implementing Send + Debug + 'static works:

#![allow(unused)]
fn main() {
#[derive(Debug, thiserror::Error)]
enum MyActorError {
    #[error("Database error: {0}")]
    DbError(#[from] sqlx::Error),

    #[error("Network timeout")]
    NetworkTimeout,
}

impl Actor for MyActor {
    type Error = MyActorError;
    // ...
}
}

on_tell_result Hook

For fire-and-forget (tell) messages, errors in the handler are silently dropped by default. The on_tell_result hook provides observability:

#![allow(unused)]
fn main() {
#[message_handlers]
impl MyActor {
    #[handler]
    async fn handle_command(&mut self, msg: Command, _: &ActorRef<Self>) {
        self.execute(msg)?;
    }
}

// In the Message<Command> trait implementation (auto-generated by #[handler]):
// on_tell_result is called with the handler's return value when sent via tell()
}

When implementing Message<T> manually, override on_tell_result:

#![allow(unused)]
fn main() {
impl Message<Command> for MyActor {
    type Reply = Result<(), CommandError>;

    async fn handle(&mut self, msg: Command, _: &ActorRef<Self>) -> Self::Reply {
        self.execute(msg)
    }

    fn on_tell_result(result: &Self::Reply, _actor_ref: &ActorRef<Self>) {
        if let Err(e) = result {
            tracing::error!("Command failed (fire-and-forget): {e:?}");
        }
    }
}
}

Use #[handler(no_log)] to suppress the default warning log for tell errors.

Dead Letters

When a message cannot be delivered, rsActor automatically records a dead letter with structured tracing. See Dead Letter Tracking for details.

ActorResult

The ActorResult<T: Actor> enum is returned when an actor’s lifecycle completes. This is typically what you get when you .await the JoinHandle returned by rsactor::spawn().

It provides detailed information about how the actor terminated.

#![allow(unused)]
fn main() {
/// Represents the phase during which an actor failure occurred.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailurePhase {
    /// Actor failed during the `on_start` lifecycle hook.
    OnStart,
    /// Actor failed during execution in the `on_run` lifecycle hook.
    OnRun,
    /// Actor failed during the `on_stop` lifecycle hook.
    OnStop,
}

/// Result type returned when an actor's lifecycle completes.
#[derive(Debug)]
pub enum ActorResult<T: Actor> {
    /// Actor completed successfully and can be recovered.
    Completed {
        /// The successfully completed actor instance
        actor: T,
        /// Whether the actor was killed (`true`) or stopped gracefully (`false`)
        killed: bool,
    },
    /// Actor failed during one of its lifecycle phases.
    Failed {
        /// The actor instance (if recoverable), or None if not recoverable.
        /// This will be `None` specifically when the failure occurred during `on_start`,
        /// as the actor wasn't fully initialized.
        actor: Option<T>,
        /// The error that caused the failure
        error: T::Error,
        /// The lifecycle phase during which the failure occurred
        phase: FailurePhase,
        /// Whether the actor was killed (`true`) or was attempting to stop gracefully (`false`)
        killed: bool,
    },
}
}

Variants

  1. ActorResult::Completed { actor: T, killed: bool }

    • Indicates that the actor finished its lifecycle without any errors defined by T::Error.
    • actor: The instance of the actor. You can retrieve it if you need to inspect its final state.
    • killed: A boolean indicating if the actor was terminated via a kill() signal (true) or if it stopped gracefully (e.g., via stop() or by naturally ending its run loop) (false).
  2. ActorResult::Failed { actor: Option<T>, error: T::Error, phase: FailurePhase, killed: bool }

    • Indicates that the actor encountered an error (T::Error) during its operation.
    • actor: An Option<T> containing the actor instance if it was recoverable. This will be None if the failure occurred during the OnStart phase, as the actor instance wouldn’t have been fully created.
    • error: The specific error of type T::Error (defined in your impl Actor for YourActor) that caused the failure.
    • phase: A FailurePhase enum (OnStart, OnRun, OnStop) indicating when the error occurred in the actor’s lifecycle.
    • killed: A boolean indicating if the actor was attempting to stop due to a kill() signal when the failure occurred.

Utility Methods

ActorResult provides several helpful methods:

  • is_completed(): Returns true if the actor completed successfully.
  • is_failed(): Returns true if the actor failed.
  • was_killed(): Returns true if the actor was killed, regardless of completion or failure.
  • stopped_normally(): Returns true if Completed and not killed.
  • is_startup_failed(): Returns true if failed during OnStart.
  • is_runtime_failed(): Returns true if failed during OnRun.
  • is_stop_failed(): Returns true if failed during OnStop.
  • actor(): Returns Option<&T> to the actor instance if available.
  • into_actor(): Consumes self and returns Option<T>.
  • error(): Returns Option<&T::Error> if failed.
  • into_error(): Consumes self and returns Option<T::Error>.
  • has_actor(): Returns true if the result contains an actor instance.
  • to_result(): Converts ActorResult<T> into std::result::Result<T, T::Error>.

Example Usage

use rsactor::{spawn, Actor, ActorRef, ActorWeak, ActorResult, FailurePhase, message_handlers};
use anyhow::{Result, anyhow};

#[derive(Debug)]
struct MyErrActor { should_fail_on_start: bool, should_fail_on_run: bool }

impl Actor for MyErrActor {
    type Args = (bool, bool); // (should_fail_on_start, should_fail_on_run)
    type Error = anyhow::Error;

    async fn on_start(args: Self::Args, ar: &ActorRef<Self>) -> Result<Self, Self::Error> {
        tracing::info!("MyErrActor (id: {}) on_start called", ar.identity());
        if args.0 {
            return Err(anyhow!("Deliberate failure in on_start"));
        }
        Ok(MyErrActor { should_fail_on_start: args.0, should_fail_on_run: args.1 })
    }

    async fn on_run(&mut self, ar: &ActorWeak<Self>) -> Result<bool, Self::Error> {
        tracing::info!("MyErrActor (id: {}) on_run called", ar.identity());
        if self.should_fail_on_run {
            return Err(anyhow!("Deliberate failure in on_run"));
        }
        Ok(true) // Return true to indicate run loop should stop
    }
}

// Message type
struct Ping;

#[message_handlers]
impl MyErrActor {
    #[handler]
    async fn handle_ping(&mut self, _msg: Ping, _: &ActorRef<Self>) {}
}


async fn check_actor_termination(fail_on_start: bool, fail_on_run: bool) {
    let (actor_ref, join_handle) = spawn::<MyErrActor>((fail_on_start, fail_on_run));
    let id = actor_ref.identity();

    match join_handle.await {
        Ok(actor_result) => match actor_result {
            ActorResult::Completed { actor, killed } => {
                tracing::info!(
                    "Actor (id: {}) COMPLETED. Killed: {}. Final state: {:?}",
                    id, killed, actor
                );
            }
            ActorResult::Failed { actor, error, phase, killed } => {
                tracing::info!(
                    "Actor (id: {}) FAILED. Phase: {:?}, Killed: {}. Error: {}. State: {:?}",
                    id, phase, killed, error, actor
                );
                if fail_on_start {
                    assert!(matches!(phase, FailurePhase::OnStart));
                    assert!(actor.is_none());
                }
                if fail_on_run && !fail_on_start {
                     assert!(matches!(phase, FailurePhase::OnRun));
                     assert!(actor.is_some());
                }
            }
        },
        Err(join_error) => {
            tracing::info!("Actor (id: {}) task JOIN ERROR: {:?}", id, join_error);
        }
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    tracing::info!("--- Checking normal completion ---");
    check_actor_termination(false, false).await;
    tracing::info!("--- Checking failure in on_start ---");
    check_actor_termination(true, false).await;
    tracing::info!("--- Checking failure in on_run ---");
    check_actor_termination(false, true).await;
}

This ActorResult provides a comprehensive way to understand the termination reason and final state of an actor, which is crucial for supervision strategies and debugging.

Timeouts

Timeouts are essential for building resilient actor systems. rsActor provides timeout functionality for both message sending (tell) and request-response patterns (ask). This prevents your application from hanging indefinitely when actors become unresponsive or slow.

Overview

rsActor provides two timeout-enabled communication methods:

  • ask_with_timeout: Sends a message and waits for a reply within a specified timeout
  • tell_with_timeout: Sends a fire-and-forget message with a timeout for the send operation

Both methods use tokio::time::Duration to specify timeout values and return a Result that will contain a Timeout error if the operation exceeds the specified duration.

ask_with_timeout

The ask_with_timeout method is similar to the regular ask method, but allows you to specify a timeout for the entire request-response cycle.

Syntax

#![allow(unused)]
fn main() {
pub async fn ask_with_timeout<M>(&self, msg: M, timeout: Duration) -> Result<T::Reply>
where
    T: Message<M>,
    M: Send + 'static,
    T::Reply: Send + 'static,
}

Usage

use rsactor::{spawn, Actor, ActorRef, Message, impl_message_handler};
use std::time::Duration;
use anyhow::Result;

#[derive(Debug)]
struct MathActor;

impl Actor for MathActor {
    type Args = ();
    type Error = anyhow::Error;

    async fn on_start(_args: Self::Args, _ar: &ActorRef<Self>) -> Result<Self, Self::Error> {
        Ok(MathActor)
    }
}

struct MultiplyMsg(i32, i32);

impl Message<MultiplyMsg> for MathActor {
    type Reply = i32;

    async fn handle(&mut self, msg: MultiplyMsg, _ar: &ActorRef<Self>) -> Self::Reply {
        msg.0 * msg.1
    }
}

impl_message_handler!(MathActor, [MultiplyMsg]);

#[tokio::main]
async fn main() -> Result<()> {
    let (math_ref, _join_handle) = spawn::<MathActor>(());

    // Ask with timeout - will succeed if the actor responds within 1 second
    match math_ref.ask_with_timeout(MultiplyMsg(6, 7), Duration::from_secs(1)).await {
        Ok(product) => println!("Product: {}", product),
        Err(e) => println!("Failed to get product: {:?}", e),
    }

    Ok(())
}

When ask_with_timeout Times Out

If the actor doesn’t respond within the specified timeout, you’ll receive a Timeout error:

#![allow(unused)]
fn main() {
// This will likely timeout if the actor takes longer than 10ms to respond
match math_ref.ask_with_timeout(MultiplyMsg(1, 2), Duration::from_millis(10)).await {
    Ok(result) => println!("Result: {}", result),
    Err(e) => {
        if e.to_string().contains("timed out") {
            println!("Request timed out!");
        } else {
            println!("Other error: {}", e);
        }
    }
}
}

tell_with_timeout

The tell_with_timeout method is similar to the regular tell method, but allows you to specify a timeout for the send operation itself.

Syntax

#![allow(unused)]
fn main() {
pub async fn tell_with_timeout<M>(&self, msg: M, timeout: Duration) -> Result<()>
where
    T: Message<M>,
    M: Send + 'static,
}

Usage

use rsactor::{spawn, Actor, ActorRef, Message, impl_message_handler};
use std::time::Duration;
use anyhow::Result;
use log::info;

#[derive(Debug)]
struct EventActor;

impl Actor for EventActor {
    type Args = ();
    type Error = anyhow::Error;

    async fn on_start(_args: Self::Args, _ar: &ActorRef<Self>) -> Result<Self, Self::Error> {
        Ok(EventActor)
    }
}

struct EventMessage(String);

impl Message<EventMessage> for EventActor {
    type Reply = ();

    async fn handle(&mut self, msg: EventMessage, _ar: &ActorRef<Self>) -> Self::Reply {
        println!("EVENT: {}", msg.0);
    }
}

impl_message_handler!(EventActor, [EventMessage]);

#[tokio::main]
async fn main() -> Result<()> {
    let (event_ref, _join_handle) = spawn::<EventActor>(());

    // Tell with timeout - will fail if the message can't be enqueued within 100ms
    match event_ref.tell_with_timeout(
        EventMessage("System started".to_string()),
        Duration::from_millis(100)
    ).await {
        Ok(_) => info!("Event message sent successfully"),
        Err(e) => info!("Failed to send event message: {:?}", e),
    }

    Ok(())
}

When tell_with_timeout Times Out

tell_with_timeout typically only times out if:

  • The actor’s mailbox is full and cannot accept new messages
  • The actor has terminated and can no longer receive messages
  • There are system-level issues preventing message delivery

In most cases, tell_with_timeout completes quickly since sending a message is usually a fast operation.

Practical Example: Slow Actor with Timeouts

Here’s a comprehensive example demonstrating timeout behavior with actors that simulate different response times:

use rsactor::{message_handlers, spawn, Actor, ActorRef};
use std::time::Duration;
use anyhow::Result;
use tracing::info;

// Define an actor that can process requests with varying response times
struct TimeoutDemoActor {
    name: String,
}

impl Actor for TimeoutDemoActor {
    type Args = String;
    type Error = anyhow::Error;

    async fn on_start(name: Self::Args, _ar: &ActorRef<Self>) -> Result<Self, Self::Error> {
        Ok(Self { name })
    }
}

struct FastQuery(String);
struct SlowQuery(String);

#[message_handlers]
impl TimeoutDemoActor {
    #[handler]
    async fn handle_fast_query(&mut self, msg: FastQuery, _: &ActorRef<Self>) -> String {
        // Fast response - completes immediately
        format!("{}: Fast response to: {}", self.name, msg.0)
    }

    #[handler]
    async fn handle_slow_query(&mut self, msg: SlowQuery, _: &ActorRef<Self>) -> String {
        // Slow response - takes 500ms to complete
        tokio::time::sleep(Duration::from_millis(500)).await;
        format!("{}: Slow response to: {}", self.name, msg.0)
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();

    let (actor_ref, _join_handle) = spawn::<TimeoutDemoActor>("Demo".to_string());

    // Fast query with sufficient timeout - should succeed
    info!("=== Fast query with long timeout ===");
    match actor_ref.ask_with_timeout(
        FastQuery("What is your name?".to_string()),
        Duration::from_millis(100)
    ).await {
        Ok(response) => info!("Success: {}", response),
        Err(e) => info!("Failed: {}", e),
    }

    // Slow query with insufficient timeout - should fail
    info!("=== Slow query with short timeout ===");
    match actor_ref.ask_with_timeout(
        SlowQuery("Complex calculation".to_string()),
        Duration::from_millis(100)  // Less than 500ms needed
    ).await {
        Ok(response) => info!("Success: {}", response),
        Err(e) => info!("Failed: {}", e),
    }

    // Slow query with sufficient timeout - should succeed
    info!("=== Slow query with sufficient timeout ===");
    match actor_ref.ask_with_timeout(
        SlowQuery("Another calculation".to_string()),
        Duration::from_millis(1000)  // More than 500ms needed
    ).await {
        Ok(response) => info!("Success: {}", response),
        Err(e) => info!("Failed: {}", e),
    }

    Ok(())
}

Best Practices

1. Choose Appropriate Timeout Values

  • Too short: May cause unnecessary failures due to normal processing delays
  • Too long: May not protect against truly unresponsive actors
  • Consider your use case: Interactive UI vs batch processing will have different timeout requirements
#![allow(unused)]
fn main() {
// For interactive applications
let timeout = Duration::from_millis(100);

// For backend processing
let timeout = Duration::from_secs(30);

// For critical real-time systems
let timeout = Duration::from_millis(10);
}

2. Handle Timeout Errors Gracefully

Always handle timeout errors appropriately for your application:

#![allow(unused)]
fn main() {
match actor_ref.ask_with_timeout(msg, timeout).await {
    Ok(response) => {
        // Process successful response
        handle_response(response);
    }
    Err(e) if e.to_string().contains("timed out") => {
        // Handle timeout specifically
        log::warn!("Actor response timed out, using fallback");
        use_fallback_value();
    }
    Err(e) => {
        // Handle other errors
        log::error!("Actor communication failed: {}", e);
        return Err(e);
    }
}
}

3. Consider Retries with Backoff

For important operations, consider implementing retry logic:

#![allow(unused)]
fn main() {
use tokio::time::{sleep, Duration};

async fn ask_with_retries<T, M>(
    actor_ref: &ActorRef<T>,
    msg: M,
    timeout: Duration,
    max_retries: u32
) -> Result<T::Reply>
where
    T: Actor + Message<M>,
    M: Send + Clone + 'static,
    T::Reply: Send + 'static,
{
    for attempt in 1..=max_retries {
        match actor_ref.ask_with_timeout(msg.clone(), timeout).await {
            Ok(response) => return Ok(response),
            Err(e) if attempt == max_retries => return Err(e),
            Err(e) => {
                log::warn!("Attempt {} failed: {}", attempt, e);
                sleep(Duration::from_millis(100 * attempt as u64)).await;
            }
        }
    }
    unreachable!()
}
}

4. Use Timeouts in Supervision Strategies

Timeouts are particularly useful in supervision scenarios where you need to detect and handle unresponsive child actors:

#![allow(unused)]
fn main() {
// Supervisor checking child actor health
match child_ref.ask_with_timeout(HealthCheck, Duration::from_secs(5)).await {
    Ok(_) => {
        // Child is responsive
    }
    Err(_) => {
        // Child is unresponsive, consider restarting
        log::warn!("Child actor unresponsive, restarting...");
        restart_child_actor().await?;
    }
}
}

Error Types

When a timeout occurs, you’ll receive an error that contains information about:

  • The actor’s identity
  • The timeout duration that was exceeded
  • The operation that timed out (“ask” or “tell”)

This information helps with debugging and monitoring actor system health.

Performance Considerations

  • Timeouts add minimal overhead to message passing
  • Very short timeouts (< 1ms) may be unreliable due to system scheduling
  • Consider your system’s typical response times when setting timeouts
  • Monitor timeout rates to detect system performance issues

Timeouts are a crucial tool for building resilient actor systems that can handle failures gracefully and maintain responsive behavior even when some actors become slow or unresponsive.

Macros

rsActor provides several macros to reduce boilerplate code and make actor development more ergonomic. These macros handle the repetitive aspects of actor implementation while maintaining type safety and performance.

Available Macros

1. #[derive(Actor)] - Actor Derive Macro

Automatically implements the Actor trait for simple structs that don’t require complex initialization logic.

2. impl_message_handler! - Message Handler Macro

Generates the necessary boilerplate to wire up message handlers for an actor, supporting both regular and generic actors.

When to Use Macros

Use #[derive(Actor)] when:

  • Your actor doesn’t need complex initialization logic
  • The actor can be created directly from its field values
  • You want to minimize boilerplate for simple actors
  • Error handling can use std::convert::Infallible (never fails)

Use impl_message_handler! when:

  • You have multiple message types to handle
  • You want to avoid manual implementation of the MessageHandler trait
  • You’re working with generic actors
  • You need consistent message routing logic

Basic Usage Example

use rsactor::{Actor, ActorRef, Message, impl_message_handler, spawn};

// Simple actor using derive macro - no custom initialization needed
#[derive(Actor, Debug)]
struct UserAccount {
    username: String,
    balance: u64,
}

// Define messages
struct Deposit(u64);
struct GetBalance;

// Implement message handlers
impl Message<Deposit> for UserAccount {
    type Reply = u64; // Returns new balance
    async fn handle(&mut self, msg: Deposit, _: &ActorRef<Self>) -> Self::Reply {
        self.balance += msg.0;
        self.balance
    }
}

impl Message<GetBalance> for UserAccount {
    type Reply = u64;
    async fn handle(&mut self, _: GetBalance, _: &ActorRef<Self>) -> Self::Reply {
        self.balance
    }
}

// Wire up message handlers
impl_message_handler!(UserAccount, [Deposit, GetBalance]);

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create actor instance directly - no complex initialization
    let account = UserAccount {
        username: "alice".to_string(),
        balance: 100
    };
    let (actor_ref, _) = spawn(account);

    let new_balance = actor_ref.ask(Deposit(50)).await?;
    println!("New balance: {}", new_balance); // 150

    Ok(())
}

Key Benefits

  • Reduced Boilerplate: Automatically generate repetitive code
  • Type Safety: Compile-time verification of actor implementations
  • Zero Runtime Overhead: All code generation happens at compile time
  • Easy Maintenance: Simple to add new message types and actors

Derive Macro Features

The #[derive(Actor)] macro is perfect for simple actors:

#![allow(unused)]
fn main() {
#[derive(Actor)]
struct SimpleActor {
    name: String,
    value: i32,
}
// Automatically implements Actor trait with:
// - Args = Self (takes the struct instance)
// - Error = std::convert::Infallible (never fails)
// - on_start simply returns the provided instance
}

For detailed information about each macro, see the individual sections:

Message Handlers Macro

The #[message_handlers] attribute macro combined with #[handler] method attributes is the recommended approach for defining message handlers in rsActor. It automatically generates the necessary Message trait implementations from annotated methods.

Purpose

When an actor needs to handle multiple message types, manually implementing the Message trait for each one is verbose. The #[message_handlers] macro automates this by generating all the boilerplate from simple method signatures.

Basic Example

#![allow(unused)]
fn main() {
use rsactor::{Actor, ActorRef, message_handlers, spawn};

#[derive(Actor)]
struct CounterActor {
    value: i32,
}

// Define message types
struct Add(i32);
struct GetValue;

// Use message_handlers macro with handler attributes
#[message_handlers]
impl CounterActor {
    #[handler]
    async fn handle_add(&mut self, msg: Add, _: &ActorRef<Self>) -> i32 {
        self.value += msg.0;
        self.value
    }

    #[handler]
    async fn handle_get(&mut self, _: GetValue, _: &ActorRef<Self>) -> i32 {
        self.value
    }

    // Regular methods can coexist without the #[handler] attribute
    fn reset(&mut self) {
        self.value = 0;
    }
}
}

How It Works

Each #[handler] method is transformed into a Message<T> trait implementation:

  1. The first parameter after &mut self determines the message type T
  2. The return type becomes Message<T>::Reply
  3. The method body becomes the handle() implementation

The macro generates the equivalent of:

#![allow(unused)]
fn main() {
impl Message<Add> for CounterActor {
    type Reply = i32;
    async fn handle(&mut self, msg: Add, actor_ref: &ActorRef<Self>) -> Self::Reply {
        self.value += msg.0;
        self.value
    }
}
}

Generic Actor Support

The macro fully supports generic actors:

#![allow(unused)]
fn main() {
use rsactor::{Actor, ActorRef, message_handlers, spawn};
use std::fmt::Debug;

#[derive(Debug)]
struct GenericActor<T: Send + Debug + Clone + 'static> {
    value: Option<T>,
}

impl<T: Send + Debug + Clone + 'static> Actor for GenericActor<T> {
    type Error = String;
    type Args = ();

    async fn on_start(_: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
        Ok(GenericActor { value: None })
    }
}

struct SetValue<T: Send + Debug + 'static>(T);
struct GetValue;

#[message_handlers]
impl<T: Send + Debug + Clone + 'static> GenericActor<T> {
    #[handler]
    async fn handle_set(&mut self, msg: SetValue<T>, _: &ActorRef<Self>) -> () {
        self.value = Some(msg.0);
    }

    #[handler]
    async fn handle_get(&mut self, _: GetValue, _: &ActorRef<Self>) -> Option<T> {
        self.value.clone()
    }
}
}

Error Handling with on_tell_result

When a handler returns Result<T, E>, the macro automatically generates an on_tell_result implementation that logs errors for tell() calls:

#![allow(unused)]
fn main() {
#[message_handlers]
impl MyActor {
    // Returning Result<T, E> auto-generates error logging for tell() calls
    #[handler]
    async fn handle_process(&mut self, msg: ProcessData, _: &ActorRef<Self>) -> Result<(), MyError> {
        self.process(msg.data)?;
        Ok(())
    }

    // Use #[handler(no_log)] to suppress automatic error logging
    #[handler(no_log)]
    async fn handle_silent(&mut self, msg: SilentOp, _: &ActorRef<Self>) -> Result<(), MyError> {
        // Errors won't be automatically logged for tell() calls
        Ok(())
    }
}
}

#[derive(Actor)] Macro

For simple actors that don’t need custom initialization logic, use #[derive(Actor)]:

#![allow(unused)]
fn main() {
#[derive(Actor)]
struct SimpleActor {
    name: String,
    count: u32,
}

// spawn takes the struct instance directly as Args
let actor = SimpleActor { name: "test".into(), count: 0 };
let (actor_ref, handle) = spawn::<SimpleActor>(actor);
}

This generates an Actor impl where:

  • type Args = Self (the struct itself)
  • type Error = std::convert::Infallible (never fails)
  • on_start simply returns the provided instance

Benefits

  1. Selective Processing: Only methods with #[handler] become message handlers
  2. Clean Separation: Regular methods coexist with handlers in the same impl block
  3. Automatic on_tell_result: Error logging for Result return types (suppressed via #[handler(no_log)])
  4. Generic Support: Full support for generic actors and message types
  5. Compile-Time Safety: Message handler signatures are verified at compile time
  6. Reduced Boilerplate: No manual Message trait implementations needed

Running the Example

cargo run --example derive_macro_demo
cargo run --example unified_macro_demo

Advanced Features

This section covers advanced features of rsActor that enable more sophisticated actor system designs:

Handler Traits

Handler traits enable type-erased message sending, allowing different actor types that handle the same message to be stored in a unified collection. This is essential for building systems where you need to broadcast messages to heterogeneous actors.

Overview

rsActor provides four handler traits:

TraitReferenceMessage Pattern
TellHandler<M>StrongFire-and-forget
AskHandler<M, R>StrongRequest-response
WeakTellHandler<M>WeakFire-and-forget
WeakAskHandler<M, R>WeakRequest-response

Strong handlers keep actors alive. Weak handlers do not prevent actors from being dropped.

TellHandler — Fire-and-Forget

Store different actor types that handle the same message in one collection:

#![allow(unused)]
fn main() {
use rsactor::{TellHandler, ActorRef, spawn, message_handlers, Actor};

// Two different actors that both handle PingMsg
#[derive(Actor)]
struct ActorA;
#[derive(Actor)]
struct ActorB;

struct PingMsg;

#[message_handlers]
impl ActorA {
    #[handler]
    async fn handle_ping(&mut self, _: PingMsg, _: &ActorRef<Self>) -> () {}
}

#[message_handlers]
impl ActorB {
    #[handler]
    async fn handle_ping(&mut self, _: PingMsg, _: &ActorRef<Self>) -> () {}
}

// Store both in a single collection
let (ref_a, _) = spawn::<ActorA>(ActorA);
let (ref_b, _) = spawn::<ActorB>(ActorB);

let handlers: Vec<Box<dyn TellHandler<PingMsg>>> = vec![
    (&ref_a).into(),  // From<&ActorRef<T>> — clones the reference
    ref_b.into(),     // From<ActorRef<T>> — moves ownership
];

// Broadcast to all handlers
for handler in &handlers {
    handler.tell(PingMsg).await?;
}
}

AskHandler — Request-Response

For actors that return the same reply type for a message:

#![allow(unused)]
fn main() {
use rsactor::AskHandler;

struct GetStatus;

// Both actors return String for GetStatus
let handlers: Vec<Box<dyn AskHandler<GetStatus, String>>> = vec![
    (&actor_a).into(),
    (&actor_b).into(),
];

for handler in &handlers {
    let status = handler.ask(GetStatus).await?;
    println!("Status: {}", status);
}
}

Weak Handlers

Weak handlers do not keep actors alive. You must upgrade() before sending messages:

#![allow(unused)]
fn main() {
use rsactor::{WeakTellHandler, ActorRef};

let weak_handlers: Vec<Box<dyn WeakTellHandler<PingMsg>>> = vec![
    ActorRef::downgrade(&actor_a).into(),
    ActorRef::downgrade(&actor_b).into(),
];

for handler in &weak_handlers {
    if let Some(strong) = handler.upgrade() {
        strong.tell(PingMsg).await?;
    }
    // else: actor was dropped, skip it
}
}

Available Methods

All handler traits provide these methods:

MethodTellHandlerAskHandler
tell(msg) / ask(msg)Async sendAsync send + reply
tell_with_timeout / ask_with_timeoutWith timeoutWith timeout
blocking_tell / blocking_askBlocking sendBlocking send + reply
clone_boxed()Clone handlerClone handler
downgrade()To weak handlerTo weak handler
as_control()Access lifecycleAccess lifecycle

Lifecycle Access via as_control()

Handler traits provide access to ActorControl for lifecycle management:

#![allow(unused)]
fn main() {
for handler in &handlers {
    let control = handler.as_control();
    println!("Actor {} alive: {}", control.identity(), control.is_alive());
}
}

Running the Example

cargo run --example handler_demo

Actor Control

The ActorControl trait provides type-erased lifecycle management for actors. It allows you to manage different actor types through a unified interface without knowing their message types.

Use Case

When you need to manage a collection of actors of different types (e.g., stopping all actors during shutdown), ActorControl lets you store them in a single Vec and control their lifecycle uniformly.

ActorControl — Strong Reference

#![allow(unused)]
fn main() {
use rsactor::{ActorControl, ActorRef, spawn, Actor};

// Store different actor types in a single collection
let controls: Vec<Box<dyn ActorControl>> = vec![
    (&worker_ref).into(),   // ActorRef<WorkerActor>
    (&logger_ref).into(),   // ActorRef<LoggerActor>
    (&cache_ref).into(),    // ActorRef<CacheActor>
];

// Check status of all actors
for control in &controls {
    println!("Actor {} alive: {}", control.identity(), control.is_alive());
}

// Stop all actors gracefully
for control in &controls {
    control.stop().await?;
}
}

Available Methods

MethodDescription
identity()Returns the actor’s unique Identity
is_alive()Checks if the actor is still running
stop()Gracefully stops the actor (async)
kill()Immediately terminates the actor
downgrade()Creates a Box<dyn WeakActorControl>
clone_boxed()Clones the control reference

WeakActorControl — Weak Reference

WeakActorControl does not keep actors alive and requires upgrading before performing operations:

#![allow(unused)]
fn main() {
use rsactor::{WeakActorControl, ActorRef};

let weak_controls: Vec<Box<dyn WeakActorControl>> = vec![
    ActorRef::downgrade(&worker_ref).into(),
    ActorRef::downgrade(&logger_ref).into(),
];

for control in &weak_controls {
    println!("Actor {} might be alive: {}", control.identity(), control.is_alive());

    if let Some(strong) = control.upgrade() {
        strong.stop().await?;
    }
}
}

Conversion

ActorControl is automatically implemented for all ActorRef<T> types:

#![allow(unused)]
fn main() {
// From ownership transfer
let control: Box<dyn ActorControl> = actor_ref.into();

// From reference (clones)
let control: Box<dyn ActorControl> = (&actor_ref).into();

// Similarly for weak references
let weak: Box<dyn WeakActorControl> = ActorRef::downgrade(&actor_ref).into();
}

Combining with Handler Traits

Handler traits (TellHandler, AskHandler) also provide access to ActorControl via as_control():

#![allow(unused)]
fn main() {
let handler: Box<dyn TellHandler<PingMsg>> = (&actor_ref).into();

// Access lifecycle through handler
let control = handler.as_control();
if control.is_alive() {
    handler.tell(PingMsg).await?;
}
}

Weak References

ActorWeak<T> is a weak reference to an actor that does not prevent the actor from being dropped. It is used to avoid circular references and memory leaks in complex actor graphs.

Creating Weak References

Weak references are created by calling ActorRef::downgrade():

#![allow(unused)]
fn main() {
use rsactor::{ActorRef, ActorWeak};

let weak_ref: ActorWeak<MyActor> = ActorRef::downgrade(&actor_ref);
}

Upgrading to Strong References

A weak reference can be upgraded back to a strong ActorRef if the actor is still alive:

#![allow(unused)]
fn main() {
if let Some(strong_ref) = weak_ref.upgrade() {
    // Actor is still alive, send a message
    strong_ref.tell(MyMessage).await?;
} else {
    // Actor has been dropped
    println!("Actor is no longer alive");
}
}

Checking Liveness

#![allow(unused)]
fn main() {
// Heuristic check — may return true even if upgrade would fail (race condition)
if weak_ref.is_alive() {
    // Actor might be alive, but always use upgrade() for certainty
}

// The identity is always available, even after the actor is dropped
println!("Actor ID: {}", weak_ref.identity());
}

Use in Lifecycle Methods

on_run and on_stop receive ActorWeak instead of ActorRef. This prevents the actor from holding a strong reference to itself, which would prevent graceful shutdown:

#![allow(unused)]
fn main() {
impl Actor for MyActor {
    // ...

    async fn on_run(&mut self, actor_weak: &ActorWeak<Self>) -> Result<bool, Self::Error> {
        // Use weak reference for identity/logging
        println!("Actor {} processing", actor_weak.identity());

        // Upgrade if you need to send messages to self
        if let Some(strong) = actor_weak.upgrade() {
            // Use strong_ref for operations that need ActorRef
        }
        Ok(false)
    }

    async fn on_stop(&mut self, actor_weak: &ActorWeak<Self>, killed: bool) -> Result<(), Self::Error> {
        println!("Actor {} stopping (killed: {})", actor_weak.identity(), killed);
        Ok(())
    }
}
}

Common Patterns

Avoiding Circular References

When actors reference each other, use weak references to break cycles:

#![allow(unused)]
fn main() {
struct ParentActor {
    children: Vec<ActorRef<ChildActor>>,  // Strong refs — parent owns children
}

struct ChildActor {
    parent: ActorWeak<ParentActor>,  // Weak ref — child doesn't keep parent alive
}
}

Observer Pattern

Use weak references to observers so they don’t prevent the subject from being dropped:

#![allow(unused)]
fn main() {
struct EventBus {
    listeners: Vec<ActorWeak<ListenerActor>>,
}

impl EventBus {
    async fn notify_all(&mut self, event: Event) {
        // Retain only alive listeners
        self.listeners.retain(|weak| weak.is_alive());

        for weak in &self.listeners {
            if let Some(strong) = weak.upgrade() {
                let _ = strong.tell(event.clone()).await;
            }
        }
    }
}
}

Running the Example

cargo run --example weak_reference_demo

Metrics

rsActor provides optional per-actor performance metrics. Enable with the metrics feature flag:

[dependencies]
rsactor = { version = "0.14", features = ["metrics"] }

Tracked Metrics

MetricTypeDescription
message_countu64Total messages processed
avg_processing_timeDurationAverage message processing time
max_processing_timeDurationMaximum processing time observed
error_countu64Total errors during message handling
uptimeDurationTime since actor was started
last_activityOption<SystemTime>Timestamp of last message processing

Usage

Snapshot

Get all metrics at once:

#![allow(unused)]
fn main() {
let metrics = actor_ref.metrics();
println!("Messages: {}", metrics.message_count);
println!("Avg time: {:?}", metrics.avg_processing_time);
println!("Max time: {:?}", metrics.max_processing_time);
println!("Errors: {}", metrics.error_count);
println!("Uptime: {:?}", metrics.uptime);
println!("Last activity: {:?}", metrics.last_activity);
}

Individual Accessors

#![allow(unused)]
fn main() {
let count = actor_ref.message_count();
let avg = actor_ref.avg_processing_time();
let max = actor_ref.max_processing_time();
let errors = actor_ref.error_count();
let uptime = actor_ref.uptime();
let last = actor_ref.last_activity();
}

Design

  • Lock-free: Uses AtomicU64 counters for zero-contention updates
  • Zero overhead when disabled: When the metrics feature is not enabled, no metrics code is compiled
  • Automatic collection: Metrics are collected transparently during message processing
  • Post-mortem analysis: Metrics survive actor drop via ActorWeak (strong reference to metrics collector)

Philosophy

rsActor exposes raw metrics data. How you monitor, alert, or visualize this data is your responsibility. You can integrate with any monitoring system (Prometheus, Grafana, custom dashboards, etc.).

Running the Example

cargo run --example metrics_demo --features metrics

Dead Letter Tracking

Dead letters are messages that could not be delivered to their intended recipients. rsActor automatically tracks and logs all dead letters via structured tracing.

When Dead Letters Occur

ReasonDescriptionExample
ActorStoppedActor’s mailbox channel closedSending to a stopped actor
TimeoutSend or ask operation exceeded timeouttell_with_timeout expired
ReplyDroppedReply channel dropped before responseActor crashed during ask processing

Observability

Dead letters are always logged as structured tracing::warn! events:

WARN dead_letter: Dead letter: message could not be delivered
  actor.id=42
  actor.type_name="MyActor"
  message.type_name="PingMessage"
  dead_letter.reason="actor stopped"
  dead_letter.operation="tell"

To see these logs, initialize a tracing subscriber:

#![allow(unused)]
fn main() {
tracing_subscriber::fmt()
    .with_env_filter("rsactor=warn")
    .init();
}

Performance

Dead letter recording has minimal overhead:

ScenarioOverhead
Successful message delivery (hot path)Zero — no code executes
Dead letter, no tracing subscriber~5-50 ns
Dead letter, subscriber active~1-10 us

The record function is marked #[cold] to optimize the hot path.

Testing with test-utils

Enable the test-utils feature to count dead letters in tests:

[dev-dependencies]
rsactor = { version = "0.14", features = ["test-utils"] }
#![allow(unused)]
fn main() {
use rsactor::{dead_letter_count, reset_dead_letter_count};

#[tokio::test]
async fn test_dead_letters() {
    reset_dead_letter_count();

    let (actor_ref, handle) = spawn::<MyActor>(args);
    actor_ref.stop().await.unwrap();
    handle.await.unwrap();

    // Sending to a stopped actor creates a dead letter
    let _ = actor_ref.tell(MyMessage).await;
    assert_eq!(dead_letter_count(), 1);
}
}

Warning: Never enable test-utils in production builds. It exposes internal metrics that could be misused.

Error Integration

Dead letter tracking works seamlessly with rsActor’s error types:

#![allow(unused)]
fn main() {
match actor_ref.tell(msg).await {
    Ok(_) => { /* delivered successfully */ }
    Err(rsactor::Error::Send { .. }) => {
        // Dead letter was automatically logged
        // Error contains actor identity and details
    }
    Err(rsactor::Error::Timeout { .. }) => {
        // Dead letter recorded with Timeout reason
        // Check if error is retryable
        if e.is_retryable() {
            // Retry logic
        }
    }
    _ => {}
}
}

Tracing & Observability

rsActor provides comprehensive observability through the tracing crate.

Architecture

Tracing in rsActor has two layers:

  1. Core logging (always available): tracing::warn!, tracing::error!, tracing::debug! for lifecycle events, dead letters, and errors. The tracing crate is a required dependency.

  2. Instrumentation spans (opt-in via tracing feature): #[tracing::instrument] attributes that create structured spans with timing and context for actor lifecycle and message processing.

Enabling Instrumentation

[dependencies]
rsactor = { version = "0.14", features = ["tracing"] }
tracing = "0.1"
tracing-subscriber = "0.3"

What Gets Traced

Always Available (Core Logging)

  • Dead letter warnings with structured fields
  • Actor on_start success/failure
  • Actor on_stop errors
  • on_run errors
  • Reply channel failures

With tracing Feature (Instrumentation Spans)

SpanFieldsDescription
actor_lifecycleactor_id, actor_typeEntire actor lifecycle
actor_on_startInitialization phase
actor_on_runEach idle handler invocation
actor_on_stopkilledShutdown phase
actor_process_messageEach message processing
actor_tellactor_id, message_typeTell operation
actor_askactor_id, message_type, reply_typeAsk operation
actor_tell_with_timeoutactor_id, message_type, timeout_msTell with timeout
actor_ask_with_timeoutactor_id, message_type, reply_type, timeout_msAsk with timeout
actor_killactor_idKill signal
actor_stopactor_idStop signal

Setup Example

use tracing_subscriber;

#[tokio::main]
async fn main() {
    // Simple setup
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_target(false)
        .init();

    // Your actor code here...
}

Running with Tracing

# Basic logging (always works)
RUST_LOG=debug cargo run --example basic

# With instrumentation spans
RUST_LOG=debug cargo run --example tracing_demo --features tracing

Feature Flag Behavior

ConfigurationCore LoggingInstrumentation Spans
DefaultAvailableDisabled
features = ["tracing"]AvailableEnabled

The tracing feature only controls #[tracing::instrument] attributes. Core logging via tracing::warn!, tracing::error!, etc. is always available regardless of feature flags.

Running the Example

RUST_LOG=debug cargo run --example tracing_demo --features tracing

Deadlock Detection

rsActor provides runtime deadlock detection that catches circular ask dependencies before they cause your application to hang.

Why Deadlocks Happen

Each actor processes messages sequentially in a single loop. When an actor calls ask, it pauses its loop to wait for the reply. If Actor A asks Actor B, and B simultaneously asks A, both loops are paused — neither can process the other’s request.

Actor A handler: actor_ref_b.ask(msg).await  ← A's loop paused, waiting for B
Actor B handler: actor_ref_a.ask(msg).await  ← B's loop paused, waiting for A
→ Both wait forever = deadlock

This can also happen with:

  • Self-ask: An actor asking itself
  • Indirect chains: A → B → C → A

Enabling Detection

Add the deadlock-detection feature:

[dependencies]
rsactor = { version = "0.14", features = ["deadlock-detection"] }

When disabled, all detection code is removed at compile time — zero overhead in production.

Auto-Enabling in Debug Builds

Cargo does not support enabling features based on build profile (debug vs release). Here are practical ways to keep detection active during development while excluding it from release builds.

Define aliases in .cargo/config.toml:

# .cargo/config.toml
[alias]
dev-run = "run --features deadlock-detection"
dev-test = "test --features deadlock-detection"
cargo dev-run                    # debug + deadlock detection
cargo dev-test                   # test + deadlock detection
cargo run --release              # release, no overhead

Makefile / justfile

Centralize dev commands with the feature flag included:

# Makefile
run:
	cargo run --features deadlock-detection

test:
	cargo test --features deadlock-detection

release:
	cargo build --release

build.rs Guardrail

For a compile-time reminder, add a build.rs that detects debug profile and a compile_error! that fires when the feature is missing:

# Cargo.toml
[features]
deadlock-detection = ["rsactor/deadlock-detection"]
// build.rs
fn main() {
    let profile = std::env::var("PROFILE").unwrap();
    println!("cargo::rustc-check-cfg=cfg(debug_build)");
    if profile == "debug" {
        println!("cargo:rustc-cfg=debug_build");
    }
}
#![allow(unused)]
fn main() {
// main.rs — reminds you to enable the feature in debug
#[cfg(all(debug_build, not(feature = "deadlock-detection")))]
compile_error!(
    "Enable deadlock detection in debug builds: \
     cargo run --features deadlock-detection"
);
}

Note: build.rs cannot activate Cargo features — it can only set cfg flags. The compile_error! approach acts as a guardrail that reminds developers to pass the feature flag.

For full details, see the Deadlock Detection Guide.

How It Works

The framework maintains a global wait-for graph:

  1. Before each ask, an edge caller → callee is registered
  2. The graph is checked for cycles
  3. If a cycle exists, the framework panics immediately with a descriptive message
  4. When ask completes, the edge is automatically removed
1. A asks B → graph: {A → B} → no cycle → proceed
2. B asks A → graph: {A → B, B → A} → cycle detected! → panic!

Panic Message

Deadlocks are design errors, so the framework panics (like an index out of bounds) rather than returning an error:

Deadlock detected: ask cycle MyActorA(#1) -> MyActorB(#2) -> MyActorA(#1)
This is a design error. Use `tell` to break the cycle,
or restructure actor dependencies.

The message includes actor type names and IDs, making it easy to identify the problematic interaction.

Fixing Deadlocks

Option 1: Replace ask with tell

If one direction doesn’t need a synchronous reply, use tell (fire-and-forget):

#![allow(unused)]
fn main() {
// Before: A and B ask each other (deadlock risk)
#[handler]
async fn handle_request(&mut self, msg: Request, _: &ActorRef<Self>) -> Response {
    let result = self.other_actor.ask(Query).await?;
    Response(result)
}

// After: A tells B, no waiting (deadlock-free)
#[handler]
async fn handle_request(&mut self, msg: Request, _: &ActorRef<Self>) {
    self.other_actor.tell(Notify { data: msg.data }).await?;
}
}

Option 2: Restructure Dependencies

Eliminate cycles by introducing a mediator:

Before: A ↔ B (bidirectional ask = cycle risk)
After:  A → Mediator ← B (no cycles possible)

Option 3: Use Callbacks

Pass a oneshot::Sender via tell instead of using ask:

#![allow(unused)]
fn main() {
use tokio::sync::oneshot;

struct RequestWithCallback {
    data: String,
    reply_tx: oneshot::Sender<Response>,
}

// Send via tell with a reply channel
let (tx, rx) = oneshot::channel();
actor_b.tell(RequestWithCallback { data, reply_tx: tx }).await?;
let response = rx.await?;
}

What Gets Detected

ScenarioDetected?
Self-ask (A → A)Yes
2-actor cycle (A → B → A)Yes
N-actor chain (A → B → C → A)Yes
Sequential asks (A → B, then A → C)No false positive
Non-actor caller (main → A)Skipped (safe)

Limitations

  • blocking_ask: Not tracked, as it’s designed for non-actor contexts that can’t form cycles.
  • Concurrent asks: Using tokio::join! to send multiple ask calls from one handler may miss some cycles, since the graph stores one edge per caller.
  • tokio::spawn in handlers: Spawned tasks don’t inherit the actor identity, so their ask calls aren’t tracked.

Performance

ConditionOverhead
Feature disabledZero (compile-time removal)
Feature enabled, normal ask~100-500 ns per call

The wait-for graph only contains in-flight ask calls, so it stays small and traversal is fast.

Examples

This section provides practical examples demonstrating various features and patterns of the rsActor framework.

Available Examples

Core Patterns

Advanced Patterns

  • Kill Demo — Graceful stop vs immediate kill, ActorResult inspection
  • Ask Join — ask_join pattern with JoinHandle
  • Handler Demo — Handler traits for type-erased heterogeneous collections
  • Weak References — ActorWeak for breaking circular references
  • Metrics — Per-actor performance metrics
  • Tracing — Structured tracing and observability

Classic Problems

Running Examples

All examples are in the examples/ directory:

# Core examples
cargo run --example basic
cargo run --example actor_async_worker
cargo run --example actor_blocking_task

# Advanced examples
cargo run --example kill_demo
cargo run --example ask_join_demo
cargo run --example handler_demo
cargo run --example weak_reference_demo

# Feature-gated examples
cargo run --example metrics_demo --features metrics
cargo run --example tracing_demo --features tracing

Example Structure

Each example follows this pattern:

  1. Actor Definition: Define the actor struct and implement the Actor trait (or use #[derive(Actor)])
  2. Message Types: Define messages that the actor can handle
  3. Message Handlers: Use #[message_handlers] with #[handler] attributes
  4. Usage: Demonstrate spawning, messaging, and lifecycle management

Basic Usage

This example demonstrates the fundamental concepts of rsActor through a simple counter actor that maintains internal state and processes messages.

Overview

The basic example shows:

  • How to define an actor with internal state
  • Implementing the Actor trait with lifecycle methods
  • Creating and handling different message types
  • Using the #[message_handlers] macro
  • Spawning actors and managing their lifecycle
  • Periodic tasks within actors using on_run

Code Example

use anyhow::Result;
use rsactor::{message_handlers, Actor, ActorRef, ActorWeak};
use tokio::time::{interval, Duration};
use tracing::info;

// Message types
struct Increment; // Message to increment the actor's counter
struct Decrement; // Message to decrement the actor's counter

// Define the actor struct
struct MyActor {
    count: u32,                        // Internal state of the actor
    start_up: std::time::Instant,      // Track the start time
    tick_300ms: tokio::time::Interval, // Interval for 300ms ticks
    tick_1s: tokio::time::Interval,    // Interval for 1s ticks
}

// Implement the Actor trait for MyActor
impl Actor for MyActor {
    type Args = Self;
    type Error = anyhow::Error;

    // Called when the actor is started
    async fn on_start(args: Self::Args, _actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        info!("MyActor started. Initial count: {}.", args.count);
        Ok(args)
    }

    // Called repeatedly when the message queue is empty (idle handler).
    // Returns Ok(true) to continue calling on_run, Ok(false) to stop idle processing.
    // Note: receives &ActorWeak<Self>, not &ActorRef<Self>.
    async fn on_run(&mut self, _actor_weak: &ActorWeak<Self>) -> Result<bool, Self::Error> {
        // Use tokio::select! to handle multiple async operations
        tokio::select! {
            _ = self.tick_300ms.tick() => {
                println!("300ms tick. Elapsed: {:?}", self.start_up.elapsed());
            }
            _ = self.tick_1s.tick() => {
                println!("1s tick. Elapsed: {:?}", self.start_up.elapsed());
            }
        }
        Ok(true) // Continue calling on_run
    }
}

// Message handling using the #[message_handlers] macro with #[handler] attributes
#[message_handlers]
impl MyActor {
    #[handler]
    async fn handle_increment(&mut self, _msg: Increment, _: &ActorRef<Self>) -> u32 {
        self.count += 1;
        println!("MyActor handled Increment. Count is now {}.", self.count);
        self.count
    }

    #[handler]
    async fn handle_decrement(&mut self, _msg: Decrement, _: &ActorRef<Self>) -> u32 {
        self.count -= 1;
        println!("MyActor handled Decrement. Count is now {}.", self.count);
        self.count
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_target(false)
        .init();

    // Create actor instance with initial state
    let my_actor = MyActor {
        count: 100,
        start_up: std::time::Instant::now(),
        tick_300ms: interval(Duration::from_millis(300)),
        tick_1s: interval(Duration::from_secs(1)),
    };

    // Spawn the actor
    let (actor_ref, join_handle) = rsactor::spawn::<MyActor>(my_actor);
    println!("MyActor spawned with ID: {}", actor_ref.identity());

    // Wait a bit to see some ticks
    tokio::time::sleep(Duration::from_millis(700)).await;

    // Send messages and await replies
    println!("Sending Increment message...");
    let count_after_inc: u32 = actor_ref.ask(Increment).await?;
    println!("Reply after Increment: {}", count_after_inc);

    println!("Sending Decrement message...");
    let count_after_dec: u32 = actor_ref.ask(Decrement).await?;
    println!("Reply after Decrement: {}", count_after_dec);

    // Wait for more ticks
    tokio::time::sleep(Duration::from_millis(700)).await;

    // Stop the actor gracefully
    println!("Stopping actor...");
    actor_ref.stop().await?;

    // Wait for completion and check result
    let result = join_handle.await?;
    match result {
        rsactor::ActorResult::Completed { actor, killed } => {
            println!("Actor completed. Final count: {}. Killed: {}",
                     actor.count, killed);
        }
        rsactor::ActorResult::Failed { actor, error, phase, killed } => {
            println!("Actor failed: {}. Phase: {:?}, Killed: {}",
                     error, phase, killed);
            if let Some(actor) = actor {
                println!("Final count: {}", actor.count);
            }
        }
    }

    Ok(())
}

Key Concepts Demonstrated

1. Actor State Management

The MyActor struct encapsulates:

  • count: The main business state
  • start_up: Timing information
  • tick_300ms and tick_1s: Periodic timers

2. Lifecycle Methods

  • on_start: Initializes the actor when spawned
  • on_run: Runs continuously, handling periodic tasks with tokio::select!

3. Message Handling

  • Increment and Decrement messages modify the actor’s state
  • Each message handler returns the new count value
  • Messages are processed sequentially, ensuring thread safety

4. Actor Communication

  • ask: Send a message and wait for a reply
  • tell: Send a message without waiting (fire-and-forget)
  • stop: Gracefully shutdown the actor

5. Error Handling

The example demonstrates proper error handling throughout the actor lifecycle and shows how to access the final actor state after completion.

Running the Example

cargo run --example basic

You’ll see output showing the periodic ticks, message processing, and graceful shutdown.

Next Steps

This basic example forms the foundation for more complex actor patterns. Next, explore:

Async Worker

This example demonstrates a more complex actor interaction pattern where one actor (Requester) delegates asynchronous work to another actor (Worker). The Worker actor spawns async tasks to process requests and sends results back to the Requester.

Overview

The async worker pattern is useful when you need:

  • Work delegation between actors
  • Parallel processing of multiple requests
  • Async task management within actors
  • Bidirectional communication between actors

Architecture

RequesterActor ──[RequestWork]──> WorkerActor
      ↑                              │
      │                              ↓
      └──[WorkResult]──────── [spawns async task]

Key Components

  1. RequesterActor: Initiates work requests and handles results
  2. WorkerActor: Receives requests, spawns async tasks, and sends back results
  3. Async Tasks: Background tasks that perform the actual work

Implementation

Message Types

#![allow(unused)]
fn main() {
// Message to request work from Worker
struct RequestWork {
    task_id: usize,
    data: String,
}

// Message containing work results
struct WorkResult {
    task_id: usize,
    result: String,
}
}

RequesterActor

#![allow(unused)]
fn main() {
struct RequesterActor {
    worker_ref: ActorRef<WorkerActor>,
    received_results: Vec<String>,
}

impl Actor for RequesterActor {
    type Args = ActorRef<WorkerActor>;
    type Error = anyhow::Error;

    async fn on_start(args: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
        println!("RequesterActor started");
        Ok(RequesterActor {
            worker_ref: args,
            received_results: Vec::new(),
        })
    }
}

impl Message<RequestWork> for RequesterActor {
    type Reply = ();

    async fn handle(&mut self, msg: RequestWork, actor_ref: &ActorRef<Self>) -> Self::Reply {
        println!("RequesterActor sending work request for task {}", msg.task_id);

        // Send request to the worker actor
        let requester = actor_ref.clone();
        let work_msg = ProcessTask {
            task_id: msg.task_id,
            data: msg.data,
            callback: requester,
        };

        if let Err(e) = self.worker_ref.tell(work_msg).await {
            eprintln!("Failed to send work to worker: {}", e);
        }
    }
}

impl Message<WorkResult> for RequesterActor {
    type Reply = ();

    async fn handle(&mut self, msg: WorkResult, _: &ActorRef<Self>) -> Self::Reply {
        println!("RequesterActor received result for task {}: {}",
                 msg.task_id, msg.result);
        self.received_results.push(msg.result);
    }
}
}

WorkerActor

#![allow(unused)]
fn main() {
struct WorkerActor {
    active_tasks: usize,
}

impl Actor for WorkerActor {
    type Args = ();
    type Error = anyhow::Error;

    async fn on_start(_: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
        println!("WorkerActor started");
        Ok(WorkerActor { active_tasks: 0 })
    }
}

impl Message<ProcessTask> for WorkerActor {
    type Reply = ();

    async fn handle(&mut self, msg: ProcessTask, _: &ActorRef<Self>) -> Self::Reply {
        self.active_tasks += 1;
        println!("WorkerActor processing task {} (active: {})",
                 msg.task_id, self.active_tasks);

        let task_id = msg.task_id;
        let data = msg.data;
        let callback = msg.callback;

        // Spawn an async task to do the work
        tokio::spawn(async move {
            // Simulate some async work (e.g., network request, database query)
            tokio::time::sleep(Duration::from_millis(100 + task_id as u64 * 50)).await;

            let result = format!("Processed: {}", data);
            println!("Task {} completed with result: {}", task_id, result);

            // Send result back to requester
            let work_result = WorkResult { task_id, result };
            if let Err(e) = callback.tell(work_result).await {
                eprintln!("Failed to send result back: {}", e);
            }
        });

        self.active_tasks -= 1;
    }
}
}

Key Patterns Demonstrated

1. Actor-to-Actor Communication

  • Requester sends work requests to Worker
  • Worker sends results back to Requester
  • Bidirectional message flow

2. Async Task Spawning

  • Worker spawns tokio::spawn tasks for parallel processing
  • Tasks run independently of the actor’s message processing loop
  • Results are sent back via actor references

3. Callback Pattern

  • Work requests include a callback reference (ActorRef)
  • Spawned tasks use the callback to return results
  • Enables loose coupling between work requesters and processors

4. State Management

  • Each actor maintains its own state independently
  • Worker tracks active tasks
  • Requester accumulates results

Benefits

Scalability

  • Multiple work requests can be processed in parallel
  • Worker actor doesn’t block while tasks are running
  • Easy to scale by adding more worker actors

Fault Isolation

  • Failed tasks don’t crash the actors
  • Actor supervision can restart failed workers
  • Error handling is isolated per task

Resource Management

  • Tasks are managed by Tokio’s runtime
  • Automatic cleanup when tasks complete
  • Can implement backpressure by limiting concurrent tasks

Usage Example

#[tokio::main]
async fn main() -> Result<()> {
    // Spawn worker actor
    let (worker_ref, worker_handle) = rsactor::spawn::<WorkerActor>(());

    // Spawn requester actor with worker reference
    let (requester_ref, requester_handle) = rsactor::spawn::<RequesterActor>(worker_ref);

    // Send multiple work requests
    for i in 0..5 {
        let request = RequestWork {
            task_id: i,
            data: format!("Task data {}", i),
        };
        requester_ref.tell(request).await?;

        // Small delay between requests
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    // Let the work complete
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Cleanup
    requester_ref.stop().await?;
    worker_ref.stop().await?;

    Ok(())
}

Running the Example

cargo run --example actor_async_worker

You’ll see output showing:

  • Work requests being sent
  • Tasks being processed in parallel
  • Results being received back
  • Task completion timing

When to Use This Pattern

  • API Gateways: Distribute requests to worker services
  • Data Processing: Parallel processing of data batches
  • I/O Operations: Handle multiple network/database requests
  • Background Jobs: Queue and process background tasks
  • Microservices: Inter-service communication patterns

This pattern demonstrates how actors can coordinate complex asynchronous workflows while maintaining clean separation of concerns and fault tolerance.

Blocking Task

This example demonstrates how actors can interact with CPU-intensive or blocking operations without blocking the async runtime. It shows the proper way to integrate blocking tasks with the actor system using tokio::task::spawn_blocking and the blocking communication APIs.

Overview

The blocking task pattern is essential when you need to:

  • CPU-intensive computations that would block the async executor
  • Synchronous I/O operations with legacy APIs
  • Bridge synchronous and asynchronous code
  • Prevent blocking the main actor runtime

Key Concepts

Why Use Blocking Tasks?

Tokio’s async runtime is designed for I/O-bound operations. CPU-intensive or truly blocking operations can:

  • Block the entire async executor
  • Prevent other actors from processing messages
  • Degrade overall system performance

Solution: spawn_blocking

Tokio provides spawn_blocking to run blocking operations on a dedicated thread pool, keeping the main async runtime responsive.

Architecture

Actor ←→ [tokio channels] ←→ Blocking Task
  ↑                              ↓
  └──[blocking API]──────────────┘
     (ask_blocking/tell_blocking)

Implementation

Message Types

#![allow(unused)]
fn main() {
// Messages for actor communication
struct GetState;
struct SetFactor(f64);
struct ProcessedData {
    value: f64,
    timestamp: std::time::Instant,
}

// Commands for the blocking task
enum TaskCommand {
    ChangeInterval(Duration),
    Stop,
}
}

Actor with Blocking Task

#![allow(unused)]
fn main() {
struct SyncDataProcessorActor {
    factor: f64,
    latest_value: Option<f64>,
    task_sender: Option<mpsc::UnboundedSender<TaskCommand>>,
    task_handle: Option<task::JoinHandle<()>>,
}

impl Actor for SyncDataProcessorActor {
    type Args = f64; // Initial factor
    type Error = anyhow::Error;

    async fn on_start(factor: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        info!("SyncDataProcessorActor started with factor: {}", factor);

        // Create communication channels
        let (task_sender, task_receiver) = mpsc::unbounded_channel();

        // Spawn the blocking task
        let actor_ref_clone = actor_ref.clone();
        let task_handle = task::spawn_blocking(move || {
            sync_background_task(factor, task_receiver, actor_ref_clone)
        });

        Ok(Self {
            factor,
            latest_value: None,
            task_sender: Some(task_sender),
            task_handle: Some(task_handle),
        })
    }

    async fn on_stop(&mut self, _: &ActorRef<Self>) -> Result<(), Self::Error> {
        info!("SyncDataProcessorActor stopping - sending stop command to background task");

        // Signal the background task to stop
        if let Some(sender) = &self.task_sender {
            if sender.send(TaskCommand::Stop).is_err() {
                debug!("Background task already stopped or receiver dropped");
            }
        }

        // Wait for the background task to complete
        if let Some(handle) = self.task_handle.take() {
            match handle.await {
                Ok(()) => info!("Background task completed successfully"),
                Err(e) => info!("Background task completed with error: {}", e),
            }
        }

        Ok(())
    }
}
}

Blocking Task Implementation

#![allow(unused)]
fn main() {
fn sync_background_task(
    mut factor: f64,
    mut task_receiver: mpsc::UnboundedReceiver<TaskCommand>,
    actor_ref: ActorRef<SyncDataProcessorActor>,
) {
    info!("Sync background task started");
    let mut interval = Duration::from_millis(500);
    let mut counter = 0.0;

    loop {
        // Check for commands from the actor (non-blocking)
        match task_receiver.try_recv() {
            Ok(TaskCommand::ChangeInterval(new_interval)) => {
                info!("Background task: changing interval to {:?}", new_interval);
                interval = new_interval;
            }
            Ok(TaskCommand::Stop) => {
                info!("Background task: received stop command");
                break;
            }
            Err(mpsc::error::TryRecvError::Empty) => {
                // No command available, continue with normal processing
            }
            Err(mpsc::error::TryRecvError::Disconnected) => {
                info!("Background task: actor disconnected, stopping");
                break;
            }
        }

        // Simulate CPU-intensive work
        counter += 1.0;
        let processed_value = expensive_calculation(counter, factor);

        // Send result back to actor using blocking API
        let message = ProcessedData {
            value: processed_value,
            timestamp: std::time::Instant::now(),
        };

        if let Err(e) = actor_ref.tell_blocking(message) {
            info!("Background task: failed to send data to actor: {}", e);
            break;
        }

        // Sleep (blocking operation)
        thread::sleep(interval);
    }

    info!("Sync background task finished");
}

fn expensive_calculation(input: f64, factor: f64) -> f64 {
    // Simulate CPU-intensive calculation
    let mut result = input * factor;
    for _ in 0..1000000 {
        result = result.sin().cos().tan();
    }
    result
}
}

Message Handlers

#![allow(unused)]
fn main() {
impl Message<ProcessedData> for SyncDataProcessorActor {
    type Reply = ();

    async fn handle(&mut self, msg: ProcessedData, _: &ActorRef<Self>) -> Self::Reply {
        debug!("Actor received processed data: value={:.6}, timestamp={:?}",
               msg.value, msg.timestamp);
        self.latest_value = Some(msg.value);
    }
}

impl Message<SetFactor> for SyncDataProcessorActor {
    type Reply = f64; // Return the new factor

    async fn handle(&mut self, msg: SetFactor, _: &ActorRef<Self>) -> Self::Reply {
        info!("Actor: changing factor from {} to {}", self.factor, msg.0.0);
        self.factor = msg.0.0;

        // Could send a command to the background task if needed
        // if let Some(sender) = &self.task_sender {
        //     sender.send(TaskCommand::UpdateFactor(self.factor)).ok();
        // }

        self.factor
    }
}

impl Message<GetState> for SyncDataProcessorActor {
    type Reply = (f64, Option<f64>); // (factor, latest_value)

    async fn handle(&mut self, _: GetState, _: &ActorRef<Self>) -> Self::Reply {
        (self.factor, self.latest_value)
    }
}
}

Key Patterns

1. Proper Blocking API Usage

  • Use tell_blocking and ask_blocking only within spawn_blocking tasks
  • These APIs are designed for Tokio’s blocking thread pool
  • NOT for use in std::thread::spawn or general sync code

2. Communication Channels

  • Use tokio::sync::mpsc for actor → blocking task communication
  • Use actor messages for blocking task → actor communication
  • Separate concerns: commands vs. data

3. Lifecycle Management

  • Spawn blocking tasks in on_start
  • Clean up in on_stop by sending stop commands
  • Await task completion to ensure proper cleanup

4. Error Handling

  • Handle channel disconnections gracefully
  • Propagate errors appropriately between sync and async contexts
  • Use try_recv for non-blocking command checking

Benefits

Runtime Protection

  • Blocking operations don’t block the async executor
  • Other actors continue processing messages
  • Maintains system responsiveness

Resource Management

  • Tokio manages the blocking thread pool
  • Automatic scaling based on workload
  • Proper cleanup when actors stop

Flexibility

  • Integrate legacy synchronous code
  • Handle CPU-intensive algorithms
  • Bridge different execution models

Usage Example

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    // Spawn the actor with initial factor
    let (actor_ref, join_handle) = rsactor::spawn::<SyncDataProcessorActor>(2.0);

    // Let it run for a bit
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Get current state
    let (factor, latest) = actor_ref.ask(GetState).await?;
    println!("Current state: factor={}, latest_value={:?}", factor, latest);

    // Change the factor
    let new_factor = actor_ref.ask(SetFactor(3.5)).await?;
    println!("Changed factor to: {}", new_factor);

    // Let it run with new factor
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Stop the actor
    actor_ref.stop().await?;
    let result = join_handle.await?;
    println!("Actor stopped: {:?}", result);

    Ok(())
}

Running the Example

cargo run --example actor_blocking_task

Output shows:

  • Background task processing data continuously
  • Actor receiving processed values
  • Factor changes affecting calculations
  • Proper cleanup on shutdown

When to Use This Pattern

  • Mathematical Computations: Heavy algorithms, simulations
  • Legacy Integration: Wrapping synchronous libraries
  • File Processing: Large file operations, compression
  • Database Operations: Synchronous database drivers
  • System Calls: Direct OS interactions

This pattern enables seamless integration of blocking operations while maintaining the benefits of the actor model and async runtime efficiency.

Actor with Timeout

This example demonstrates how to handle timeouts in actor communication using ask_with_timeout and tell_with_timeout. Proper timeout handling is crucial for building resilient actor systems that can gracefully handle slow or unresponsive actors.

Overview

Timeout handling addresses several important scenarios:

  • Slow operations that might take longer than expected
  • Unresponsive actors due to blocking or infinite loops
  • Network delays in distributed systems
  • Resource contention causing processing delays
  • Graceful degradation when services are overloaded

Key Methods

rsActor provides timeout-enabled communication methods:

#![allow(unused)]
fn main() {
// Request with timeout - fails if no response within duration
let result = actor_ref.ask_with_timeout(message, Duration::from_secs(5)).await;

// Fire-and-forget with timeout - fails if message can't be sent within duration
actor_ref.tell_with_timeout(message, Duration::from_millis(100)).await;
}

Implementation

Actor Definition

#![allow(unused)]
fn main() {
struct TimeoutDemoActor {
    name: String,
}

impl Actor for TimeoutDemoActor {
    type Args = String;
    type Error = anyhow::Error;

    async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        info!("{} actor (id: {}) started", args, actor_ref.identity());
        Ok(Self { name: args })
    }
}
}

Message Types with Different Response Times

#![allow(unused)]
fn main() {
// Fast response message
struct FastQuery(String);

// Slow response message (simulates long processing)
struct SlowQuery(String);

// Configurable delay message
struct ConfigurableQuery {
    question: String,
    delay_ms: u64,
}
}

Message Handlers

#![allow(unused)]
fn main() {
impl Message<FastQuery> for TimeoutDemoActor {
    type Reply = String;

    async fn handle(&mut self, msg: FastQuery, _: &ActorRef<Self>) -> Self::Reply {
        debug!("{} handling a FastQuery: {}", self.name, msg.0);
        // Immediate response
        format!("Fast response to: {}", msg.0)
    }
}

impl Message<SlowQuery> for TimeoutDemoActor {
    type Reply = String;

    async fn handle(&mut self, msg: SlowQuery, _: &ActorRef<Self>) -> Self::Reply {
        debug!("{} handling a SlowQuery: {}", self.name, msg.0);

        // Simulate slow processing
        tokio::time::sleep(Duration::from_secs(3)).await;

        format!("Slow response to: {}", msg.0)
    }
}

impl Message<ConfigurableQuery> for TimeoutDemoActor {
    type Reply = String;

    async fn handle(&mut self, msg: ConfigurableQuery, _: &ActorRef<Self>) -> Self::Reply {
        debug!("{} handling ConfigurableQuery with {}ms delay",
               self.name, msg.delay_ms);

        // Configurable delay
        tokio::time::sleep(Duration::from_millis(msg.delay_ms)).await;

        format!("Response after {}ms to: {}", msg.delay_ms, msg.question)
    }
}

// Wire up message handlers
impl_message_handler!(TimeoutDemoActor, [FastQuery, SlowQuery, ConfigurableQuery]);
}

Timeout Scenarios

1. Successful Fast Operation

#![allow(unused)]
fn main() {
// This should succeed quickly
let fast_result = actor_ref
    .ask_with_timeout(FastQuery("Quick question".to_string()), Duration::from_secs(1))
    .await;

match fast_result {
    Ok(response) => println!("Fast query succeeded: {}", response),
    Err(e) => println!("Fast query failed: {}", e),
}
}

2. Timeout on Slow Operation

#![allow(unused)]
fn main() {
// This will timeout because SlowQuery takes 3 seconds but we only wait 1 second
let slow_result = actor_ref
    .ask_with_timeout(SlowQuery("Slow question".to_string()), Duration::from_secs(1))
    .await;

match slow_result {
    Ok(response) => println!("Slow query succeeded: {}", response),
    Err(e) => println!("Slow query timed out: {}", e), // This will be printed
}
}

3. Configurable Timeout Testing

#![allow(unused)]
fn main() {
async fn test_configurable_timeouts(actor_ref: &ActorRef<TimeoutDemoActor>) -> Result<()> {
    let test_cases = vec![
        (100, 500),  // 100ms delay, 500ms timeout - should succeed
        (800, 500),  // 800ms delay, 500ms timeout - should timeout
        (200, 1000), // 200ms delay, 1000ms timeout - should succeed
    ];

    for (delay_ms, timeout_ms) in test_cases {
        let query = ConfigurableQuery {
            question: format!("Test with {}ms delay", delay_ms),
            delay_ms,
        };

        let result = actor_ref
            .ask_with_timeout(query, Duration::from_millis(timeout_ms))
            .await;

        match result {
            Ok(response) => {
                println!("✅ Success ({}ms delay, {}ms timeout): {}",
                        delay_ms, timeout_ms, response);
            }
            Err(e) => {
                println!("❌ Timeout ({}ms delay, {}ms timeout): {}",
                        delay_ms, timeout_ms, e);
            }
        }
    }

    Ok(())
}
}

Error Handling Patterns

1. Timeout vs Other Errors

#![allow(unused)]
fn main() {
match actor_ref.ask_with_timeout(message, timeout).await {
    Ok(response) => {
        // Handle successful response
        println!("Received: {}", response);
    }
    Err(rsactor::Error::Timeout { identity, duration }) => {
        // Handle timeout specifically
        println!("Actor {} timed out after {:?}", identity, duration);
        // Could implement retry logic, fallback, or circuit breaker
    }
    Err(other_error) => {
        // Handle other errors (send failures, actor crashes, etc.)
        println!("Communication error: {}", other_error);
    }
}
}

2. Retry with Exponential Backoff

#![allow(unused)]
fn main() {
async fn retry_with_backoff<T>(
    actor_ref: &ActorRef<TimeoutDemoActor>,
    message: T,
    max_retries: usize,
) -> Result<String>
where
    T: Clone + Send + 'static,
    TimeoutDemoActor: Message<T, Reply = String>,
{
    let mut delay = Duration::from_millis(100);

    for attempt in 0..max_retries {
        match actor_ref.ask_with_timeout(message.clone(), delay * 2).await {
            Ok(response) => return Ok(response),
            Err(rsactor::Error::Timeout { .. }) if attempt < max_retries - 1 => {
                println!("Attempt {} timed out, retrying...", attempt + 1);
                tokio::time::sleep(delay).await;
                delay *= 2; // Exponential backoff
            }
            Err(e) => return Err(e.into()),
        }
    }

    anyhow::bail!("All retry attempts failed")
}
}

3. Circuit Breaker Pattern

#![allow(unused)]
fn main() {
struct CircuitBreaker {
    failure_count: usize,
    failure_threshold: usize,
    last_failure: Option<std::time::Instant>,
    reset_timeout: Duration,
}

impl CircuitBreaker {
    fn new(failure_threshold: usize, reset_timeout: Duration) -> Self {
        Self {
            failure_count: 0,
            failure_threshold,
            last_failure: None,
            reset_timeout,
        }
    }

    fn should_attempt(&self) -> bool {
        if self.failure_count < self.failure_threshold {
            return true;
        }

        if let Some(last_failure) = self.last_failure {
            last_failure.elapsed() > self.reset_timeout
        } else {
            true
        }
    }

    fn on_success(&mut self) {
        self.failure_count = 0;
        self.last_failure = None;
    }

    fn on_failure(&mut self) {
        self.failure_count += 1;
        self.last_failure = Some(std::time::Instant::now());
    }
}
}

Best Practices

1. Choose Appropriate Timeouts

  • Fast operations: 100ms - 1s
  • Network operations: 5s - 30s
  • Database queries: 1s - 10s
  • Heavy computations: 30s - 5min

2. Graceful Degradation

#![allow(unused)]
fn main() {
async fn get_user_data_with_fallback(
    actor_ref: &ActorRef<UserActor>,
    user_id: u64,
) -> UserData {
    match actor_ref
        .ask_with_timeout(GetUser(user_id), Duration::from_secs(2))
        .await
    {
        Ok(user_data) => user_data,
        Err(_) => {
            // Return cached or default data
            UserData::default_for_user(user_id)
        }
    }
}
}

3. Monitoring and Metrics

#![allow(unused)]
fn main() {
async fn monitored_ask<T, R>(
    actor_ref: &ActorRef<T>,
    message: impl Send + 'static,
    timeout: Duration,
    operation_name: &str,
) -> Result<R>
where
    T: Actor + Message<impl Send + 'static, Reply = R>,
{
    let start = std::time::Instant::now();

    let result = actor_ref.ask_with_timeout(message, timeout).await;

    let duration = start.elapsed();

    match &result {
        Ok(_) => {
            metrics::histogram!("actor_request_duration", duration, "operation" => operation_name, "status" => "success");
        }
        Err(rsactor::Error::Timeout { .. }) => {
            metrics::histogram!("actor_request_duration", duration, "operation" => operation_name, "status" => "timeout");
        }
        Err(_) => {
            metrics::histogram!("actor_request_duration", duration, "operation" => operation_name, "status" => "error");
        }
    }

    result.map_err(Into::into)
}
}

Running the Example

cargo run --example actor_with_timeout

You’ll see output demonstrating:

  • Fast queries completing successfully
  • Slow queries timing out
  • Different timeout scenarios
  • Error handling patterns

When to Use Timeouts

  • User-facing operations that need responsive feedback
  • Dependent services that might become unavailable
  • Resource-intensive operations that could hang
  • Network communications subject to delays
  • Any operation where infinite waiting is unacceptable

Proper timeout handling is essential for building robust, responsive actor systems that gracefully handle real-world operational challenges.

Kill Demo

This example demonstrates the difference between graceful stop and immediate kill, and how to inspect ActorResult after termination.

Key Concepts

  • stop(): Graceful shutdown — allows current message processing to finish, then calls on_stop(killed: false)
  • kill(): Immediate termination — calls on_stop(killed: true) without waiting for pending messages
  • ActorResult: Inspect how the actor terminated (completed vs failed, stopped vs killed)

Code Walkthrough

Actor with on_stop

#![allow(unused)]
fn main() {
use rsactor::{message_handlers, Actor, ActorRef, ActorWeak};

#[derive(Debug)]
struct DemoActor {
    name: String,
}

impl Actor for DemoActor {
    type Args = String;
    type Error = anyhow::Error;

    async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        tracing::info!("DemoActor '{}' (id: {}) started!", args, actor_ref.identity());
        Ok(DemoActor { name: args })
    }

    async fn on_stop(&mut self, actor_ref: &ActorWeak<Self>, killed: bool) -> Result<(), Self::Error> {
        let status = if killed { "killed" } else { "stopped" };
        tracing::info!("DemoActor '{}' (id: {}) {}", self.name, actor_ref.identity(), status);
        Ok(())
    }
}
}

Inspecting ActorResult

#![allow(unused)]
fn main() {
let (actor_ref, join_handle) = rsactor::spawn::<DemoActor>("TestActor".to_string());

// Kill the actor
actor_ref.kill()?;

// Inspect the result
match join_handle.await? {
    rsactor::ActorResult::Completed { actor, killed } => {
        println!("Actor '{}' completed. Killed: {}", actor.name, killed);
    }
    rsactor::ActorResult::Failed { error, phase, killed, .. } => {
        println!("Actor failed: {}. Phase: {:?}, Killed: {}", error, phase, killed);
    }
}
}

Running

cargo run --example kill_demo

Ask Join Demo

This example demonstrates the ask_join pattern where message handlers spawn long-running tasks and return JoinHandle<T>, while callers use ask_join to automatically await task completion.

Key Concepts

  • ask_join: Sends a message, receives a JoinHandle<T>, and automatically awaits it
  • Non-blocking handlers: Handlers spawn tokio::spawn tasks instead of blocking the actor
  • Concurrency: The actor remains responsive while spawned tasks run in the background

Code Walkthrough

Handler returning JoinHandle

#![allow(unused)]
fn main() {
#[derive(Actor)]
struct WorkerActor {
    task_counter: u32,
}

struct HeavyComputationTask {
    id: u32,
    duration_secs: u64,
    multiplier: u32,
}

#[message_handlers]
impl WorkerActor {
    #[handler]
    async fn handle_heavy_computation(
        &mut self,
        msg: HeavyComputationTask,
        _: &ActorRef<Self>,
    ) -> JoinHandle<u64> {
        self.task_counter += 1;
        let task_id = msg.id;
        let multiplier = msg.multiplier;
        let duration = Duration::from_secs(msg.duration_secs);

        // Spawn task — actor remains free to process other messages
        tokio::spawn(async move {
            tokio::time::sleep(duration).await;
            (task_id as u64) * (multiplier as u64)
        })
    }
}
}

ask vs ask_join

#![allow(unused)]
fn main() {
// ask() returns JoinHandle — you await it manually
let join_handle: JoinHandle<u64> = worker_ref.ask(task).await?;
let result = join_handle.await?;

// ask_join() does both steps automatically
let result: u64 = worker_ref.ask_join(task).await?;
}

Error Handling

ask_join returns Error::Join when the spawned task panics:

#![allow(unused)]
fn main() {
match worker_ref.ask_join(PanicTask).await {
    Ok(result) => println!("Success: {}", result),
    Err(rsactor::Error::Join { identity, source }) => {
        println!("Task panicked on actor {}: {}", identity, source);
    }
    Err(e) => println!("Other error: {}", e),
}
}

Running

cargo run --example ask_join_demo

Handler Demo

This example demonstrates how to use handler traits (TellHandler, AskHandler, WeakTellHandler, WeakAskHandler) to manage different actor types that handle the same message through a unified interface.

Key Concepts

  • TellHandler<M>: Type-erased fire-and-forget for any actor handling message M
  • AskHandler<M, R>: Type-erased request-response for any actor returning R for message M
  • WeakTellHandler<M> / WeakAskHandler<M, R>: Weak reference variants that don’t keep actors alive
  • Heterogeneous collections: Store different actor types in a single Vec

Code Walkthrough

Different actors, same messages

#![allow(unused)]
fn main() {
#[derive(Actor)]
struct CounterActor { name: String, count: u32 }

#[derive(Actor)]
struct LoggerActor { prefix: String, log_count: u32 }

// Both actors handle the same Ping and GetStatus messages
#[message_handlers]
impl CounterActor {
    #[handler]
    async fn handle_ping(&mut self, msg: Ping, _: &ActorRef<Self>) { ... }
    #[handler]
    async fn handle_get_status(&mut self, _: GetStatus, _: &ActorRef<Self>) -> Status { ... }
}

#[message_handlers]
impl LoggerActor {
    #[handler]
    async fn handle_ping(&mut self, msg: Ping, _: &ActorRef<Self>) { ... }
    #[handler]
    async fn handle_get_status(&mut self, _: GetStatus, _: &ActorRef<Self>) -> Status { ... }
}
}

TellHandler — fire-and-forget

#![allow(unused)]
fn main() {
let tell_handlers: Vec<Box<dyn TellHandler<Ping>>> = vec![
    (&counter_actor).into(),
    (&logger_actor).into(),
];

for handler in &tell_handlers {
    handler.tell(Ping { timestamp: 1000 }).await?;
}
}

AskHandler — request-response

#![allow(unused)]
fn main() {
let ask_handlers: Vec<Box<dyn AskHandler<GetStatus, Status>>> = vec![
    (&counter_actor).into(),
    (&logger_actor).into(),
];

for handler in &ask_handlers {
    let status = handler.ask(GetStatus).await?;
    println!("{}: {} messages", status.name, status.message_count);
}
}

WeakTellHandler — upgrade pattern

#![allow(unused)]
fn main() {
let weak_handlers: Vec<Box<dyn WeakTellHandler<Ping>>> = vec![
    ActorRef::downgrade(&counter_actor).into(),
    ActorRef::downgrade(&logger_actor).into(),
];

for weak in &weak_handlers {
    if let Some(strong) = weak.upgrade() {
        strong.tell(Ping { timestamp: 3000 }).await?;
    }
}
}

Downgrade strong to weak

#![allow(unused)]
fn main() {
let strong: Box<dyn TellHandler<Ping>> = (&counter_actor).into();
let weak: Box<dyn WeakTellHandler<Ping>> = strong.downgrade();
}

Running

cargo run --example handler_demo

Weak Reference Demo

This example demonstrates ActorWeak for weak references to actors — references that don’t prevent the actor from being dropped.

Key Concepts

  • ActorRef::downgrade(): Create a weak reference from a strong reference
  • weak_ref.upgrade(): Try to get a strong reference back (returns None if actor is dead)
  • weak_ref.is_alive(): Heuristic check if the actor might still be alive
  • weak_ref.identity(): Always available, even after the actor is dropped

Code Walkthrough

Creating and using weak references

#![allow(unused)]
fn main() {
let (actor_ref, join_handle) = spawn::<PingActor>("TestActor".to_string());

// Create a weak reference
let weak_ref = ActorRef::downgrade(&actor_ref);
println!("Weak ref is_alive: {}", weak_ref.is_alive());

// Upgrade and use
if let Some(strong_ref) = weak_ref.upgrade() {
    let response: String = strong_ref.ask(Ping).await?;
    println!("Response: {response}");
}
}

Behavior after dropping strong reference

#![allow(unused)]
fn main() {
drop(actor_ref);
tokio::time::sleep(Duration::from_millis(100)).await;

// Weak reference reflects the actor's state
println!("is_alive: {}", weak_ref.is_alive());

if let Some(strong) = weak_ref.upgrade() {
    // Actor might still be running if other strong refs exist
    let _ = strong.ask(Ping).await;
} else {
    println!("Actor is no longer available");
}
}

Identity survives actor death

#![allow(unused)]
fn main() {
// Identity is always available, even after actor termination
println!("Actor ID: {}", weak_ref.identity());
}

Running

cargo run --example weak_reference_demo

Metrics Demo

This example demonstrates the per-actor metrics system for monitoring performance. Requires the metrics feature flag.

Key Concepts

  • Lock-free counters: Metrics use AtomicU64 for zero-contention updates
  • Zero overhead when disabled: No metrics code compiles without the feature flag
  • Automatic collection: Metrics are gathered transparently during message processing
  • Snapshot API: Get all metrics at once or query individual values

Code Walkthrough

Getting a metrics snapshot

#![allow(unused)]
fn main() {
let (actor_ref, handle) = spawn::<WorkerActor>(actor);

// Send some messages
for _ in 0..10 {
    actor_ref.tell(FastTask).await?;
}

// Get all metrics at once
let metrics = actor_ref.metrics();
println!("Message count: {}", metrics.message_count);
println!("Avg processing time: {:?}", metrics.avg_processing_time);
println!("Max processing time: {:?}", metrics.max_processing_time);
println!("Error count: {}", metrics.error_count);
println!("Uptime: {:?}", metrics.uptime);
println!("Last activity: {:?}", metrics.last_activity);
}

Individual accessors

#![allow(unused)]
fn main() {
let count = actor_ref.message_count();
let avg = actor_ref.avg_processing_time();
let max = actor_ref.max_processing_time();
let errors = actor_ref.error_count();
let uptime = actor_ref.uptime();
let last = actor_ref.last_activity();
}

Running

cargo run --example metrics_demo --features metrics

Without the metrics feature, the example still runs but skips metrics output.

Tracing Demo

This example demonstrates rsActor’s structured tracing and observability features.

Key Concepts

  • Core logging: Always available via tracing::warn!, tracing::error!, etc.
  • Instrumentation spans (opt-in): #[tracing::instrument] attributes for timing and context
  • Structured fields: Actor IDs, message types, processing duration in every span

Code Walkthrough

Setup

#![allow(unused)]
fn main() {
tracing_subscriber::fmt()
    .with_max_level(tracing::Level::DEBUG)
    .with_target(false)
    .init();
}

What gets traced

The example exercises multiple traced operations:

#![allow(unused)]
fn main() {
// ask — creates actor_ask span with actor_id and message_type fields
let response: String = actor_ref.ask(Ping).await?;

// tell — creates actor_tell span
actor_ref.tell(Increment).await?;

// ask_with_timeout — creates actor_ask_with_timeout span with timeout_ms
actor_ref.ask_with_timeout(SlowOperation(200), Duration::from_millis(150)).await;

// tell_with_timeout — creates actor_tell_with_timeout span
actor_ref.tell_with_timeout(Increment, Duration::from_millis(100)).await?;

// stop — creates actor_stop span
actor_ref.stop().await?;
}

Sample output

With RUST_LOG=debug:

DEBUG actor_lifecycle{actor_id=1 actor_type="TracingDemoActor"}: rsactor: Actor started
DEBUG actor_ask{actor_id=1 message_type="Ping" reply_type="String"}: rsactor: ask
DEBUG actor_process_message: rsactor: Processing message
DEBUG actor_stop{actor_id=1}: rsactor: stop
DEBUG actor_on_stop{killed=false}: rsactor: on_stop

Running

# With instrumentation spans
RUST_LOG=debug cargo run --example tracing_demo --features tracing

# Without instrumentation (core logging only)
RUST_LOG=debug cargo run --example tracing_demo

Dining Philosophers

The Dining Philosophers problem is a classic concurrency challenge that demonstrates resource sharing, deadlock prevention, and coordination between multiple actors. This example shows how rsActor can elegantly solve this problem using the actor model.

Problem Description

Five philosophers sit around a circular table with five forks. Each philosopher alternates between thinking and eating. To eat, a philosopher needs both the fork to their left and the fork to their right. The challenge is to design a solution that:

  1. Prevents deadlock (all philosophers waiting forever)
  2. Prevents starvation (ensuring each philosopher gets to eat)
  3. Maximizes concurrency (allows multiple philosophers to eat simultaneously when possible)

Solution Architecture

Our solution uses two types of actors:

1. Table Actor

  • Role: Centralized resource manager for forks
  • Responsibilities:
    • Track which forks are available
    • Grant/deny fork requests
    • Handle fork releases
    • Maintain philosopher registry

2. Philosopher Actors

  • Role: Individual philosophers with their own thinking/eating logic
  • Responsibilities:
    • Think for random periods
    • Request forks from the table when hungry
    • Eat when both forks are acquired
    • Release forks after eating
    • Repeat the cycle

Key Components

Messages

#![allow(unused)]
fn main() {
// Fork management
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ForkSide {
    Left,
    Right,
}

// Messages to Table Actor
struct RegisterPhilosopher {
    logical_id: usize,
    philosopher_ref: ActorRef<Philosopher>,
}

struct RequestFork {
    logical_id: usize,
    side: ForkSide,
}

struct ReleaseFork {
    logical_id: usize,
    side: ForkSide,
}

// Messages to Philosopher Actor
struct StartThinking;
struct StartEating;
}

Philosopher Actor

#![allow(unused)]
fn main() {
#[derive(Debug)]
struct Philosopher {
    id: usize,
    name: String,
    table_ref: ActorRef<Table>,
    eat_count: u32,
    has_left_fork: bool,
    has_right_fork: bool,
}

impl Actor for Philosopher {
    type Args = (usize, String, ActorRef<Table>);
    type Error = anyhow::Error;

    async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        let (id, name, table_ref) = args;
        println!("Philosopher {} ({}) is joining the table.", id, name);

        let philosopher = Self {
            id, name: name.clone(), table_ref: table_ref.clone(),
            eat_count: 0, has_left_fork: false, has_right_fork: false,
        };

        // Register with the table
        table_ref.tell(RegisterPhilosopher {
            logical_id: id,
            philosopher_ref: actor_ref.clone(),
        }).await?;

        // Start thinking
        actor_ref.tell(StartThinking).await?;
        Ok(philosopher)
    }
}
}

Table Actor

#![allow(unused)]
fn main() {
#[derive(Debug)]
struct Table {
    philosophers: HashMap<usize, ActorRef<Philosopher>>,
    fork_availability: Vec<bool>, // true = available, false = taken
}

impl Actor for Table {
    type Args = usize; // Number of philosophers
    type Error = anyhow::Error;

    async fn on_start(num_philosophers: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
        Ok(Self {
            philosophers: HashMap::new(),
            fork_availability: vec![true; num_philosophers],
        })
    }
}
}

Deadlock Prevention Strategy

The solution prevents deadlock through:

  1. Centralized Fork Management: The Table actor manages all forks, preventing race conditions
  2. Sequential Fork Acquisition: Philosophers request one fork at a time
  3. Fork Release on Failure: If a philosopher can’t get both forks, they release any held fork
  4. Random Timing: Randomized thinking/eating times reduce contention patterns

Key Implementation Details

Fork Request Logic

#![allow(unused)]
fn main() {
impl Message<RequestFork> for Table {
    type Reply = bool; // true if fork granted, false if unavailable

    async fn handle(&mut self, msg: RequestFork, _: &ActorRef<Self>) -> Self::Reply {
        let fork_index = match msg.side {
            ForkSide::Left => msg.logical_id,
            ForkSide::Right => (msg.logical_id + 1) % self.fork_availability.len(),
        };

        if self.fork_availability[fork_index] {
            self.fork_availability[fork_index] = false;
            println!("Table: Fork {} granted to Philosopher {}", fork_index, msg.logical_id);
            true
        } else {
            println!("Table: Fork {} denied to Philosopher {} (in use)", fork_index, msg.logical_id);
            false
        }
    }
}
}

Philosopher State Machine

  1. Thinking State: Random duration, then try to acquire forks
  2. Fork Acquisition:
    • Request left fork first
    • If successful, request right fork
    • If both acquired, start eating
    • If either fails, release held forks and return to thinking
  3. Eating State: Random duration, then release both forks
  4. Repeat: Return to thinking state

Benefits of the Actor Model Solution

1. Encapsulation

  • Each philosopher manages its own state independently
  • The table encapsulates fork management logic
  • No shared mutable state between actors

2. Message-Based Coordination

  • All communication happens through well-defined messages
  • No need for locks, mutexes, or other synchronization primitives
  • Clear separation of concerns

3. Fault Tolerance

  • Actor isolation means one philosopher’s failure doesn’t affect others
  • The table actor can detect and handle philosopher failures
  • System can continue operating with fewer philosophers

4. Scalability

  • Easy to add more philosophers by spawning new actors
  • Table actor can handle arbitrary numbers of philosophers
  • Natural load distribution across async tasks

Running the Example

cargo run --example dining_philosophers

You’ll see output like:

Philosopher 0 (Socrates) is thinking.
Philosopher 1 (Plato) attempts to acquire Left fork.
Table: Fork 1 granted to Philosopher 1
Philosopher 1 (Plato) acquired Left fork. Attempting Right fork.
Table: Fork 2 granted to Philosopher 1
Philosopher 1 (Plato) acquired both forks and is eating.
...

Learning Outcomes

This example demonstrates:

  • Actor coordination patterns for resource management
  • Deadlock prevention strategies in distributed systems
  • Message-driven state machines for complex logic
  • Centralized vs. distributed resource management approaches
  • Practical concurrency problem-solving with actors

The Dining Philosophers problem showcases how the actor model can elegantly solve complex concurrency challenges while maintaining code clarity and system reliability.

Frequently Asked Questions

This FAQ provides answers to common questions about the rsActor framework, covering basic concepts, practical usage patterns, and advanced scenarios.

Getting Started

What is rsActor?

rsActor is a lightweight, async-first Rust actor framework that provides a simple and type-safe way to build concurrent applications using the actor model. It leverages Tokio for async runtime and provides both compile-time and runtime type safety for message passing.

How do I create my first actor?

To create an actor, you need to implement the Actor trait and define message handlers:

use rsactor::{Actor, ActorRef, message_handlers, spawn};

// Define your actor struct
#[derive(Debug)]
struct CounterActor {
    count: u32,
}

// Implement the Actor trait
impl Actor for CounterActor {
    type Args = u32; // Initial count value
    type Error = anyhow::Error;

    async fn on_start(initial_count: Self::Args, _actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        Ok(CounterActor { count: initial_count })
    }
}

// Define a message
#[derive(Debug)]
struct Increment(u32);

// Use message_handlers macro with handler attributes
#[message_handlers]
impl CounterActor {
    #[handler]
    async fn handle_increment(&mut self, msg: Increment, _: &ActorRef<Self>) -> u32 {
        self.count += msg.0;
        self.count
    }
}

// Usage
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (actor_ref, _handle) = spawn::<CounterActor>(0);
    let new_count = actor_ref.ask(Increment(5)).await?;
    println!("Count: {}", new_count); // Prints: Count: 5
    Ok(())
}

What is the difference between ask and tell?

  • ask: Sends a message and waits for a response. Returns the reply value.
  • tell: Sends a message without waiting for a response. Fire-and-forget style.
#![allow(unused)]
fn main() {
// Ask - waits for response
let result = actor_ref.ask(GetValue).await?;

// Tell - no response expected
actor_ref.tell(LogMessage("Hello".to_string())).await?;
}

How do I handle actor failures?

Actors can fail during their lifecycle. The JoinHandle returns an ActorResult that indicates success or failure:

#![allow(unused)]
fn main() {
let (actor_ref, handle) = spawn::<MyActor>(args);

match handle.await {
    Ok(ActorResult::Completed { actor, killed }) => {
        println!("Actor completed successfully");
    }
    Ok(ActorResult::Failed { error, phase, .. }) => {
        println!("Actor failed during {:?}: {}", phase, error);
    }
    Err(join_error) => {
        println!("Actor panicked: {}", join_error);
    }
}
}

Actor System

How do I spawn actors?

Use the spawn function to create and start actors:

#![allow(unused)]
fn main() {
use rsactor::spawn;

// Spawn with default mailbox size
let (actor_ref, handle) = spawn::<MyActor>(actor_args);

// Spawn with custom mailbox size
let (actor_ref, handle) = spawn_with_mailbox_capacity::<MyActor>(actor_args, 1000);
}

How do I stop an actor?

Actors can be stopped gracefully or killed immediately:

#![allow(unused)]
fn main() {
// Graceful stop - allows ongoing operations to complete
actor_ref.stop().await?;

// Immediate kill - stops the actor immediately
actor_ref.kill()?;
}

Can I send messages to stopped actors?

No, sending messages to stopped actors will return an Error::Send. You can check if an actor is alive, or handle the potential error:

#![allow(unused)]
fn main() {
if actor_ref.is_alive() {
    actor_ref.tell(msg).await?;
}
}

Message Handling

How do I define message types?

Messages are regular Rust structs:

#![allow(unused)]
fn main() {
#[derive(Debug, Clone)]
struct CreateUser {
    name: String,
    email: String,
}

#[derive(Debug)]
struct GetUser { id: u64 }

#[derive(Debug)]
struct DeleteUser { id: u64 }
}

How do I implement message handlers?

Use the #[message_handlers] macro with #[handler] attributes:

#![allow(unused)]
fn main() {
#[message_handlers]
impl UserManagerActor {
    #[handler]
    async fn handle_create_user(&mut self, msg: CreateUser, _: &ActorRef<Self>) -> Result<User, UserError> {
        let user = User { id: self.next_id(), name: msg.name, email: msg.email };
        self.users.insert(user.id, user.clone());
        Ok(user)
    }

    #[handler]
    async fn handle_get_user(&mut self, msg: GetUser, _: &ActorRef<Self>) -> Option<User> {
        self.users.get(&msg.id).cloned()
    }

    #[handler]
    async fn handle_delete_user(&mut self, msg: DeleteUser, _: &ActorRef<Self>) -> bool {
        self.users.remove(&msg.id).is_some()
    }
}
}

What happens if I forget to add a handler?

If you forget to add a #[handler] attribute to a method that should handle messages, you’ll get a compile-time error when trying to send that message type to the actor.

Actor Lifecycle

What is the lifecycle of an actor?

Actors follow a three-stage lifecycle:

  1. Creation and Initialization:

    • Actor is spawned with spawn::<Actor>(args)
    • Framework calls on_start(args, &ActorRef<Self>) to create the actor instance
    • If successful, actor enters the running state
  2. Running:

    • Framework repeatedly calls on_run(&mut self, &ActorWeak<Self>) for continuous processing
    • Actor concurrently processes messages from its mailbox
    • Continues until stopped, killed, or error occurs
  3. Termination:

    • Framework calls on_stop(&mut self, &ActorWeak<Self>, killed: bool)
    • Actor is destroyed and JoinHandle resolves with ActorResult

What are the lifecycle methods?

#![allow(unused)]
fn main() {
impl Actor for MyActor {
    type Args = MyArgs;
    type Error = MyError;

    // Called during initialization — creates and returns the actor instance
    async fn on_start(args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        Ok(MyActor::new(args))
    }

    // Called repeatedly when idle — returns Ok(true) to continue, Ok(false) to stop idle processing
    async fn on_run(&mut self, actor_weak: &ActorWeak<Self>) -> Result<bool, Self::Error> {
        // Main processing logic
        Ok(false)
    }

    // Called during termination — cleanup and finalization
    async fn on_stop(&mut self, actor_weak: &ActorWeak<Self>, killed: bool) -> Result<(), Self::Error> {
        // Cleanup logic
        Ok(())
    }
}
}

Note: on_run and on_stop receive &ActorWeak<Self> (not &ActorRef<Self>) to prevent the actor from holding a strong reference to itself.

What is the ActorResult enum?

ActorResult represents the outcome of an actor’s lifecycle:

  • ActorResult::Completed { actor, killed }: Actor completed successfully
  • ActorResult::Failed { actor, error, phase, killed }: Actor failed with error details including the failure phase (OnStart, OnRun, or OnStop)

Error Handling

How do I handle errors in actors?

Error handling occurs at multiple levels:

#![allow(unused)]
fn main() {
impl Actor for MyActor {
    type Error = MyError;

    async fn on_start(args: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
        // Return Err to prevent actor creation
        Ok(MyActor::new(args)?)
    }

    async fn on_run(&mut self, _: &ActorWeak<Self>) -> Result<bool, Self::Error> {
        // Return Err to terminate actor
        self.do_work()?;
        Ok(false)
    }
}

// Message handling errors
#[message_handlers]
impl MyActor {
    #[handler]
    async fn handle_process(&mut self, msg: ProcessData, _: &ActorRef<Self>) -> Result<Output, ProcessingError> {
        self.process(msg.data)
    }
}
}

How do I observe tell() errors?

Use the on_tell_result hook. When using #[handler], the macro auto-generates a default implementation that logs errors via tracing::warn!. Use #[handler(no_log)] to suppress this.

For manual Message<T> implementations, override on_tell_result:

#![allow(unused)]
fn main() {
impl Message<Command> for MyActor {
    type Reply = Result<(), CommandError>;

    async fn handle(&mut self, msg: Command, _: &ActorRef<Self>) -> Self::Reply {
        self.execute(msg)
    }

    fn on_tell_result(result: &Self::Reply, _actor_ref: &ActorRef<Self>) {
        if let Err(e) = result {
            tracing::error!("Command failed: {e:?}");
        }
    }
}
}

Can I use custom error types?

Yes, any error type that implements Send + Debug + 'static can be used:

#![allow(unused)]
fn main() {
#[derive(Debug, thiserror::Error)]
enum MyActorError {
    #[error("Database error: {0}")]
    DbError(#[from] sqlx::Error),

    #[error("Network timeout")]
    NetworkTimeout,
}

impl Actor for MyActor {
    type Error = MyActorError;
    // ...
}
}

Advanced Usage

Can I use rsActor with blocking code?

Yes, use tokio::task::spawn_blocking for blocking operations within handlers. For sending messages from blocking contexts, use blocking_ask and blocking_tell:

#![allow(unused)]
fn main() {
tokio::task::spawn_blocking(move || {
    // blocking_ask replaces the deprecated ask_blocking (since v0.10.0)
    let result = actor_ref.blocking_ask::<Query, QueryResult>(
        Query { id: 123 },
        Some(Duration::from_secs(5))
    );

    // blocking_tell replaces the deprecated tell_blocking (since v0.10.0)
    actor_ref.blocking_tell(LogEvent("done".into()), None);
});
}

How do I test actors?

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_counter_actor() {
    let (actor_ref, _handle) = spawn::<CounterActor>(0);
    let result = actor_ref.ask(Increment(5)).await.unwrap();
    assert_eq!(result, 5);
}
}

For dead letter testing, enable the test-utils feature:

#![allow(unused)]
fn main() {
use rsactor::{dead_letter_count, reset_dead_letter_count};

#[tokio::test]
async fn test_dead_letters() {
    reset_dead_letter_count();
    let (actor_ref, handle) = spawn::<MyActor>(args);
    actor_ref.stop().await.unwrap();
    handle.await.unwrap();

    let _ = actor_ref.tell(MyMessage).await;
    assert_eq!(dead_letter_count(), 1);
}
}

How do I implement actor supervision?

While rsActor doesn’t have built-in supervision, you can implement supervision patterns:

#![allow(unused)]
fn main() {
let (child_ref, child_handle) = spawn::<ChildActor>(args);

tokio::spawn(async move {
    match child_handle.await {
        Ok(ActorResult::Completed { .. }) => { /* normal */ }
        Ok(ActorResult::Failed { error, .. }) => {
            // Restart the child
            let (_new_ref, _) = spawn::<ChildActor>(args);
        }
        Err(_) => { /* panicked */ }
    }
});
}

How do I implement periodic tasks?

Use the on_run method with Tokio’s time utilities:

#![allow(unused)]
fn main() {
struct PeriodicActor {
    interval: tokio::time::Interval,
}

impl Actor for PeriodicActor {
    type Args = Duration;
    type Error = anyhow::Error;

    async fn on_start(period: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
        let mut interval = tokio::time::interval(period);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        Ok(Self { interval })
    }

    async fn on_run(&mut self, _: &ActorWeak<Self>) -> Result<bool, Self::Error> {
        self.interval.tick().await;
        self.perform_periodic_work();
        Ok(false)
    }
}
}

How do I use handler traits for heterogeneous collections?

Handler traits enable type-erased message sending to different actor types:

#![allow(unused)]
fn main() {
use rsactor::{TellHandler, AskHandler};

// Store different actors that handle the same message type
let handlers: Vec<Box<dyn TellHandler<Ping>>> = vec![
    (&counter_ref).into(),
    (&logger_ref).into(),
];

// Send to all
for handler in &handlers {
    handler.tell(Ping).await?;
}
}

See Handler Traits for details.

How do I manage actors of different types?

Use ActorControl for type-erased lifecycle management:

#![allow(unused)]
fn main() {
use rsactor::ActorControl;

let controls: Vec<Box<dyn ActorControl>> = vec![
    (&worker_ref).into(),
    (&logger_ref).into(),
];

// Stop all actors
for control in &controls {
    control.stop().await?;
}
}

See Actor Control for details.

Can I use generics with actors?

Yes, the #[message_handlers] macro supports generic actors:

#![allow(unused)]
fn main() {
#[derive(Debug)]
struct GenericActor<T: Send + Debug + Clone + 'static> {
    data: Option<T>,
}

impl<T: Send + Debug + Clone + 'static> Actor for GenericActor<T> {
    type Args = Option<T>;
    type Error = anyhow::Error;

    async fn on_start(args: Self::Args, _: &ActorRef<Self>) -> Result<Self, Self::Error> {
        Ok(GenericActor { data: args })
    }
}

#[message_handlers]
impl<T: Send + Debug + Clone + 'static> GenericActor<T> {
    #[handler]
    async fn handle_set_value(&mut self, msg: SetValue<T>, _: &ActorRef<Self>) {
        self.data = Some(msg.0);
    }

    #[handler]
    async fn handle_get_value(&mut self, _: GetValue, _: &ActorRef<Self>) -> Option<T> {
        self.data.clone()
    }
}
}

How does rsActor handle type safety?

rsActor provides comprehensive type safety:

  1. ActorRef<T>: Compile-time type safety — only allows sending messages the actor can handle
  2. Handler traits: Type-erased at runtime while maintaining message type safety
  3. Automatic inference: Return types are inferred from handler signatures
#![allow(unused)]
fn main() {
// Compile-time safety
let count: u32 = actor_ref.ask(IncrementMsg(5)).await?; // Type-safe
}

For more detailed information, refer to the complete documentation and examples.