Writing your own Rust-Based Actor Model: Supervision

Actors in Tokio over Actix - Supervision

One of the interesting features of the Actor model is how errors are propagated orthogonally to how code is run.

In normal Rust programs errors are managed with Result<T,E> so errors are always manged and in general the caller will handle the error.

graph TD
  Caller["Caller (recovers)"] -.-> Callee
  style Callee fill: #FF3333 
  style Caller stroke-dasharray: 5 5, stroke: #FF3333, stroke-width: 4px

In contrast the Actor Model manages errors using a supervision heirarchy, where parent actors supervise child actors and manage restarts and escalation

graph TD
  Parent["Parent (recovers)"] --> Child1
  Parent --> Child2
  Caller -.-> Child2
style Child2 fill: #FF3333 
style Parent stroke-dasharray: 5 5, stroke: #FF3333, stroke-width: 4px

In the last article we put together a simple Actor Model framework in Rust that ergonomically manages sending and handling messages between addresses and actors using tokio channels.

Here I want to look at adding supervision and restarts to manage panics within actors.

I want to start by making a kind of shopping list as to what devices and programming constructs in Rust we are going to need to implement our supervision behaviour.

Supervision: "Let it crash"

Rust handles in process errors pretty well using the Result<T,E> enum there is little point in looking to manage non exceptional invariants with our actor system. Unexpected Actor failure however is a different issue therefore we are mainly concerned with things that are unlikely or out of our control that cause panics. There is a concept in reactive / actor Model programming called "let it crash".

"Prefer a full component restart to internal failure handling." - Reactive Design Patterns

The idea here is that if an actor errors unexpectedly we should simply let the process it crash and then get another process to restart it.

This should have an influence on how we design our systems in that if data is critical it should be persisted.

I want to keep this easy to reason about and limit the communication between parent and child actors so in our system we can assume that on crash we will automatically restart the child until it hits a threshold of restarts in a certain window at which point the child will escalate to the parent to handle the error.

For example:

child panic
child restart
child panic
child restart
child panic
child restart
escalating to parent
parent panic
parent restart
new child created

Now we can add the first things to our shopping list.

For our list we will need to...

  1. Catch panics within processing our messages
  2. Create a loop around our existing process loop that will recreate the actor hooking it back up to the mailbox

Context: Managing Heirarchy and Utilities

To form a heirarchy we will need a place to store the tree of registered nodes. We will want these to be managed so that they can be easily stopped if required as well as expose to allow an actor's children to all be stopped.

Secondly, within processing the actor has no way to know its own address which is sometimes usefull when working with actors.

For our list we will need to...

  1. Provide a context Ctx<A> to our handlers for:
    • Managing child trees
    • Exposing the actor's address
    • Actor control: stop, wait, stop_all_children() etc.

Stopping actors

We have Actors that form trees through context Ctx<A> but now when a parent node stops or panics all it's children should also stop.

This way when the actor is restarted it has the option of recreating the children that it was managing.

When we tell an actor to stop we need it to tell us that it has stopped so we can do things like gracefull shutdown.

For our list we will need to...

  1. Asynchronously signal all children to stop when a parent has stopped.
  2. Inform listeners that our main run loop has ended.
  3. Listen for stop signals and process them before anything else.

Escalation

We need a way for children to signal their parents that they have restarted more times than the threshold and to escalate so that the parent can determine what to do.

For our list we will need...

  1. An escalation channel to tell the parent a child has died

Lifecycle methods

We will need some lifecycle methods to allow developers to hook into these stages of an actor's lifescycle.

We want to do operations when an actor has started such as get the latest state from the database or network.

We also want to do cleanup operations if an actor is stopped such as make sure data is saved.

We want to do operations when restarted such as manage exponential backoff if required.

For our list we will need

  1. These callbacks:
    • started()
    • stopped()
    • restarted()

Our completed feature list

So all together we need the following pieces. Here I will add the Rust based solutions we can use to the right in this table:

