Writing your own Rust-Based Actor Model: Supervision
Actors in Tokio over Actix - Supervision
One of the interesting features of the Actor model is how errors are propagated orthogonally to how code is run.
In normal Rust programs errors are managed with Result<T,E>, so errors are always handled explicitly and, in general, the caller handles the error.
graph TD
    Caller["Caller (recovers)"] -.-> Callee
    style Callee fill: #FF3333
    style Caller stroke-dasharray: 5 5, stroke: #FF3333, stroke-width: 4px
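In code, that caller-recovers flow is the familiar ? propagation (a minimal illustration, not part of the framework):

```rust
use std::num::ParseIntError;

// The callee propagates the error upward with `?`.
fn parse_port(s: &str) -> Result<u16, ParseIntError> {
    let n: u16 = s.parse()?;
    Ok(n)
}

fn main() {
    // The caller recovers, here by falling back to a default.
    let port = parse_port("not-a-number").unwrap_or(8080);
    assert_eq!(port, 8080);
}
```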
In contrast the Actor Model manages errors using a supervision hierarchy, where parent actors supervise child actors and manage restarts and escalation.
graph TD
    Parent["Parent (recovers)"] --> Child1
    Parent --> Child2
    Caller -.-> Child2
    style Child2 fill: #FF3333
    style Parent stroke-dasharray: 5 5, stroke: #FF3333, stroke-width: 4px
In the last article we put together a simple Actor Model framework in Rust that ergonomically manages sending and handling messages between addresses and actors using tokio channels.
Here I want to look at adding supervision and restarts to manage panics within actors.
I want to start by making a kind of shopping list of the language features and programming constructs in Rust that we will need to implement our supervision behaviour.
Supervision: "Let it crash"
Rust handles in-process errors well using the Result<T,E> enum, so there is little point in using our actor system to manage non-exceptional invariants. Unexpected actor failure, however, is a different issue: we are mainly concerned with things that are unlikely or out of our control and that cause panics. There is a concept in reactive / Actor Model programming called "let it crash".
"Prefer a full component restart to internal failure handling." - Reactive Design Patterns
The idea here is that if an actor errors unexpectedly we should simply let the process crash and then get another process to restart it.
This should have an influence on how we design our systems in that if data is critical it should be persisted.
I want to keep this easy to reason about and limit the communication between parent and child actors. In our system we can assume that on a crash we will automatically restart the child until it hits a threshold of restarts within a certain window, at which point the child will escalate to the parent to handle the error.
For example:
child panic
child restart
child panic
child restart
child panic
child restart
escalating to parent
parent panic
parent restart
new child created
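That threshold-within-a-window logic can be sketched as a small helper (RestartPolicy and its names are illustrative, not part of the framework we build below):

```rust
use std::time::{Duration, Instant};

// Illustrative helper: counts restarts inside a sliding window and
// decides when to stop restarting and escalate to the parent instead.
struct RestartPolicy {
    max_restarts: u64,
    window: Duration,
    restarts: u64,
    window_start: Instant,
}

impl RestartPolicy {
    fn new(max_restarts: u64, window: Duration) -> Self {
        Self {
            max_restarts,
            window,
            restarts: 0,
            window_start: Instant::now(),
        }
    }

    /// Record one restart; returns true once the threshold is hit
    /// and the child should escalate rather than restart again.
    fn should_escalate(&mut self) -> bool {
        if self.window_start.elapsed() > self.window {
            // Outside the window: start counting afresh.
            self.restarts = 0;
            self.window_start = Instant::now();
        }
        self.restarts += 1;
        self.restarts >= self.max_restarts
    }
}

fn main() {
    let mut policy = RestartPolicy::new(3, Duration::from_secs(5));
    assert!(!policy.should_escalate()); // 1st panic: restart
    assert!(!policy.should_escalate()); // 2nd panic: restart
    assert!(policy.should_escalate()); // 3rd panic: escalate
}
```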
Now we can add the first things to our shopping list.
For our list we will need to...
- Catch panics within processing our messages
- Create a loop around our existing process loop that will recreate the actor hooking it back up to the mailbox
Context: Managing Hierarchy and Utilities
To form a hierarchy we will need a place to store the tree of registered nodes. We will want these to be managed so that they can be easily stopped if required, and to expose a way for all of an actor's children to be stopped.
Secondly, within processing the actor has no way to know its own address, which is sometimes useful when working with actors.
For our list we will need to...
- Provide a context Ctx<A> to our handlers for:
  - Managing child trees
  - Exposing the actor's address
  - Actor control: stop, wait, stop_all_children(), etc.
Stopping actors
We have Actors that form trees through the context Ctx<A>, but now when a parent node stops or panics all its children should also stop.
This way when the actor is restarted it has the option of recreating the children that it was managing.
When we tell an actor to stop we need it to tell us that it has stopped, so we can do things like graceful shutdown.
For our list we will need to...
- Asynchronously signal all children to stop when a parent has stopped.
- Inform listeners that our main run loop has ended.
- Listen for stop signals and process them before anything else.
Escalation
We need a way for children to signal their parents that they have restarted more times than the threshold and to escalate so that the parent can determine what to do.
For our list we will need...
- An escalation channel to tell the parent a child has died
Lifecycle methods
We will need some lifecycle methods to allow developers to hook into the stages of an actor's lifecycle.
We want to do operations when an actor has started such as get the latest state from the database or network.
We also want to do cleanup operations if an actor is stopped such as make sure data is saved.
We want to do operations when restarted such as manage exponential backoff if required.
For our list we will need...
- These callbacks: started(), stopped(), restarted()
Our completed feature list
So all together we need the following pieces. Here I will add the Rust-based solutions we can use on the right of this table:
| Item | Solution |
|---|---|
| 1. Catch panic!() during processing | AssertUnwindSafe and the futures::FutureExt extension for handling async panics |
| 2. Loop around the outside of our while loop | Rust loop {} |
| 3. Provide a context to our handlers | Set up the Ctx<A> object at the top of our run loop and pass it to everything in order to control the actor |
| 4. Asynchronously signal all children to stop when a parent has stopped | Use the Ctx<A> API to iterate through all children and wait for them to stop |
| 5. A mechanism to inform Ctx<A> and Addr<A> that our run loop has stopped | Use tokio_util::sync::CancellationToken with a drop_guard() that falls out of scope at the end of our control loop |
| 6. Listening for stop signals and processing them first | Use the tokio::select! macro with the biased; keyword and interact with the CancellationToken |
| 7. An escalation channel for children to trigger their parents | Use tokio::sync::mpsc::unbounded_channel |
| 8. Lifecycle callbacks | Call during the main run loop |
Let's get building!
Let's now look at implementing each one of these pieces.
Prepare the process loop
Previously we had a processing loop that looked a little like this:
fn start(self) -> Addr<Self> {
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<Self>>();
    tokio::spawn(async move {
        let mut this = self;
        while let Some(mut msg) = rx.recv().await {
            msg.process(&mut this).await;
        }
    });
    Addr { tx }
}
Much of the complexity in setting up an Actor Model framework comes from preparing supervision in this loop. It will get much larger, so to prepare we should extract it into a standalone function; we will also need to start actors from both the Actor::start() method and Ctx::spawn().
Let's extract the loop:
#[async_trait]
pub trait Actor: Send + Sync + Sized + 'static {
    fn start(self) -> Addr<Self> {
        // use Option as a workaround to transfer ownership
        let mut once = Some(self);
        start_actor(
            move || once.take().expect("Factory can only be accessed once!"),
        )
    }
}

fn start_actor<A, F>(mut factory: F) -> Addr<A>
where
    A: Actor + Sync,
    F: FnMut() -> A + Send + 'static,
{
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<A>>();
    tokio::spawn(async move {
        // We need a factory because the supervisor will need to
        // use it to recreate the actor after a panic
        let mut actor = factory();
        while let Some(mut msg) = rx.recv().await {
            msg.process(&mut actor).await;
        }
    });
    Addr { tx }
}

Setup the panic loop
Currently we loop around receiving each message, but we will need to select for control messages, and we will need to set up nested loops to handle panics.
Let's start by refactoring to use tokio::select!:
fn start_actor<A, F>(mut factory: F) -> Addr<A>
where
    A: Actor + Sync,
    F: FnMut() -> A + Send + 'static,
{
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<A>>();
    tokio::spawn(async move {
        loop { // restart on panic
            let mut actor = factory();
            loop { // get next message
                tokio::select! {
                    // only have a single select! branch for now...
                    msg = rx.recv() => {
                        let Some(mut msg) = msg else {
                            // This will probably fail right now as it will
                            // loop back and try to pull off a closed channel
                            break;
                        };
                        // TODO: make this catch panics!
                        msg.process(&mut actor).await;
                    }
                }
            }
        }
    });
    Addr { tx }
}

Catching the panic
Now to actually catch the panic we can install the futures crate.
cargo add futures
This gives us FutureExt::catch_unwind(), which lets us wrap the msg.process() future in std's AssertUnwindSafe and catch any panic that occurs while it runs; without it we could only catch panics in sync code.
use std::panic::AssertUnwindSafe;
use futures::FutureExt; // can apply catch_unwind() to async functions

fn start_actor<A, F>(mut factory: F) -> Addr<A>
where
    A: Actor + Sync,
    F: FnMut() -> A + Send + 'static,
{
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<A>>();
    tokio::spawn(async move {
        loop {
            // This is gonna fail still...
            let mut actor = factory();
            loop {
                tokio::select! {
                    msg = rx.recv() => {
                        let Some(mut msg) = msg else {
                            break;
                        };
                        if AssertUnwindSafe(msg.process(&mut actor))
                            .catch_unwind()
                            .await
                            .is_err()
                        {
                            eprintln!("ACTOR PANICKED!");
                            break; // This will now break and reload
                                   // the actor on panic!
                        }
                    }
                }
            }
        }
    });
    Addr { tx }
}

Context: Spawning a child actor
Next let's look at how to spawn a child actor. To do this we will create a Ctx struct and give it a method that will start actors by running the start_actor() function.
pub struct Ctx<A: Actor> {
    addr: Addr<A>,
}

impl<A: Actor> Clone for Ctx<A> {
    fn clone(&self) -> Self {
        Self {
            addr: self.addr.clone(),
        }
    }
}

impl<A: Actor> Ctx<A> {
    pub fn address(&self) -> Addr<A> {
        self.addr.clone()
    }

    pub fn spawn<B, F>(&self, factory: F) -> Addr<B>
    where
        F: FnMut() -> B + Send + 'static,
        B: Actor,
    {
        start_actor(factory).address()
    }
}
Add the context to our run loop and then pass it to ActorMessage<A>::process() and Handler<M>::handle():
fn start_actor<A, F>(mut factory: F) -> Ctx<A>
where
    A: Actor + Sync,
    F: FnMut() -> A + Send + 'static,
{
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<A>>();
    let ctx = Ctx::<A> {
        addr: Addr { tx },
    };
    let ctx_loop = ctx.clone();
    tokio::spawn(async move {
        let ctx = ctx_loop;
        loop {
            let mut actor = factory();
            loop {
                tokio::select! {
                    msg = rx.recv() => {
                        let Some(mut msg) = msg else {
                            break;
                        };
                        if AssertUnwindSafe(msg.process(&mut actor, &ctx))
                            .catch_unwind()
                            .await
                            .is_err()
                        {
                            eprintln!("ACTOR PANICKED!");
                            break;
                        }
                    }
                }
            }
        }
    });
    ctx
}
We can now use the actor's address as well as spawn new actors in handlers!
#[async_trait]
impl Handler<GetCounter> for Root {
    async fn handle(&mut self, _: GetCounter, ctx: &Ctx<Self>) -> Addr<Counter> {
        ctx.address().tell(Hello); // send a message to myself!
        let db = ctx.spawn(|| Db { value: 0 });
        ctx.spawn(move || Counter { db: db.clone() })
    }
}
This currently does not track the new children, so we will need to set up a way to do that soon.
Stopping with the Stoppable trait on Ctx<A>
Stopping is where things start to get a little more complex. We need to use the Ctx<A> to trigger a stop not only for the current actor but also for all of the actor's children.
We start with a trait:
#[async_trait]
pub trait Stoppable {
    /// Trigger stop in this actor (and all children)
    fn stop(&self);
    /// Return a future that resolves once our run loop has finished
    async fn wait_until_stopped(&self);
    /// Stop all this actor's children and wait for them to all finish
    async fn stop_all_children(&self);
}
We can implement this on Ctx<A> and use it to keep track of all child contexts, keeping a list of Arc<dyn Stoppable> trait objects as the children of this actor.
We use interior mutability to provide a better API for our callers and to enable cloning, hence the rather ugly: Arc<Mutex<Vec<Arc<dyn Stoppable + Send + Sync>>>>
This means we don't need a mutable reference to Ctx, which is important as we clone it around. It is a little messy, but that is OK because callers never see it.
pub struct Ctx<A: Actor> {
    addr: Addr<A>,
    children: Arc<Mutex<Vec<Arc<dyn Stoppable + Send + Sync>>>>,
}

#[async_trait]
impl<A> Stoppable for Ctx<A>
where
    A: Actor,
{
    fn stop(&self) {
        // magic...
    }

    async fn wait_until_stopped(&self) {
        // more magic...
    }

    async fn stop_all_children(&self) {
        let children: Vec<_> = self.children.lock().unwrap().drain(..).collect();
        for child in children {
            child.stop();
            child.wait_until_stopped().await;
        }
    }
}
We can now go back and fix the Ctx::<A>::spawn() method so it keeps track of children:
pub fn spawn<B, F>(&self, factory: F) -> Addr<B>
where
    F: FnMut() -> B + Send + 'static,
    B: Actor,
{
    let child = start_actor(factory);
    self.children.lock().unwrap().push(Arc::new(child.clone()));
    child.address()
}

Adding the magic... with CancellationToken!
CancellationToken comes from the tokio-util crate (NOT to be confused with tokio-utils).
It is a token which can be used to signal a cancellation request to one or more tasks.
We can trigger cancellation using CancellationToken::cancel():
mytoken.cancel(); // resolve all futures from mytoken.cancelled()
We can then track cancellation using CancellationToken::cancelled(), which ought to look something like:
mytoken.cancelled().await
// Finally awaited because elsewhere mytoken.cancel() was called!
To manage our stopping lifecycle we can craftily use two cancellation tokens.
sequenceDiagram
participant ctx as ctx
participant run_loop as run_loop
participant loop_cancel_token as loop_cancel_token
participant loop_stop_token as loop_stop_token
run_loop->>+loop_stop_token: start()
loop_stop_token->>loop_stop_token: ctx.stopped.drop_guard()
ctx->>loop_cancel_token: ctx.cancel.cancel()
loop_cancel_token->>loop_cancel_token: ctx.cancel.cancelled().await
note over ctx,loop_stop_token: do cleanup...
loop_stop_token->>-ctx: ctx.stopped.cancelled().await
We use one to trigger a stop and the other to signal that the stop has completed.
So in the Ctx<A>:
pub struct Ctx<A: Actor> {
    addr: Addr<A>,
    children: Arc<Mutex<Vec<Arc<dyn Stoppable + Send + Sync>>>>,
    /// Signal to stop
    cancel: CancellationToken,
    /// Signal that the actor has stopped
    stopped: CancellationToken,
}

#[async_trait]
impl<A> Stoppable for Ctx<A>
where
    A: Actor,
{
    fn stop(&self) {
        self.cancel.cancel();
    }

    async fn wait_until_stopped(&self) {
        self.stopped.cancelled().await;
    }

    async fn stop_all_children(&self) {
        let children: Vec<_> = self.children.lock().unwrap().drain(..).collect();
        for child in children {
            child.stop();
            child.wait_until_stopped().await;
        }
    }
}

Add cancellation (but it does not quite work)
Now in our loop we do the following:
- create a stopped CancellationToken
- set up the stopped guard to cancel the stopped token at the end of our loop
- make sure tokio::select! is in biased mode so that futures are checked in order
- check the cancel token in case someone has used the stop() method
- stop all this actor's children
- note that at the end of the loop the stopped token will resolve
The following still does not quite work:
fn start_actor<A, F>(mut factory: F) -> Ctx<A>
where
    A: Actor + Sync,
    F: FnMut() -> A + Send + 'static,
{
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<A>>();
    let cancel = CancellationToken::new();
    let stopped = CancellationToken::new();
    let ctx = Ctx::<A> {
        addr: Addr { tx },
        children: Arc::new(Mutex::new(Vec::new())),
        cancel,
        stopped: stopped.clone(),
    };
    let ctx_loop = ctx.clone();
    tokio::spawn(async move {
        let ctx = ctx_loop;
        let _stopped_guard = stopped.drop_guard();
        loop {
            let mut actor = factory();
            loop {
                tokio::select! {
                    biased; // Important: makes sure we check in order
                    // Wait for the cancel signal
                    _ = ctx.cancel.cancelled() => {
                        break;
                    }
                    msg = rx.recv() => {
                        let Some(mut msg) = msg else {
                            break;
                        };
                        if AssertUnwindSafe(msg.process(&mut actor, &ctx))
                            .catch_unwind()
                            .await
                            .is_err()
                        {
                            eprintln!("ACTOR PANICKED!");
                            break;
                        }
                    }
                }
            }
            // Stop and wait for all children regardless of why we exited.
            ctx.stop_all_children().await;
            // NOTE: BROKEN, WE STILL ALWAYS LOOP!!
        }
        // _stopped_guard drops here and signals `stopped`.
    });
    ctx
}

Escalation is the missing piece.
After a panic our outer loop will restart the actor automatically, but we need to track how many times the actor has been restarted and whether that count is within configurable bounds. If it has restarted too many times it needs to let its parent know, using a channel that is injected into the loop function.
Let's update the Actor trait to add an escalation function which will be fired when a child has escalated.
#[async_trait]
pub trait Actor: Send + Sync + Sized + 'static {
    /// Start the actor
    fn start(self) -> Addr<Self> {
        let mut once = Some(self);
        let ctx = start_actor(
            move || once.take().expect("Factory can only be accessed once!"),
            CancellationToken::new(),
            mpsc::unbounded_channel().0,
        );
        ctx.address()
    }

    /// Override to change the interrupt behaviour. Default behaviour on
    /// escalation is to restart the parent actor.
    async fn child_escalated(&self, _ctx: &Ctx<Self>) -> Option<Interrupt> {
        Some(Interrupt::RestartToEscalate)
    }
}
And now let's adjust our loop to add the following:
- Pass in the parent's child-escalation channel to the run loop function
- Create a new channel in the actor's loop function for receiving child escalations
- Save the tx for the child escalations to the Ctx<A> that gets created
- Track the restarts in the restart loop and time bracket
- Create a set of Interrupt codes to determine what to do once interrupted
- Match on interrupt codes and keep track of restarts to escalate
/// Interrupt codes describing why the inner message loop exited.
pub enum Interrupt {
    Stop,
    RestartToEscalate,
}

fn start_actor<A, F>(
    mut factory: F,
    cancel: CancellationToken,
    escalate_to_parent: mpsc::UnboundedSender<()>,
) -> Ctx<A>
where
    A: Actor + Sync,
    F: FnMut() -> A + Send + 'static,
{
    let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<A>>();
    let (child_escalations, mut child_escalations_rx) = mpsc::unbounded_channel();
    let stopped = CancellationToken::new();
    let ctx = Ctx::<A> {
        addr: Addr {
            tx,
            stopped: stopped.clone(),
        },
        children: Arc::new(Mutex::new(Vec::new())),
        cancel,
        stopped: stopped.clone(),
        child_escalations,
    };
    let ctx_loop = ctx.clone();
    tokio::spawn(async move {
        let ctx = ctx_loop;
        let _stopped_guard = stopped.drop_guard();
        let mut restarts = 0u64;
        let mut first_restart = Instant::now();
        loop {
            let mut actor = factory();
            let code = loop {
                tokio::select! {
                    biased; // Important: makes sure we check in order
                    // Check the cancel signal
                    _ = ctx.cancel.cancelled() => {
                        break Interrupt::Stop;
                    }
                    // Check if our children have escalated
                    Some(_) = child_escalations_rx.recv() => {
                        if let Some(interrupt) = actor.child_escalated(&ctx).await {
                            break interrupt;
                        }
                    }
                    // Receive a message
                    msg = rx.recv() => {
                        let Some(mut msg) = msg else {
                            // channel closed: treat it as a stop
                            break Interrupt::Stop;
                        };
                        if AssertUnwindSafe(msg.process(&mut actor, &ctx))
                            .catch_unwind()
                            .await
                            .is_err()
                        {
                            eprintln!("ACTOR PANICKED!");
                            break Interrupt::RestartToEscalate;
                        }
                    }
                }
            };
            // Stop and wait for all children regardless of why we exited.
            ctx.stop_all_children().await;
            match code {
                Interrupt::Stop => {
                    actor.stopped(&ctx).await;
                    break;
                }
                Interrupt::RestartToEscalate => {
                    if first_restart.elapsed().as_secs() > 5 {
                        restarts = 0;
                        first_restart = Instant::now();
                    }
                    restarts += 1;
                    if restarts >= 3 {
                        eprintln!("Actor restarted {restarts} times in 5s, escalating.");
                        let _ = escalate_to_parent.send(());
                        break;
                    }
                }
            }
        }
        // _stopped_guard drops here and signals `stopped`.
    });
    ctx
}
We can pass the child_escalations sender from the parent down to the child.
pub fn spawn<B, F>(&self, factory: F) -> Addr<B>
where
    F: FnMut() -> B + Send + 'static,
    B: Actor,
{
    let child = start_actor(
        factory,
        self.cancel.child_token(),
        self.child_escalations.clone(),
    );
    self.children.lock().unwrap().push(Arc::new(child.clone()));
    child.address()
}
Finally our actor will restart on a panic and escalate to its parent. If the parent restarts, all child actors are stopped.