Why I Ditched Actix for Tokio: Building a Simpler Rust Actor Framework (Part 1/2)

⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣠⣦⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣠⣾⡿⢿⣿⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣀⣤⣄⣠⣿⣿⣀⡀⣹⣿⣀⣀⣀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢸⣿⣿⡿⢿⣯⣿⣿⣿⡿⣿⡿⣿⣿⣿⡆⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⢠⣶⣦⣤⡄⢀⣀⡀⢠⣶⣿⣶⣶⣿⠀⠘⢿⣿⣿⣷⣽⣿⣿⣿⣿⣵⣾⣿⣿⣿⠃⢠⣆⣀⣀⣀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⣶⣤⣘⣿⣿⡍⢸⣿⣻⣷⣶⣿⣿⣿⣿⣿⣷⣤⣠⣴⣿⣷⣿⣿⣿⣿⣿⣷⣿⣿⣀⠀⣀⣼⣿⣿⣿⣿⣿⡂⣴⣦⠀⣾⣿⢻⣧⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⢠⣴⣼⣿⣿⠿⢿⣿⢿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣷⣿⣶⣿⣿⣿⣧⣴⣴⣦⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⢾⣿⣿⣿⢱⣶⣆⣿⣿⡿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⠿⠛⠻⣿⣿⣶⣶⡆⠀⠀⠀
⠀⠀⠀⠀⢻⣿⣿⣿⣽⣿⣿⣿⣿⣿⣾⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⢟⣿⣿⣷⣿⣿⣻⣿⣿⣿⡇⠀⠀⠀
⠀⠀⠀⣤⣾⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣾⣿⣿⣿⣿⣯⣿⣿⣿⣿⠃⠀⠀⠀
⠀⢀⣠⣴⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⡿⠟⠋⠉⠀⠈⠉⠛⢿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⡿⠛⠛⠛⠻⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣷⡤⠀⠀⠀
⣰⣿⡟⣉⠉⣙⣛⣿⣿⣿⣿⣿⣟⠋⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠘⠻⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⠟⠉⠀⠀⠀⠀⠀⠀⠀⠙⠻⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣥⣄⠀⠀
⣿⣻⣸⡿⠼⠿⠟⢿⣿⣿⣿⣿⣿⣷⣤⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢻⣿⣿⣿⣿⣿⣿⣿⣿⡿⠁⠐⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢉⣿⣿⣿⣿⣿⣿⣯⣉⠉⣙⣿⡗⣦
⠹⠏⢉⣡⣤⣶⣶⣾⣿⣿⣿⣿⣿⣿⣿⣿⣿⣷⣶⣶⣶⣶⣶⣶⣶⣶⣾⣿⣿⣿⣿⣿⣿⣿⣿⣧⣤⣤⣀⣀⣀⣀⣀⠀⢀⣀⣠⣤⣾⣿⣿⣿⣿⣿⣿⣿⠿⠻⡿⣿⣹⣃⣾
⢠⣾⣿⡿⠟⠋⠉⠉⣹⣿⣿⣿⣿⡿⢿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⢿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣷⣶⣤⣉⠁⠛⠁
⠛⠁⠛⠀⠀⠀⠀⡾⢿⣟⠿⢿⣿⣷⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⢻⣿⣿⣿⣿⣿⡉⠉⠙⠻⢿⣿⣷⡄⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠈⠉⠀⠈⠛⠋⣿⡟⢿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⡿⠿⠿⠿⠿⠿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⠿⠿⣻⡿⠿⠆⠀⠀⠀⠛⠉⠛⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢿⠇⠈⠛⠁⠈⠉⠛⠛⠛⠿⠟⠉⠀⠀⠀⠀⠀⠀⠈⠙⢿⣿⠿⢿⣿⠿⠿⠻⣿⡿⢛⣿⡏⠛⠋⠀⠀⠈⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠘⠿⠇⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀

image courtesy emojicombos

I have had the misfortune of working with the very powerful Actix Actor Model library in production over complex business logic and whilst it is probably the best known actor model framework for Rust it pushes you in various directions that you probably don't really want to go in - well not initially - and in all honesty I probably will not use it again.

It is probably part of my therapudic process that I feel compelled to write this article in the hope that others may avoid my mistakes.