ItemSolution
1. Catch panic!() during processingAssertUnwindSafe and the futures::FutureExt extension for handling async panics
2. Loop around the outside of our while loopRust loop {}
3. Provide a context to our handlerssetup the Ctx<A> object at the top of our run loop and pass it to everything in order to control the actor
4. Asynchronously signal all children to stop when a parent has stoppedUse Ctx<A> API iterate through all children and wait for them to stop
5. A mechanism to inform Ctx<A> and Addr<A> that our run loop has stoppedUse tokio_util::CancelationToken with drop_guard() that falls out of scope at the end of our control loop
6. Listening for stop signals and process them firstUse tokio::select! macro with biased; keyword and interact with CancelationToken
7. An escalation channel for children to trigger their parentsUse tokio::sync::mpsc::unbounded_channel
8. Lifecycle callbacksCall during the main run loop

Let's get building!

Let's now look at implementing each one of these pieces.

Prepare the process loop

Previously we had a processing loop that looked a little like this:

fn start(self) -> Addr<Self> {
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<Self>>();
    tokio::spawn(async move {
        let mut this = self;
        while let Some(mut msg) = rx.recv().await {
            msg.process(&mut this).await;
        }
    });
    Addr { tx }
}

Much of the complexity in setting up an Actor Model Framework comes about from preparing supervision in this loop. This will get much larger so to prepare we should consider extracting this loop to be a standalone function also we will need to start actors in both the Actor::start() method as well as Ctx::spawn().

Let's extract the loop:

#[async_trait]
pub trait Actor: Send + Sync + Sized + 'static {
    fn start(self) -> Addr<Self> {
        // use Option as a workaround to transfer ownership
        let mut once = Some(self);
        start_actor(
            move || once.take().expect("Factory can only be accessed once!"),
        )
    }
}
 
fn start_actor<A, F>(mut factory: F) -> Addr<A>
where
    A: Actor + Sync,
    F: FnMut() -> A + Send + 'static,
{
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<A>>();
    tokio::spawn(async move {
        // We need to use a factory as spawning will need a factory
        // and the spawner will need to use the factory to recreate the actor
        let mut actor = factory();
        while let Some(mut msg) = rx.recv().await {
            msg.process(&mut actor).await;
        }
    });
    Addr { tx }
}

Setup the panic loop

Currently we loop around receiving each process message but we will need to select for control messages and we will need to setup nested loops to handle panics.

Let's start by refactoring to use tokio::select!:

fn start_actor<A, F>(mut factory: F) -> Addr<A>
where
    A: Actor + Sync,
    F: FnMut() -> A + Send + 'static,
{
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<A>>();
    tokio::spawn(async move {
        loop { // restart on panic
            let mut actor = factory();
            loop { // get next message
                tokio::select! {
                    // only have a single select! branch for now...
                    msg = rx.recv() => {
                        if let Some(mut msg) = msg else {
                            // This will probably fail right now as it will 
                            // loop back and try pull off a closed channel
                            break;
                        }
                        // TODO: make this catch panics!
                        msg.process(&mut actor).await;
                    }
                }
            }
        }
    });
    Addr { tx }
}

Catching the panic

Now to actually catch the panic we can install the futures crate.

cargo add futures

This will let us use AssertUnwindSafe to catch any panic that occurs when running the msg.process() function without it we could only catch panics in sync code.

use futures::FutureExt; // can apply AssertUnwindSafe to async functions.
 
fn start_actor<A, F>(mut factory: F) -> Addr<A>
where
    A: Actor + Sync,
    F: FnMut() -> A + Send + 'static,
{
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<A>>();
    tokio::spawn(async move {
        loop {
            // This is gonna fail still...
            let mut actor = factory();
            loop {
                tokio::select! {
                    msg = rx.recv() => {
                        if let Some(mut msg) = msg else {
                            break;
                        }
                        if let Err(_) = AssertUnwindSafe(msg.process(&mut actor))
                            .catch_unwind()
                            .await
                        {
                            eprintln!("ACTOR PANICKED!");
                            break; // This will now break and reload 
                                   // the actor on panic!
                        }

                    }
                }
            }
        }
    });
    Addr { tx }
}

