I came across an interesting problem while working on my project, Deneb. The problem is not specific to this project and can be found in many domains.

Deneb is a directory synchronization tool implemented as a file system in userspace (FUSE) module. FUSE modules are userspace processes which receive and process requests originating from the operating system kernel.

In addition to file system request coming from the kernel, the central processing engine of Deneb also needs to respond to user actions and network events. This system is implemented by running the event loop of the engine in a separate thread. A standard Rust multiple-producer single-consumer channel is be used to connect the various sources of requests to the engine: each source stores a clone of the sending end of the channel to submit requests. On the receiving end, owned by the engine, requests are read and processed in the same order they are received. To each type of request corresponds a specific type of reply.

Deneb is implemented in Rust, a programming language with a very strong, expressive, type system. We would like to use the type system to describe this simple communication protocol between the two parties.

Session types

The problem described here is the simplest form of a more general one - the specification of communication protocols. In the general case, protocols can contain an arbitrary number of steps and can involve more than two parties. By using type systems to encode the communication protocol, it is possible to ensure, at compilation time, that our implementation of the protocol offers certain guarantees, such as: participants can only move between states in a well specified way, request types are matched with specific reply types, replies should be received for any outstanding requests before continuing the protocol, etc.

This type-based approach is usually called session types and is a very active topic of research 1234567. The protocol used inside Deneb only involves a single request-reply step between two parties.

Initial attempt: request and reply variants

In the first attempt, Rust enums are used to describe both the set of requests and the replies. Assuming a protocol with two types of request, one containing a signed integer, the other a string, the following is an enum describing the requests:

enum Request {
    RequestA(i64),
    RequestB(String),
}

The protocol defines the replies to the two requests as a floating point number and an unsigned integer, respectively. These replies are also described with an enum:

enum Reply {
    ReplyA(f64),
    ReplyB(usize),
}

One advantage of using enums for the request and reply types is that the communication channels can be defined very easily. All replies can be sent back across a channel of Reply values:

type ReplySender = std::sync::mpsc::Sender<Reply>;
type ReplyReceiver = std::sync::mpsc::Receiver<Reply>;

Requests are bundled together with a ReplySender and are sent across a channel:

type PackedRequest = (Request, ReplySender);
type RequestSender = std::sync::mpsc::Sender<PackedRequest>;
type RequestReceiver = std::sync::mpsc::Receiver<PackedRequest>;

In the engine, running in the background thread, the packaged requests are extracted from the channel:

let (tx, rx) = channel::<PackedRequest>();
let _ = tspawn(move || {
    let mut engine = Engine::new();
    info!("Starting engine event loop");
    for (request, tx) in rx.iter() {
        engine.handle_request(event, &tx);
    }
    info!("Engine event loop finished.");
});

The handle_request method contains a match expresssion dispatching to the processing logic for each request type. Replies are sent back through the ReplySender end of the reply channel.

On the client side, PackedRequest objects are sent over the channel to the engine:

fn make_request(&self, req: Request) -> Result<Reply> {
    let (tx, rx) = channel(0);
    self.channel
        .clone()
        .send((req, tx))
        .map_err(|_| EngineError::SendFailed)?;
    rx.recv().map_err(|e| e.into())
}

For each received reply, its variant needs to be checked, to ensure it is the correct one for the originating request:

fn make_requestA(&self, _id: &RequestId, index: i64) -> Result<f64> {
    let reply = self.make_request(Request::RequestA { index })?;
    if let Reply::ReplyA(result) = reply {
        result
    } else {
        Err(EngineError::InvalidReply.into())
    }
}

The fact that this run-time check needs to be done manually is a disadvantage of this approach. It is verbose and error-prone, in the case of protocols with many different types of request. In the next session, an alternative approach is described, which does not have this issue.

Session traits

While researching an improved way to describe this protocol, I came across the Actix project, an implementation of the actor model in Rust. Programs built using this model are made up of a number of actors, independent entities which execute concurrently and which can only communicate by passing messages to each other.

The implementation of the message passing protocol in Actix uses the type system to constrain which reply data can be expected for any given request. The solution I describe here is based on Actix, but it is simplified for the more specific use case in Deneb. I encourage browsing the source code of Actix, if you are interested in the general solution.

The following approach uses traits to describe the various requests and replies, instead of enums. The first step is defining the Request trait:

trait Request: Send {
    type Reply: Send;
}

This trait does not contain any methods, but defines an associated type, Reply - the type of data expected as a response to the request. Additionally, the trait forces both the request and the reply types to be Send types, needed for passing them across thread boundaries.

The second trait which needs to be defined is the RequestHandler:

trait RequestHandler<R>
where
    R: Request,
{
    fn handle(&mut self, request: &R) -> R::Reply;
}

This is a generic trait, parameterized on the Request type. The trait only defines the handle method, which allows the handler’s internal state to change and takes a Request object and processes it to return the Reply type corresponding to the request.

Next, we need a way to encode the relationship between the request, the reply, and the handler with processes the request:

struct RequestProxy<R, H>
where R: Request,
      H: RequestHandler<R>,
{
    req: R,
    tx: Sender<R::Reply>,
    _hd: PhantomData<fn(&mut H) -> R::Reply>,
}

The RequestProxy struct is parameterized on the Request type. The struct contains an owned copy of the request data, and the sending end of the channel used to return the reply data. The struct is additionally parameterized on the RequestHandler type, as a way to encode the fact that the RequestProxy object contains some request data that should be handled in the future by a specific type of request handler. Since this relationship is not reflected in the normal data members of the struct, the code would not compile. This is solved by including an additional data member of type PhantomData<fn(&mut H) -> R::Reply>. The phantom data member is a marker - it does not have a representation in the RequestProxy struct at run-time, it only instructs the compiler to treat the struct as owning an instance of the inner type of the phantom data. In this case, the inner type is a function going from &mut RequestHandler to Request::Reply.

A final component is needed to allow sending the requests, each with a different type, over the same standard Rust channel. We define a helper trait that renders opaque the application of a specific type of request handler to a request:

trait HandlerProxy: Send {
    type Handler;
    fn run_handler(&self, handler: &mut Self::Handler);
}

The HandlerProxy trait does not have Request as a type parameter and only defines the run_handler method, which takes a mutable reference to a Handler object - whose type is given to the trait as an associate type. It should be noted that this associated type is not bounded by the RequestHandler trait. We implement the trait for the RequestProxy type:

impl<R, H> HandlerProxy for RequestProxy<R, H>
where
    R: Request,
    H: RequestHandler<R>,
{
    type Handler = H;
    fn run_handler(&self, hd: &mut Self::Handler) {
        let reply = hd.handle(&self.req);
        let _ = self.tx.send(reply);
    }
}

Using this final trait, it is possible to construct the communication channel to the engine. The values sent over the channel are trait objects implementing the HandlerProxy trait:

struct PackagedRequest<H>
{
    inner: Box<HandlerProxy<Handler = H>>,
}

impl<H> HandlerProxy for PackagedRequest<H>
{
    type Handler = H;
    fn run_handler(&self, hd: &mut Self::Handler) {
        self.inner.run_handler(hd);
    }
}

type RequestChannel<H> = Sender<PackagedRequest<H>>;

The request processing loop on the side of the engine has minimal changes with respect to the initial version:

impl Engine {
    fn start() -> (Handle, JoinHandle<()>) {
        let (tx, rx) = channel();
        let thread_handle = spawn(move || {
            let mut engine = Engine::new();
            for request in rx.iter() {
                request.run_handler(&mut engine);
            }
        });
        (Handle { ch: tx }, thread_handle)
    }
}

The Request trait needs to be implemented for each type of request:

struct R1 {
    val: i64,
}

struct R2 {
    msg: String,
}

impl Request for R1 {
    type Reply = f64;
}

impl Request for R2 {
    type Reply = usize;
}

An implementation of RequestHandler with each request type needs to be provided for the Engine type:

impl RequestHandler<R1> for Engine {
    fn handle(&mut self, request: &R1) -> <R1 as Request>::Reply {
        request.val as f64 * 0.33
    }
}

impl RequestHandler<R2> for Engine {
    fn handle(&mut self, request: &R2) -> <R2 as Request>::Reply {
        request.msg.len()
    }
}

A generic function parameterized on the type of request and handler is used to submit the requests and wait for their completion:

fn make_request<R, H>(req: R, ch: &RequestChannel<H>) -> R::Reply
where R: Request + 'static,
      H: RequestHandler<R> + 'static,
{
    let (tx, rx) = channel();
    let envelope = PackagedRequest {
        inner: Box::new(RequestProxy { req, tx, _hd: PhantomData}),
    };
    let _ = ch.send(envelope);
    let rep = rx.recv().unwrap();
    rep
}

By construction, the type of reply is known to be the correct one for the type of request performed, which is what was sought through this design.

The code presented in this post can be found in the session-traits repository in my GitHub account. Additionally, the technique described here is used in Deneb.

Conclusions

In this post we discuss a technique for implementing a simple request-reply protocol, between different threads, in the Rust programming language. The threads are connected with generic channels and the goal is to use the type system of Rust to enforce, at compile time, that for each type of request made, the type of reply received has a specific type.

The process involves two main steps. First, a set of generic traits and types are defined, to tie together the type of request, reply, and request handler in a single proxy type. In the second step, the proxy data describing the request is packaged using boxing and trait objects, and sent over the channel to be processed.

The advantage over an approach based on maintaining two enums, one for the different requests and one for the associated replies, is that no run-time check for the type of reply is required with the trait-based approach.

Adding a new message to the protocol involves implementing the Reply trait which defined the type of reply, and implementing the RequestHandler trait for the new type of request, on the object which is processing the requests.

The generic part of the implementation is less than 100 lines long, but I suspect it’s possible to further simplify this solution. Additionally, some of the boilerplate code required for implementing the traits for the different types of request could be shortened with the help of some macros, but this has not been implemented at this point. It would also be interesting to see if this approach could be extended to multi-step or multi-party protocols.