I should add that whilst Actix is generally complex to use and may result in your business logic being unnecessarily convoluted it is still one of the fastest Actor Model libraries around and the suggestion I put forward in this blog will not be as fast as using Actix for an Actor Model system.

Speed is not everything however. These days I suggest following Alice Rhyll's wonderful actor model article on constructing actors using Tokio channels which is much more usable especially if you want to retain your sanity and have more control over the system you are building.

In fact this series could be considered the spiritual successor to her technique as I expand on the basics to flesh out a more comprehensive system.

What sort of things do you need Actors for?

Actors like lambda calculus are a whole computation paradigm so you certainly can make your entire system a set of interacting actors and that can work - however in a typical system it might make more sense to use actors to represent a logical boundary for services your system will use in order to manage concurrency.

A good example of this might be boundary services for example a network system, websocket, crypto rpc, database.

graph LR
    subgraph frameworks
        subgraph actorDB["DB Actor"]
            C[(Database)]
        end
        A[UI]
        subgraph actorNet["Net Actor"]
            D[Network]
        end
        subgraph adaptors
            CC["<< database >>"]
            DD["<< network >>"]
            subgraph domain
                B[Usecases]
            end
        end
    end
    A --> B
    B -.-> CC
    B -.-> DD
    CC -.PUT.-> C
    DD -.-> D
    D ---> B
linkStyle 3 color: #2d6a4f
style actorDB stroke-dasharray: 5 5, fill: #FFFACD, stroke: #FFD700, color: #1b4332, stroke-width: 3px
style actorNet stroke-dasharray: 5 5, fill: #FFFACD, stroke: #FFD700, color: #1b4332, stroke-width: 3px

Here I am thinking of building things like instant messenging applications - but you can apply this architecture to large distributed systems as well where actors might be individual microservices - or microservice systems.

The referentially transparent nature of actors makes them suitable to distributed systems where one actor can simply send a message to another actor and they need not have any indication of where that actor exists or how it is managed. This way you can get complex distributed applications that can be scaled in interesting ways or can even self assemble in peer to peer networks by giving each peer a role to accomplish.

A simple example

Start function spawns loop and returns channel

To have an actor you need a thing that asynchronously spins on messages and processes them one at a time. You also need a way to send the actor a message. A tokio channel is an easy well understood mechanism to achieve this.

Let's say we have a process function:

fn actor_process<M>(msg:M);

This will process all our messages a simple illustration here might be something like this:

fn actor_start<M>() -> mpsc::Sender<M> {
    let (tx, mut rx) = mpsc::channel::<M>(100);
    tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            actor_process(msg)
        }
    });

    tx
}

You could then do something like:

let sender = actor_start();
sender.send("say hello!");

And assuming that the processing function knows what to do with the &str "say hello!" you might find the actor_process() function doing something fancy like printing hello to the console perhaps.

The good parts

One good thing about Actix is that its trait API for actors and messages is quite nice as it avoids having a single monolithic receive() function on actors and allows our actors to be both type-safe as well as dynamic in the way they accept messages.

The good parts of the Actix API can influence our Rust API:

struct Counter {
    count: i64,
}
 
impl Actor for Counter {}
 
#[derive(Message)]
struct Increment;
 
impl Handler<Increment> for Counter {
    fn handle(&mut self, _: Increment) {
        self.count += 1;
    }
}

 
let counter = Counter { count: 0 }.start();

The issue here is that achieving this API involves a few advanced-ish rust concepts such as type erasure and trait objects.

The terrible parts

One bad thing about Actix is the god awful way in which it handles async handlers:

// The following is Actix code. 
// This is an example of what we DON'T want!
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
// Get ready for pain...
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
  
 
 
 
 
 
 
 
 
 
 
 

// Don't say I did not warn you...
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

// Ok here it comes...
 
// Using actix 
use actix::prelude::*;
 
struct MyActor {
    counter: u64,
    label: String,
}
 
impl Actor for MyActor {
    type Context = Context<Self>;
}
 
// This is a little convoluted with returning labels
// or whatever but I am trying to demonstrate the issues with Actix's API
#[derive(Message)]
#[rtype(result = "String")]
struct Increment(u64);
 
impl Handler<Increment> for MyActor {
    type Result = ResponseActFut<Self, String>;