Context: Spawning a child actor

Next let's look at how to spawn a child actor. To do this we will create a Ctx struct and give it a method that will start actors by running the start_actor() function.

pub struct Ctx<A: Actor> {
    addr: Addr<A>
}

impl<A: Actor> Clone for Ctx<A> {
    fn clone(&self) -> Self {
        Self {
            addr: self.addr.clone(),
        }
    }
}


impl<A: Actor> Ctx<A> {
    pub fn address(&self) -> Addr<A> {
        self.addr.clone()
    }

    pub fn spawn<B, F>(&self, factory: F) -> Addr<B>
    where
        F: FnMut() -> B + Send + 'static,
        B: Actor,
    {
        start_actor(factory).address()
    }
}

Add the context to our run loop and then pass the context to the ActorMessage<A>::process() and Handler<M>::handler():

fn start_actor<A, F>(mut factory: F) -> Addr<A>
where
    A: Actor + Sync,
    F: FnMut() -> A + Send + 'static,
{
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<A>>();
    let ctx = Ctx::<A> {
        addr: Addr { tx },
    }; 
    let ctx_loop = ctx.clone();
    tokio::spawn(async move {
        let ctx = ctx_loop.clone();
        loop {
            let mut actor = factory();
            loop {
                tokio::select! {
                    msg = rx.recv() => {
                        if let Some(mut msg) = msg else {
                            break;
                        }
                        if let Err(_) = AssertUnwindSafe(msg.process(&mut actor, &ctx))
                            .catch_unwind()
                            .await
                        {
                            eprintln!("ACTOR PANICKED!");
                            break; 
                        }

                    }
                }
            }
        }
    });
    ctx
}

We can now use the actors address as well as spawn new actors in handlers!

#[async_trait]
impl Handler<GetCounter> for Root {
    async fn handle(&mut self, _: GetCounter, ctx: &Ctx<Self>) -> Addr<Counter> {
        ctx.address().tell(Hello); // send a message to myself!
        let db = ctx.spawn(|| Db { value: 0 });
        ctx.spawn(move || Counter { db: db.clone() })
    }
}

This currently does not track the new children so we will need to setup a way to do that soon.

Stopping with the Stoppable trait on Ctx<A>

Stopping is where things start to get a little more complex. We need to use the Ctx<A> to trigger a stop message for not only the current actor but also have a way to stop all this actor's children.

We start with a trait:

#[async_trait]
pub trait Stoppable {
    /// Trigger stop in this actor (and all children)
    fn stop(&self);
     
    /// return a future that resolves once our run loop has finished
    async fn wait_until_stopped(&self);
     
    /// Stop all this actor's children and wait for them to all finish
    async fn stop_all_children(&self);
}

We can implement this on Ctx<A> and use it to keep track of all child contexts by allowing us to keep a list of Vec<Box<dyn Stoppable>> as the children of this actor.

We use interior mutability to provide a better API for our callers and enable cloning hence the rather ugly: Arc<Mutex<Vec<Arc<dyn Stoppable + Send + Sync>>>>

This means we don't need a mutable reference to Ctx which is important as we clone it around. It is a little messy but it ok because callers never see this.

pub struct Ctx<A: Actor> {
    addr: Addr<A>
    children: Arc<Mutex<Vec<Arc<dyn Stoppable + Send + Sync>>>>,
}

#[async_trait]
impl<A> Stoppable for Ctx<A>
where
    A: Actor,
{
    fn stop(&self) {
        // magic...
    }

    async fn wait_until_stopped(&self) {
        // more magic...
    }

    async fn stop_all_children(&self) {
        let children: Vec<_> = self.children.lock().unwrap().drain(..).collect();
        for child in children {
            child.stop();
            child.wait_until_stopped().await;
        }
    }
}

We can now go back and fix the Ctx::<A>::spawn() method so it keeps track of children:

pub fn spawn<B, F>(&self, factory: F) -> Addr<B>
where
    F: FnMut() -> B + Send + 'static,
    B: Actor,
{
    let child = start_actor(factory);
    self.children.lock().unwrap().push(Arc::new(child.clone()));
    child.address()
}

Adding the magic... with CancellationToken!

CancellationToken comes from the tokio-util crate (NOT to be confused with tokio-utils).

It is a token which can be used to signal a cancellation request to one or more tasks.

We can trigger cancellation using CancelationToken::cancel()

mytoken.cancel(); // resolve all futures from mytoken.cancelled()

We can then track cancellation using CancellationToken::cancelled()

which aught to look something like:

mytoken.cancelled().await
// Finally awaited because elsewhere mytoken.cancel() was called!

What we can do to manage our stopping lifecycle is craftily use two cancel tokens.

sequenceDiagram
    participant ctx as ctx
    participant run_loop as run_loop
    participant loop_cancel_token as loop_cancel_token
    participant loop_stop_token as loop_stop_token

    run_loop->>+loop_stop_token: start()
    loop_stop_token->>loop_stop_token: ctx.stopped.drop_guard()
    ctx->>loop_cancel_token: ctx.cancel.cancel()
    loop_cancel_token->>loop_cancel_token: ctx.cancel.cancelled().await
    note over ctx,loop_stop_token: do cleanup...
    loop_stop_token->>-ctx: ctx.stopped.cancelled().await

We use one to trigger a stop and the other to inform that the stop has complete.

So in the Ctx<A>:

pub struct Ctx<A: Actor> {
    addr: Addr<A>,
    children: Arc<Mutex<Vec<Arc<dyn Stoppable + Send + Sync>>>>,
 
    /// Signal to stop
    cancel: CancellationToken,
    /// Signal that the actor has stopped
    stopped: CancellationToken,
}

#[async_trait]
impl<A> Stoppable for Ctx<A>
where
    A: Actor,
{
    fn stop(&self) {
        self.cancel.cancel();
    }

    async fn wait_until_stopped(&self) {
        self.stopped.cancelled().await;
    }

    async fn stop_all_children(&self) {
        let children: Vec<_> = self.children.lock().unwrap().drain(..).collect();
        for child in children {
            child.stop();
            child.wait_until_stopped().await;
        }
    }
}

Add cancellation (but it does not quite work)

Now in our loop we do the following:

The following still does not quite work:

fn start_actor<A, F>(mut factory: F) -> Addr<A>
where
    A: Actor + Sync,
    F: FnMut() -> A + Send + 'static,
{
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<A>>();
    let stopped = CancellationToken::new();
    let ctx = Ctx::<A> {
        addr: Addr { tx },
        children: Arc::new(Mutex::new(Vec::new())),
        cancel,
        stopped: stopped.clone(),
    }; 
    let ctx_loop = ctx.clone();
    tokio::spawn(async move {
        let ctx = ctx_loop.clone();
        let _stopped_guard = stopped.drop_guard();

        loop {
            let mut actor = factory();
            loop {
                tokio::select! {
                    biased; // Important makes sure we check in order

                    // Wait for the cancel signal
                    _ = ctx.cancel.cancelled() => {
                        break;
                    }

                    msg = rx.recv() => {
                        if let Some(mut msg) = msg else {
                            break;
                        }
                        if let Err(_) = AssertUnwindSafe(msg.process(&mut actor, &ctx))
                            .catch_unwind()
                            .await
                        {
                            eprintln!("ACTOR PANICKED!");
                            break; 
                        }

                    }
                }
            }
                     
            // Stop and wait for all children regardless of why we exited.
            ctx.stop_all_children().await;

            // NOTE: BROKEN WE STILL ALWAYS LOOP!!
        }
         
        // _stopped_guard drops here and signals `stopped`.
    });
    ctx
}

Escalation is the missing piece.