    fn handle(&mut self, msg: Increment, _ctx: &mut Context<Self>) -> Self::Result {
        // We need `label` inside the async block, but we can't borrow self,
        // so we clone it out. For every. single. field.
        let label = self.label.clone();
        let amount = msg.0;

        let fut = async move {
            // pretend this is an async DB call or HTTP request
            sleep(Duration::from_millis(10)).await;
            (amount, label)
        };

        // into_actor: converts a normal future into an ActorFuture.
        // .map: gives us &mut self back — but only AFTER the future resolves,
        //       and only in this synchronous callback. So the actual mutation
        //       lives here, separated from the async logic above.
        Box::pin(fut.into_actor(self).map(|(amount, label), actor, _ctx| {
            actor.counter += amount;
            format!("[{}] counter is now {}", label, actor.counter)
        }))
    }
}

#[actix::main]
async fn main() {
    let addr = MyActor { counter: 0, label: "myactor".into() }.start();
    let res = addr.send(Increment(5)).await.unwrap();
    println!("{res}"); // [myactor] counter is now 5
}

So this is hot junk.

I can think of 3 reasons we are forced to dance like this:

  1. Actix's historical baggage
  2. Dogmatic actor model theory
  3. Efficiency

What is happening here is that we are explicitly separating mutation from our async routines so I assume these operations can be scheduled and run concurrently.

This is great if you are someone that enjoys exfoliating with a cheesegrater.

Initially I figured Actix was created before async was normal in Rust so there is historical baggage there as Actix had to reinvent lots of stuff and not everything was done in the most straightforward way but there are some other reasons this is here.

The dogmatic justification for this might be that actors are irreducably meant to do only three things:

  1. create new actors
  2. send messages
  3. change what it should do with the next message (also known as change state)

Changing what the actor will do with the next message is very much a kind of thing you probably should do at the very end of the routine and is good practice. Actix forces you to do this with its exotic future API but this in my opinion is totally not necessary. Honestly it does not really matter. The outcome is the same. You change state and use that state in your next message processing run. There is a semantic difference in the case of failure but here we are using Rust so failure is managed for the most part.

The last reason for doing this is that it is possible for framework authors to optimize immutable routines more easily than mutable ones. Actix is highly optimized - in fact it's empirically faster than using raw Tokio channels (I've tested this), and I am sure its API is designed to aid that optimization.

However, using it on a large project with complex logic is a total nightmare. This is probably a good example of premature optimization. The Actix API doesn't lend itself well to reasoning about control flow, and this separation of async and mutation leads to fragmentation of business logic that fails to explain the story of what the code is doing - I know this from painful experience!

We can do better. Fortunately better in this case is pretty easy.

Let's just force async processing and enjoy life.

#[async_trait]
impl Handler<Increment> for Counter {
    // Behold the magical async handler!
    async fn handle(&mut self, _: Increment) {
        self.count += 1;
    }
}

I mean we never need to care about race conditions as our processing loop looks like this:

fn actor_start() -> mpsc::Sender<M> {
    let (tx, mut rx) = mpsc::channel::<M>(100);
    tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            actor_process(msg).await 
            // now lets loop and process the next message..
        }
    });
    tx
}

This means our framework might be a touch slower than actix as the optimization opportunities are more limited but I believe anyone that has used Actix in anger shall probably find this idea reasonable in terms of tradeoff for a decent number of usecases.

Build out the Framework

So let's get to the business of abstraction to formulate our API.

The Actor trait

We can start with an Actor trait:

pub trait Actor {
    // we will flesh this out in a minute...
}

This allows us to turn a struct into an Actor.

struct Counter {}
 
impl Actor for Counter {} // Now an Actor

Define a Message trait

As we begin to flesh this out we need to define a trait for a message. This will allow us to define different Message structs we can use to communicate with actors. We need to make this Send + 'static so that the message can be sent over the channel and through to the tokio::spawn() call in our processing loop.

pub trait Message: Send + 'static {}

Handle the Message with a Handler

#[async_trait]
pub trait Handler<M>
where
    Self: Actor,
    M: Message,
{
    async fn handle(&mut self, msg: M);
}

Notice the handle function is async. We will also need to connect Actor and Message later but this is the basic structure.

Add an Envelope to hold a message M