After we get a panic our external loop will restart the actor automatically but we need to test for how many times the actor has been restarted and if it is within certain configurable bounds and if it has done multiple restarts then it needs to let it's parent know using a channel that is injected into the loop function.

Let's update the Actor trait to add an escalation function which will be fired when a child has escalated.

#[async_trait]
pub trait Actor: Send + Sync + Sized + 'static {
    /// Start the actor
    fn start(self) -> Addr<Self> {
        let mut once = Some(self);
        let ctx = start_actor(
            move || once.take().expect("Factory can only be accessed once!"),
            CancellationToken::new(),
            mpsc::unbounded_channel().0,
        );
        ctx.address()
    }
    /// Override to change the interrupt behaviour. Default behaviour on escalation is to restart the parent actor
    async fn child_escalated(&self, _ctx: &Ctx<Self>) -> Option<Interrupt> {
        Some(Interrupt::RestartToEscalate)
    }
}

And now let's adjust our loop to add the following:

fn start_actor<A, F>(
    mut factory: F,
    cancel: CancellationToken,
    escalate_to_parent: mpsc::UnboundedSender<()>,
) -> Ctx<A>
where
    A: Actor + Sync,
    F: FnMut() -> A + Send + 'static,
{
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<A>>();
    let (child_escalations, mut child_escalations_rx) = mpsc::unbounded_channel();
    let stopped = CancellationToken::new();
    let ctx = Ctx::<A> {
        addr: Addr {
            tx,
            stopped: stopped.clone(),
        },
        children: Arc::new(Mutex::new(Vec::new())),
        cancel,
        stopped: stopped.clone(),
        child_escalations,
    };
    let ctx_loop = ctx.clone();
    tokio::spawn(async move {
        let ctx = ctx_loop;
        let _stopped_guard = stopped.drop_guard();
         
        let mut restarts = 0u64;
        let mut first_restart = Instant::now();
        loop {
            let mut actor = factory();
            let code = loop {
                tokio::select! {
                    biased; // Important makes sure we check in order
 
                    // Check the cancel signal
                    _ = ctx.cancel.cancelled() => {
                        break Interrupt::Stop;
                    }
 
                    // Check if our children have escalated
                    Some(_) = child_escalations_rx.recv() => {
                        if let Some(interrupt) = actor.child_escalated(&ctx).await {
                            break interrupt;
                        }
                    }
 
                    // Receive a message
                    msg = rx.recv() => {
                        if let Some(mut msg) = msg else {
                            break;
                        }
                        if let Err(_) = AssertUnwindSafe(msg.process(&mut actor, &ctx))
                            .catch_unwind()
                            .await
                        {
                            eprintln!("ACTOR PANICKED!");
                            break Interrupt::RestartToEscalate;
                        }
                    }
                }
            };
 
            // Stop and wait for all children regardless of why we exited.
            ctx.stop_all_children().await;
 
            match code {
                Interrupt::Stop => {
                    actor.stopped(&ctx).await;
                    break;
                }
                Interrupt::RestartToEscalate => {
                    if first_restart.elapsed().as_secs() > 5 {
                        restarts = 0;
                        first_restart = Instant::now();
                    }
                    restarts += 1;
                    if restarts >= 3 {
                        eprintln!("Actor restarted {} times in {}s, escalating.", 3, 5);
                        let _ = escalate_to_parent.send(());
                        break;
                    }
                }
            }
        }
        // _stopped_guard drops here and signals `stopped`.
    });
    ctx
}

We can pass in the child_escalations token from the parent to the child.

pub fn spawn<B, F>(&self, factory: F) -> Addr<B>
where
    F: FnMut() -> B + Send + 'static,
    B: Actor,
{
    let child = start_actor(
        factory,
        self.cancel.child_token(),
        self.child_escalations.clone(),
    );
    self.children.lock().unwrap().push(Arc::new(child.clone()));
    child.address()
}

Finally our actor will restart on a panic and escalate to it's parent. If the parent restarts all child actors are stopped.