A message exists as its own concrete type that implements the Message trait. The message does not need to know about things such as who to reply to once the message has been processed and what Actor the Message is for.

To solve this we hold Message in an Envelope<M>. This allows us to combine behaviours and control the type erasure on a concrete struct. The envelope holds an Option<M> so that we can use take() to avoid cloning it when it comes time to process.

pub struct Envelope<M: Message> {
    pub msg: Option<M>,
}

Channel of messages for an Actor

An actor needs to be able to process all Messages it has been sent in order, however, we have a problem in that Rust channels are typed. An mpsc::channel::<Increment> can only carry Increment. But an actor needs to process Increment, Decrement, GetCount - many different types through one receive channel.

The solution is a trait object: Box<dyn SomeTrait>. That erases the concrete type. But what trait? We need one that says "I know how to process myself against this actor" without revealing what message is inside.

#[async_trait]
trait ActorMessage<A> : Send {
    async fn process(&mut self, act: &mut A);
}

With this trait we can update our Actor to actually process messages.


type PointerToActorMessage<A> = Box<dyn ActorMessage<A>>;
 
pub trait Actor: Send + Sized + 'static {
    fn start(self) -> UnboundedSender<PointerToActorMessage<A>> {
        let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<Self>>();
        tokio::spawn(async move {
            let mut this = self;
            while let Some(mut msg) = rx.recv().await {
                msg.process(&mut this).await;
            }
        });
        tx
    }
}

Now we can implement this over our Envelope with this where clause that combines the Actor A with the Message M monomorphically:

#[async_trait]
impl<A, M> ActorMessage<A> for Envelope<M>
where
    A: Actor + Handler<M>,
    M: Message { /*...*/ }

This is the whole Envelope:

pub struct Envelope<M: Message> {
    pub msg: Option<M>,
}
 
impl<M: Message> Envelope<M> {
    pub fn new(msg: Option<M>) -> Box<Self> {
        Box::new(Self { msg })
    }
}
 
#[async_trait]
impl<A, M> ActorMessage<A> for Envelope<M>
where
    A: Actor + Handler<M>,
    M: Message,
{
    // A new version of this handler is created for each 
    // combination of Actor and Message
    async fn process(&mut self, act: &mut A) {
        if let Some(msg) = self.msg.take() {
            // call handle on the actor
            act.handle(msg).await;
        }
    }
}

Notice that the compiler will create many versions of process() for each valid combination of Actor and Message. It will also call the correct one for us.

This means if we have a channel of trait objects of type Box<dyn ActorMessage<A>> we can iterate through them and call msg.process(actor).

How to reference Actors with Addr<A>

When we create actors we need to reference them to send them messages.

let counter: Addr<Counter> = Counter { count:0 }.start();
counter.tell(Increment); // yet to be implemented...

To do this we need a concrete thing that start() returns that is part of our framework instead of just a channel sender.

pub struct Addr<A>
where
    A: Actor,
{
    tx: UnboundedSender<PointerToActorMessage<A>>>,
}
 
impl<A> Clone for Addr<A>
where
    A: Actor,
{
    fn clone(&self) -> Self {
        Addr {
            tx: self.tx.clone(),
        }
    }
}

Now we can hook this up to the Actor.

pub trait Actor: Send + Sized + 'static {
    fn start(self) -> Addr<Self> {
        let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<Self>>();
        tokio::spawn(async move {
            let mut this = self;
            while let Some(mut msg) = rx.recv().await {
                msg.process(&mut this).await;
            }
        });
        Addr { tx }
    }
}

Sending a message addr.tell()

To send a message now we call the tell(msg) method on Addr<A> passing your message. This gets wrapped in an Envelope to erase the message type and sent down the processing channel. The type systems ensures that the actor A has a handler registered to accept M.

// making this a trait allows us to 
// unify semantics and connect types
#[async_trait]
pub trait Sender<M>
where
    M: Message,
{
    fn tell(&self, msg: M);
}
 
#[async_trait]
impl<M, A> Sender<M> for Addr<A>
where
    M: Message,
    A: Actor + Handler<M>,
{
    fn tell(&self, msg: M) {
        let _ = self.tx.send(Envelope::new(Some(msg)));
    }
}

We now have all the pieces in place for unidirectional messaging between actors.

struct Counter {
    count: i64,
}
 
impl Actor for Counter {}
  
struct Increment;
 
impl Message for Increment {}
 
impl Handler<Increment> for Counter {
    fn handle(&mut self, _: Increment) {
        self.count += 1;
    }
}
 
let counter = Counter { count: 0 }.start();
 
counter.tell(Increment);
counter.tell(Increment);
counter.tell(Increment);
 

But how can we get the count?

We still need a way to request information from the actor.

To this end we will implement a pattern for asking things from actors, taking advantage of tokio async.

let count:u64 = counter.ask(GetCount).await;

Adding a Response to a Message

We can add a Response associated type to a Message:

pub trait Message: Send + 'static {
    type Response: Send;
}

We can now define our messages like this:

// Increment is fire and forget
struct Increment;
impl Message for Increment {
    type Response = ();
}
 
// GetCount has an i64 as a "Response"
struct GetCount;
impl Message for GetCount {
    type Response = i64;
}

This is verbose but don't worry for now we can address that later.

We need to have the handler return the response:

#[async_trait]
pub trait Handler<M>
where
    Self: Actor,
    M: Message,
{
    async fn handle(&mut self, msg: M) -> M::Response;
}

We also need an optional oneshot channel to handle delivery of the response if there is a responder:

pub struct Envelope<M>
where
    M: Message,
{
    pub msg: Option<M>,
    pub tx: Option<oneshot::Sender<M::Response>>,
}

We then need to make sure Envelope<M> processes and delivers the response if one exists.

#[async_trait]
impl<A, M> ActorMessage<A> for Envelope<M>
where
    A: Actor + Handler<M>,
    M: Message,
{
    async fn process(&mut self, act: &mut A) {
        if let Some(msg) = self.msg.take() {
            let res = act.handle(msg).await;
            if let Some(tx) = self.tx.take() {
                let _ = tx.send(res);
            }
        }
    }
}

Then we just need to ensure that Envelope<M> is created correctly for when we want to fire and forget (tell) or get a response (ask).

#[async_trait]
impl<M, A> Sender<M> for Addr<A>
where
    M: Message,
    A: Actor + Handler<M>,
{
    async fn ask(&self, msg: M) -> M::Response {
        let (tx, rx) = oneshot::channel();
        let _ = self.tx.send(Envelope::new(Some(msg), Some(tx)));
        rx.await.expect("actor dropped before responding")
    }

    fn tell(&self, msg: M) {
        let _ = self.tx.send(Envelope::new(Some(msg), None));
    }
}

Setup a macro for Message

Above we mentioned that this is verbose.

// Increment is fire and forget
struct Increment;
impl Message for Increment {
    type Response = ();
}
 
// GetCount has an i64 as a "Response"
struct GetCount;
impl Message for GetCount {
    type Response = i64;
}

So let's make this more concise with a macro:

use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, parse_quote, DeriveInput, Type};
 
#[proc_macro_derive(Message, attributes(response))]
pub fn derive_message(input: TokenStream) -> TokenStream {
    let input = parse_macro_input!(input as DeriveInput);
    let name = &input.ident;
 
    // Specify Type explicitly with parse_args::<Type>()
    let response_type = input
        .attrs
        .iter()
        .find(|attr| attr.path().is_ident("response"))
        .map(|attr| {
            attr.parse_args::<Type>()
                .expect("Expected #[response(Type)]")
        })
        .unwrap_or_else(|| parse_quote!(()));
 
    quote! {
        impl Message for #name {
            type Response = #response_type;
        }
    }
    .into()
}

Now we can define messages like this:

#[derive(Message)]
struct Increment;
 
#[derive(Message)]
#[response(i64)]
struct GetCount;

Usage

We can use this little library like this:

use async_trait::async_trait;
use macros::Message;
use crate::{Actor, Handler, Message, Sender};
 
struct Counter {
    count: i64,
}
 
impl Actor for Counter {}
 
#[derive(Message)]
struct Increment;
 
#[derive(Message)]
struct Decrement;
 
#[derive(Message)]
#[response(i64)]
struct GetCount;
 
#[async_trait]
impl Handler<Increment> for Counter {
    async fn handle(&mut self, _: Increment) {
        self.count += 1;
    }
}
 
#[async_trait]
impl Handler<Decrement> for Counter {
    async fn handle(&mut self, _: Decrement) {
        self.count -= 1;
    }
}
 
#[async_trait]
impl Handler<GetCount> for Counter {
    async fn handle(&mut self, _: GetCount) -> i64 {
        self.count
    }
}

let counter = Counter { count: 0 }.start();

counter.tell(Increment);
counter.tell(Increment);
counter.tell(Decrement);
counter.tell(Increment);

let count = counter.ask(GetCount).await;

assert_eq!(count, 2);

All of it

Check out the example up on this repo

The whole thing is pretty concise

use async_trait::async_trait;
use tokio::sync::{
    mpsc::{self, UnboundedSender},
    oneshot,
};
 
type PointerToActorMessage<A> = Box<dyn ActorMessage<A>>;
 
pub trait Actor: Send + Sized + 'static {
    fn start(self) -> Addr<Self> {
        let (tx, mut rx) = mpsc::unbounded_channel::<PointerToActorMessage<Self>>();
        tokio::spawn(async move {
            let mut this = self;
            while let Some(mut msg) = rx.recv().await {
                msg.process(&mut this).await;
            }
        });
        Addr { tx }
    }
}
 
pub trait Message: Send + 'static {
    type Response: Send;
}
 
#[async_trait]
pub trait Handler<M>
where
    Self: Actor,
    M: Message,
{
    async fn handle(&mut self, msg: M) -> M::Response;
}
 
#[async_trait]
pub trait ActorMessage<A>: Send {
    async fn process(&mut self, act: &mut A);
}
 
pub struct Envelope<M>
where
    M: Message,
{
    pub msg: Option<M>,
    pub tx: Option<oneshot::Sender<M::Response>>,
}
 
impl<M> Envelope<M>
where
    M: Message,
{
    pub fn new(msg: Option<M>, tx: Option<oneshot::Sender<M::Response>>) -> Box<Self> {
        Box::new(Self { msg, tx })
    }
}
 
#[async_trait]
impl<A, M> ActorMessage<A> for Envelope<M>
where
    A: Actor + Handler<M>,
    M: Message,
{
    async fn process(&mut self, act: &mut A) {
        if let Some(msg) = self.msg.take() {
            let res = act.handle(msg).await;
            if let Some(tx) = self.tx.take() {
                let _ = tx.send(res);
            }
        }
    }
}
 
pub struct Addr<A>
where
    A: Actor,
{
    tx: UnboundedSender<Box<dyn ActorMessage<A>>>,
}
 
impl<A> Clone for Addr<A>
where
    A: Actor,
{
    fn clone(&self) -> Self {
        Addr {
            tx: self.tx.clone(),
        }
    }
}
 
#[async_trait]
pub trait Sender<M>
where
    M: Message,
{
    async fn ask(&self, msg: M) -> M::Response;
    fn tell(&self, msg: M);
}
 
#[async_trait]
impl<M, A> Sender<M> for Addr<A>
where
    M: Message,
    A: Actor + Handler<M>,
{
    async fn ask(&self, msg: M) -> M::Response {
        let (tx, rx) = oneshot::channel();
        let _ = self.tx.send(Envelope::new(Some(msg), Some(tx)));
        rx.await.expect("actor dropped before responding")
    }
 
    fn tell(&self, msg: M) {
        let _ = self.tx.send(Envelope::new(Some(msg), None));
    }
}

plus the macro

use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, parse_quote, DeriveInput, Type};
 
#[proc_macro_derive(Message, attributes(response))]
pub fn derive_message(input: TokenStream) -> TokenStream {
    let input = parse_macro_input!(input as DeriveInput);
    let name = &input.ident;
 
    // Specify Type explicitly with parse_args::<Type>()
    let response_type = input
        .attrs
        .iter()
        .find(|attr| attr.path().is_ident("response"))
        .map(|attr| {
            attr.parse_args::<Type>()
                .expect("Expected #[response(Type)]")
        })
        .unwrap_or_else(|| parse_quote!(()));
 
    quote! {
        impl Message for #name {
            type Response = #response_type;
        }
    }
    .into()
}

In the next article we will talk about supervision heirarchies and how to make actors create highly stable systems that never crash.