Implementing pid1 with Rust and async/await

This post covers a simple Rust application to explore topics like async/await, unsafe Rust, signal handling, and error handling.

A few years back, I wrote up a detailed blog post on Docker's process 1, orphans, zombies, and signal handling. Please read the gory details if you're interested, but the high level summary is:

The solution from three years ago was a Haskell executable providing this functionality and a Docker image based on Ubuntu. I use that image for the base of almost all of my Docker work, and problem solved.

Side note: as a few people pointed out to me, including u/valarauca14 on Reddit, Docker has a --init flag which addresses the concerns around a missing PID1 process. There are still some downsides to that versus an ENTRYPOINT calling a pid1 executable, including (1) needing to remember the extra flag each time, and (2) lack of support in higher-level tooling. But this post isn't about solving a real problem anyway, it's about playing with Rust!

Rewrite it in Rust!

A few of the Haskellers on the FP Complete team have batted around the idea of rewriting pid1 in Rust as an educational exercise, and to have a nice comparison with Haskell. No one got around to it. However, when Rust 1.39 came out with async/await support, I was looking for a good use case to demonstrate, and decided I'd do this with pid1. While the real motivation here is to demonstrate Rust to those curious—especially my Haskell-favoring coworkers—there are some real advantages to Rust over Haskell for this use case:

But to reiterate, this was mostly about learning and teaching. So the rest of this post will be about walking through the implementation and explaining some of the interesting points. We'll be hitting topics like:

The full code for this is available on GitHub as pid1-rust-poc. Apologies to my many coworkers who insisted that I rename this to "the grim reaper."

Intended behavior

The program we're writing is intended to be called with a command line invocation such as pid1 command arg1 arg2 arg3. It will then:

There's a slight race condition in the above, since we launch the child process before the signal handlers are installed. I'm leaving that as-is to make the code a bit easier to understand, but feel free to improve this if you're looking for a challenge!

Additionally, as pointed out by /u/wmanley on Reddit, locking a mutex inside a signal handler may deadlock. If you're looking for another challenge, you can rewrite this using signal_hook::pipe.

Parse the command

You can get the list of command line arguments as an iterator. This iterator will have the current executable's name as the first value, which we want to ignore. We want to return a pair of the command name and a vector of the rest of the arguments. And if there's no command provided, we'll use a Result to capture the error. Putting that all together, the function looks like this:

fn get_command() -> Result<(String, Vec<String>), Pid1Error> {
    let mut args = std::env::args();
    let _me = args.next();
    match args.next() {
        None => Err(Pid1Error::NoCommandGiven),
        Some(cmd) => Ok((cmd, args.collect())),
    }
}
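std::env::args() always reflects the real command line. To exercise the None/Some logic without it, one option is to pull that logic into a helper that accepts any iterator of strings. This is a sketch, not code from pid1-rust-poc. The name split_command is hypothetical, and Pid1Error is trimmed to the one variant the helper needs.

```rust
/// Trimmed-down Pid1Error: only the variant this sketch needs.
#[derive(Debug, PartialEq)]
enum Pid1Error {
    NoCommandGiven,
}

/// Hypothetical helper: the same logic as get_command, but over any
/// iterator of strings, so it can be tested without real argv.
fn split_command<I>(args: I) -> Result<(String, Vec<String>), Pid1Error>
where
    I: IntoIterator<Item = String>,
{
    let mut args = args.into_iter();
    let _me = args.next(); // skip the executable's own name
    match args.next() {
        None => Err(Pid1Error::NoCommandGiven),
        Some(cmd) => Ok((cmd, args.collect())),
    }
}

/// get_command then becomes a thin wrapper over the real arguments.
fn get_command() -> Result<(String, Vec<String>), Pid1Error> {
    split_command(std::env::args())
}

fn main() {
    let argv = vec!["pid1", "ls", "-l", "/tmp"].into_iter().map(String::from);
    println!("{:?}", split_command(argv));
    println!("{:?}", get_command());
}
```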

We have to capture the result of std::env::args() inside a mutable variable, since each subsequent call to next() mutates the value, essentially popping the next value off the front of a queue. We're able to ignore the first value, then pattern match on the second value. If it's None, then the command is missing, and we return an Err value.

Otherwise, if there's a Some value, we take that as the command, and collect all of the remaining arguments from args into a Vec. Some interesting things to point out, especially to Haskellers:

Enough of that, let's move on!

The type of main

Our application needs to be able to handle a few things:

We're going to represent all of this with the signature of the main function. This looks like:

async fn main() -> Result<(), Pid1Error>

By returning a Result, we're telling the compiler: if this function produces an Err variant, print an error message to stderr and set the exit code to a failure. By adding async, we're saying: this function may await some stuff. Under the surface, this means that main is actually producing a value that is an instance of Future, but we'll get to that later. For now, the important thing to understand is that, in order to run a function like this, we need some kind of a scheduler to be available.

One option would be to rename main to main_inner, and then write a main function like:

fn main() -> Result<(), Pid1Error> {
    async_std::task::block_on(main_inner())
}

However, there's a crate called async-attributes which lets us do something a little bit slicker:

#[async_attributes::main]
async fn main() -> Result<(), Pid1Error> {
    // all of our code with .await
}

This almost makes Rust feel like Haskell, Go, or Erlang, which have a green-threaded system built in. Rust requires a little more effort to get this async code running, but almost all of that machinery is userland library code rather than a built-in runtime system. It also means you can easily swap out different schedulers.
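Here is a minimal sketch of what a block_on function could look like using only the standard library. It makes the "userland scheduler" point concrete. It is not async-std's implementation, which also drives a reactor and handles much more. ThreadWaker and this block_on are hypothetical names. std::task::Wake needs Rust 1.51 or newer, later than the 1.39 release discussed here. String stands in for Pid1Error.

The idea is simple: poll the future, park the thread while it returns Pending, and let the Waker unpark it.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is parked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: poll the future on the current thread,
/// parking whenever it is Pending until its Waker unparks us.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Spurious unparks are harmless: we simply poll again.
            Poll::Pending => thread::park(),
        }
    }
}

async fn main_inner() -> Result<(), String> {
    // The real program's body, with its .await calls, would go here.
    Ok(())
}

fn main() -> Result<(), String> {
    block_on(main_inner())
}
```

Swapping schedulers then amounts to calling a different function of this shape. The async code itself does not change.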

Launching and error handling

Inside our main function, we start by calling the get_command function:

let (cmd, args) = get_command()?;

To the uninitiated, two questions may pop up:

  1. I thought that function returns a Result value, why does it look like it's returning a pair?
  2. What's that question mark?

Perhaps unsurprisingly, one of these answers the other. The question mark can be added to any expression in Rust to ease error handling. The exact details are more complicated than this, but the above code essentially converts to:

let (cmd, args) = match get_command() {
    Ok(pair) => pair,
    Err(e) => return Err(e),
};

In other words, if the value is an Ok, it continues the current function with that value. Otherwise, it exits this function, propagating the error value itself. Pretty nice for a single character! Explicit error handling without much noise.

The next line is a little more interesting:

let child = std::process::Command::new(cmd).args(args).spawn()?.id();

We create a new command with the cmd value, set its arguments to args, and then spawn the process. Spawning may fail, so it returns a Result. We're able to put the ? in the middle of the expression, and then continue chaining additional method calls. That's really slick, and composes very nicely with the .await syntax we'll see in a bit.

However, there's one curious bit here: spawn() doesn't use Pid1Error for indicating something went wrong. Instead, it uses std::io::Error. So how does the std::io::Error become a Pid1Error? There's a special trait (like a typeclass in Haskell, or interface in Java) called From in Rust. And now we can look at our definition of Pid1Error and the implementation of the From trait:

#[derive(Debug)]
enum Pid1Error {
    IOError(std::io::Error),
    NoCommandGiven,
    ChildPidTooBig(u32, std::num::TryFromIntError),
    // used by the zombie-reaping loop when waitpid returns -1
    WaitpidFailed(libc::pid_t),
}

impl std::convert::From<std::io::Error> for Pid1Error {
    fn from(e: std::io::Error) -> Self {
        Pid1Error::IOError(e)
    }
}

It's not necessary to be this verbose; there are crates that provide attributes for deriving this trait implementation more easily. But I still prefer being verbose, and don't mind a bit of boilerplate like this.
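Here is a small sketch of the conversion in action. It assumes Linux and a command name that does not exist on the PATH (the name below is made up). spawn() fails with a std::io::Error, and ? passes it through From::from, so the caller sees a Pid1Error::IOError. The enum is trimmed to the variant needed here, and spawn_child is a hypothetical helper.

```rust
use std::process::Command;

/// Trimmed-down Pid1Error: only the variant this sketch needs.
#[derive(Debug)]
enum Pid1Error {
    IOError(std::io::Error),
}

impl From<std::io::Error> for Pid1Error {
    fn from(e: std::io::Error) -> Self {
        Pid1Error::IOError(e)
    }
}

/// spawn() returns an io::Result; `?` converts its error via From::from.
fn spawn_child(cmd: &str, args: &[&str]) -> Result<u32, Pid1Error> {
    let child = Command::new(cmd).args(args).spawn()?.id();
    Ok(child)
}

fn main() {
    // Hypothetical command name, assumed not to exist on the PATH.
    match spawn_child("pid1-demo-no-such-command", &["arg1"]) {
        Err(Pid1Error::IOError(e)) => println!("converted io::Error: {}", e),
        Ok(pid) => println!("unexpectedly spawned pid {}", pid),
    }
}
```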

Converting to pid_t

The child value we got above is of type u32, meaning "unsigned 32-bit integer." This is a reasonable representation for a child PID, since they cannot be negative. However, in libc the type pid_t is represented as a signed integer: type pid_t = i32. The reason for this distinction isn't documented, but it makes sense: libc has some functions that use negative values for special cases, like sending signals to entire process groups. We'll see one of those later.

Anyway, casting from a u32 to an i32 may fail. Languages like C and even Haskell encourage unchecked casting. But the default way to do this in Rust is more explicit:

use std::convert::TryInto;
let child: libc::pid_t = match child.try_into() {
    Ok(x) => x,
    Err(e) => return Err(Pid1Error::ChildPidTooBig(child, e)),
};

The TryInto trait defines a method try_into() which we want to use. In Rust, you need to bring a trait into scope with use to have its methods available. Fortunately, the compiler is smart about this and provides helpful error messages. Then we pattern match on the Result and return a Pid1Error::ChildPidTooBig variant if the conversion fails.

You may be wondering why we used this pattern matching instead of ?. With the right From implementation, ? would work just fine. However, if you want to include additional context with your error, like the value we were trying to convert, you need to do a bit more work like above. Alternatively, you can play with the map_err method.
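For comparison, here is the map_err variant mentioned above, as a sketch. PidT is a local alias standing in for libc::pid_t (an i32 on Linux) so that no external crate is needed. to_pid is a hypothetical helper.

```rust
use std::convert::TryInto;
use std::num::TryFromIntError;

/// Stand-in for libc::pid_t, which is an i32 on Linux.
type PidT = i32;

/// Trimmed-down Pid1Error: only the variant this sketch needs.
#[derive(Debug)]
enum Pid1Error {
    ChildPidTooBig(u32, TryFromIntError),
}

/// Same conversion as the explicit match, written with map_err and `?`.
fn to_pid(child: u32) -> Result<PidT, Pid1Error> {
    let pid: PidT = child
        .try_into()
        // keep the original value alongside the conversion error
        .map_err(|e: TryFromIntError| Pid1Error::ChildPidTooBig(child, e))?;
    Ok(pid)
}

fn main() {
    println!("{:?}", to_pid(1234));
    println!("{:?}", to_pid(u32::MAX));
}
```

map_err attaches the extra context, and ? performs the early return. The result behaves the same as the match above.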

Filicide

Now that we know the process ID of the child, we can install a signal handler to capture any incoming SIGINTs, and send a signal ourselves to the child. Let's start with the callback that will actually send the SIGINT along.

let interrupt_child = move || {
    unsafe {
        libc::kill(child, libc::SIGINT); // ignoring errors
    }
};

Let's start from the inside out. libc::kill is a direct FFI call to the C library's kill function, which is how you send signals. We pass in the child PID and the signal we want to send. This call can fail (it returns -1 and sets errno), and ideally we would handle that correctly in Rust. But we're just ignoring such errors here.

Moving out, the next thing we see is unsafe. The FFI calls to libc are all marked as unsafe. You can read more about unsafe in the Rust book.

Next, we see this weird || { ... } syntax. The pipes are used for defining a lambda/closure. We could put a comma-separated list of arguments inside the pipes, but we don't have any. Since we're trying to create a callback that will be used later, some kind of lambda is necessary.

Finally, the move. Inside our lambda, we refer to the child variable, which is defined outside of the closure. This variable is captured in the closure's environment. By default, this is captured via a borrow. This gets us into lifetime issues, where the lifetime of the closure itself must be less than or equal to the lifetime of child itself. Otherwise, we'd end up with a closure which refers to a piece of memory that's no longer being maintained.

move changes this, and causes the child value to instead be moved into the environment of the closure, making the closure the new owner of the value. Normally in Rust, this would mean that child can no longer be used in the original context, since it's been moved. However, there's something special about child: it's an i32 value, which has an implementation of Copy. That means that the compiler will automatically create a copy (or clone) of the value when needed.
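Here is a sketch of why move matters, using only the standard library. register_handler is a hypothetical stand-in. It asks for Fn() + Send + Sync + 'static, the same kind of bounds signal-hook 0.1's register places on its callback, as I understand its documentation. The atomic static replaces the libc::kill call so the effect is observable.

Removing move makes the closure borrow child, and the compiler rejects it for not being 'static.

```rust
use std::sync::atomic::{AtomicI32, Ordering};

/// Records the last "signalled" PID so the closure's effect is visible.
static LAST_SIGNALLED: AtomicI32 = AtomicI32::new(0);

/// Hypothetical stand-in for a signal registration function: it demands
/// a 'static closure, so one that borrows a local would be rejected.
fn register_handler<F>(action: F) -> Box<dyn Fn() + Send + Sync>
where
    F: Fn() + Send + Sync + 'static,
{
    Box::new(action)
}

fn install(child: i32) -> (Box<dyn Fn() + Send + Sync>, i32) {
    // `move` copies `child` into the closure's environment.
    let interrupt_child = move || {
        // In pid1 this would be libc::kill(child, libc::SIGINT).
        LAST_SIGNALLED.store(child, Ordering::SeqCst);
    };
    let handler = register_handler(interrupt_child);
    // i32 is Copy, so `child` is still usable after the move.
    (handler, child)
}

fn main() {
    let (handler, child) = install(4242);
    handler();
    println!("signalled {}, still have child = {}", LAST_SIGNALLED.load(Ordering::SeqCst), child);
}
```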

OK! Now that we have our callback, we're going to use the really helpful signal-hook crate to install a handler for SIGINTs:

let sigid: signal_hook::SigId =
    unsafe { signal_hook::register(signal_hook::SIGINT, interrupt_child)? };

This register call is also unsafe, so we have an unsafe block. We pass in both SIGINT and the interrupt_child callback. And we stick a question mark at the end in case this fails; if so, our whole program will exit, which seems reasonable. We capture the resulting sigid so we can unregister this handler later. It's honestly not really necessary in a program like this, but why not.

The rest of our main function looks like this:

// something about handling the reaping of zombies...

signal_hook::unregister(sigid);
Ok(())

This unregisters the handler and then uses Ok(()) to indicate that everything went fine. Now we just need to deal with that reaping business.

Futures, Streams, signals, tasks and wakers

The last thing we need to do is reap the orphans in a loop, stopping when the direct child we spawned itself exits. Using the libc blocking waitpid call, this would actually work just fine as a normal loop with blocking system calls. Since our pid1 program doesn't have anything else to do, the blocking calls will not tie up an otherwise-useful system thread.

However, the goal of this exercise is to use the new async/.await syntax and Futures, and to use only non-blocking calls. So that's what we're going to do! To do this, we're going to need to talk about tasks. A task is similar to a thread, but is implemented in pure Rust using cooperative multitasking. Instead of the OS scheduling things, with tasks:

We want the ability to "block" until a new child process has died. Our application will be notified of this by the SIGCHLD signal. We then want to be able to generate a Stream of values indicating when a child process has died. A Stream is a slight extension of a Future which allows multiple values to be produced instead of just a single value. To represent this, we have a Zombies struct:

struct Zombies {
    sigid: signal_hook::SigId,
    waker: Arc<Mutex<(usize, Option<Waker>)>>,
}

This holds onto the SigId generated when we register the callback action, the same as we had from the SIGINT above. It also has a waker field. This waker follows the common pattern of Arc (atomic reference counted) around a Mutex around some data. This allows for reading and writing data from multiple threads with explicit locking, thereby avoiding race conditions. Rust is very good at using the type system itself to avoid many race conditions. For example, try replacing the Arc with an Rc (non-atomic reference counted) and see what happens.
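Here is a sketch of the Arc<Mutex<...>> pattern on its own, using ordinary threads rather than signal handlers. Each thread gets its own Arc clone, and the Mutex serializes the updates. bump_from_threads is a hypothetical name.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `n` threads that each bump a shared counter, using the same
/// Arc<Mutex<...>> shape the Zombies struct uses for its (count, waker) pair.
fn bump_from_threads(n: usize) -> usize {
    let shared = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let shared = Arc::clone(&shared); // one owning handle per thread
            thread::spawn(move || {
                *shared.lock().unwrap() += 1;
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let total = *shared.lock().unwrap();
    total
}

fn main() {
    println!("{}", bump_from_threads(8));
}
```

If you replace Arc with std::rc::Rc here, thread::spawn rejects the closure at compile time, because Rc is not Send.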

Within our Arc<Mutex<...>>, we are storing a pair of values:

OK, enough talking about code. Let's look at the implementation of Zombies.

New Zombies

Within our impl Zombies { ... }, we define a new function. This is not an async function. It will do its work synchronously, and return once everything is set up. First we're going to create our Arc<Mutex<...>> bit and make a clone of it for a callback function:

let waker = Arc::new(Mutex::new((0, None)));
let waker_clone = waker.clone();

Next, the callback function, which should be called each time we get a SIGCHLD. Remember our goal: to increment the counter and call the waker if present.

let handler = move || {
    let mut guard = waker_clone.lock().unwrap();
    let pair: &mut (usize, Option<Waker>) = &mut guard;
    pair.0 += 1;
    match pair.1.take() {
        None => (),
        Some(waker) => waker.wake(),
    }
};

We use a move closure to capture the waker_clone. Unlike previously with the i32 child value, our Arc<Mutex<...>> is not a Copy, so we need to explicitly make our clone. Next, we lock the mutex. The lock may fail, which we handle with unwrap(), causing a panic. Generally that's not recommended, but lock() only fails if the mutex is poisoned (another thread panicked while holding it), which means you have a fundamental flaw in your program. Once we have a MutexGuard, we can use it to get a mutable reference to the pair of the count and the waker.

Incrementing the count is easy enough. So is calling waker.wake(). However, we first have to call take() to get the value inside the Option and pattern match. This also replaces the waker with a None, so that the same Waker will not be triggered a second time.

By the way, if you're looking to code golf this, you can get functional with a call to map:

pair.1.take().map(|waker| waker.wake());

But personally I prefer the explicit pattern matching. Maybe it's my Haskeller ways that make me uncomfortable about performing actions inside a map, who knows.
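You can exercise the handler's logic without any signals by pulling its body into a plain function and calling it directly. In this sketch, on_sigchld and CountingWaker are hypothetical names, and the waker is built with std::task::Wake (Rust 1.51+). Notice that two notifications bump the count twice but wake the stored waker only once, thanks to take().

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Shared state: (pending zombie count, waker of the task waiting for one).
type Shared = Arc<Mutex<(usize, Option<Waker>)>>;

/// Hypothetical: the body of the SIGCHLD callback as a plain function.
fn on_sigchld(shared: &Shared) {
    let mut guard = shared.lock().unwrap();
    let pair: &mut (usize, Option<Waker>) = &mut guard;
    pair.0 += 1;
    // take() leaves None behind, so a stored waker fires at most once.
    if let Some(waker) = pair.1.take() {
        waker.wake();
    }
}

/// Helper: a waker that just counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let shared: Shared = Arc::new(Mutex::new((0, Some(Waker::from(counter.clone())))));
    on_sigchld(&shared);
    on_sigchld(&shared);
    println!("zombies = {}, wakes = {}", shared.lock().unwrap().0, counter.0.load(Ordering::SeqCst));
}
```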

Finally, we can finish off the new function by registering the handler and returning a Zombies value with the Arc<Mutex<...>> and the new signal ID.

let sigid = unsafe { signal_hook::register(signal_hook::SIGCHLD, handler)? };
Ok(Zombies { waker, sigid })

Dropping Zombies

When we're done with the Zombies value, we'd like to restore the original signal handler for SIGCHLD. For our application, it doesn't actually make a difference, but it may be better in general. In any event, an implementation of Drop is easy enough:

impl Drop for Zombies {
    fn drop(&mut self) {
        signal_hook::unregister(self.sigid);
    }
}
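Here is a standalone sketch of the same RAII idea. Registration is a hypothetical type that logs its id on drop instead of calling signal_hook::unregister. This makes the timing visible: drop runs when the value goes out of scope, and locals are dropped in reverse declaration order.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a signal registration: dropping it records
/// its id in a shared log instead of unregistering a real handler.
struct Registration {
    id: usize,
    log: Arc<Mutex<Vec<usize>>>,
}

impl Drop for Registration {
    fn drop(&mut self) {
        // Runs automatically at end of scope, including early returns via
        // `?` and unwinding from a panic.
        self.log.lock().unwrap().push(self.id);
    }
}

fn main() {
    let log = Arc::new(Mutex::new(Vec::new()));
    {
        let _zombies = Registration { id: 1, log: Arc::clone(&log) };
        let _other = Registration { id: 2, log: Arc::clone(&log) };
        // Both are dropped here, in reverse order of declaration.
    }
    println!("{:?}", log.lock().unwrap());
}
```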

Streaming

In order to work with the async system, we need something Future-like. As mentioned though, instead of producing a single value, we'll produce a stream of values to indicate "there's a new zombie to reap." To handle that, we'll instead use the Stream trait.

There's no additional information available each time a zombie is available, so we'll use a () unit value to represent the zombie. We could define a new struct, or perhaps do something fancy where we capture the time when the signal is received. But none of that is necessary. Here's the beginning of our trait implementation:

impl Stream for Zombies {
    type Item = ();
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<()>> {
        unimplemented!()
    }
}

We have an associated type Item set to (). Our poll_next receives a pinned, mutable reference to the Zombies value itself, as well as a mutable reference to the Context of the task that requested a value. We'll return a Poll<Option<()>>, which can be one of three values:

Now let's look at the implementation. The first thing we're going to do is lock the waker and see if there's a waiting zombie:

let mut guard = self.waker.lock().unwrap();
let pair = &mut guard;
if pair.0 > 0 {
    // there's a waiting zombie
} else {
    // there isn't a waiting zombie
}

In the waiting zombie case (pair.0 > 0), we want to decrement the counter and then return our Poll::Ready(Some(())). Easy enough:

pair.0 -= 1;
Poll::Ready(Some(()))

And when there isn't a waiting zombie, we want to set the Waker to our current task's Waker (discovered via the Context), and then return a Poll::Pending:

pair.1 = Some(cx.waker().clone());
Poll::Pending

And that's it, we can now produce a stream of zombies! (Sounds like a good time to move to Hollywood, right?)
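Stream lives in the futures crate rather than std. Here is a sketch of the same poll logic written as a single-shot std::future::Future, driven by hand with a Context. NextZombie, Flag, and simulate are hypothetical names, and the inner block in simulate plays the role of the SIGCHLD handler.

The sequence runs in three steps:

1. The first poll returns Pending and stores the waker.
2. The fake signal bumps the count and wakes the stored waker.
3. The second poll returns Ready.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Shared (pending zombie count, stored waker), as in the Zombies struct.
type Shared = Arc<Mutex<(usize, Option<Waker>)>>;

/// Hypothetical single-shot version of Zombies::poll_next.
struct NextZombie {
    shared: Shared,
}

impl Future for NextZombie {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut guard = self.shared.lock().unwrap();
        let pair = &mut *guard;
        if pair.0 > 0 {
            pair.0 -= 1; // consume one zombie notification
            Poll::Ready(())
        } else {
            pair.1 = Some(cx.waker().clone()); // wake this task on SIGCHLD
            Poll::Pending
        }
    }
}

/// Helper: a waker that records whether it has been woken.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Returns (first poll Pending, waker fired, second poll Ready).
fn simulate() -> (bool, bool, bool) {
    let shared: Shared = Arc::new(Mutex::new((0, None)));
    let flag = Arc::new(Flag(AtomicBool::new(false)));
    let waker = Waker::from(Arc::clone(&flag));
    let mut cx = Context::from_waker(&waker);
    let mut next = NextZombie { shared: Arc::clone(&shared) };

    let first_pending = Pin::new(&mut next).poll(&mut cx).is_pending();
    {
        // What the SIGCHLD handler does: bump the count, wake any waiter.
        let mut pair = shared.lock().unwrap();
        pair.0 += 1;
        if let Some(w) = pair.1.take() {
            w.wake();
        }
    }
    let woken = flag.0.load(Ordering::SeqCst);
    let second_ready = Pin::new(&mut next).poll(&mut cx).is_ready();
    (first_pending, woken, second_ready)
}

fn main() {
    println!("{:?}", simulate());
}
```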

Reaping

We want to now consume that stream of zombies, reaping them in the process. We want to do this until our direct child process exits. Some information about how the system calls work for reaping:

Let's create our infinite loop of waiting forever for zombies:

while let Some(()) = self.next().await {
    // time to reap
}

panic!("Zombies should never end!");

The Stream trait doesn't represent the possibility of an infinite stream, so we need to do two things:

  1. Pattern match in the while using let Some(()).
  2. Add a panic! (or just an Ok(())) after the loop to handle the case that the compiler thinks can happen: that self.next().await will return a None.

Let's go back to that .await bit though. This is the real magic of the new async/.await syntax in Rust 1.39. .await can be appended to any expression whose type implements Future. Under the surface, the compiler converts the async function into a state machine that implements Future, the kind of code you would otherwise write by hand with callbacks and combinators. From prior experience, writing that code manually is at best tedious, especially:

However, as you can see here, the code is trivial to write, read, and explain. This is a huge usability improvement for Rust. There's another incremental improvement I can potentially see here:

async for () in self {
  ...
}

But that's a minor improvement, and would require standardizing the Stream trait. I'm more than happy with the code above.

Each step of that loop, we need to call waitpid and check its result. We have four cases:

You can slice up the error handling differently, and decide to use panic!ing differently than I have, but here's my implementation:

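let mut status = 0;
let pid = unsafe { libc::waitpid(-1, &mut status, libc::WNOHANG) };
if pid == till {
    // our direct child exited: we're done
    return Ok(());
} else if pid == 0 {
    panic!("Impossible: I thought there was something to reap but there wasn't");
} else if pid < 0 {
    // waitpid returned -1: report the failure
    return Err(Pid1Error::WaitpidFailed(pid));
}
// any other positive pid: we reaped an orphan, so loop and wait for the next SIGCHLD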
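One way to spell out the four cases is to classify each waitpid(-1, &mut status, WNOHANG) return value, then drive the classification over simulated results. waitpid returns one of three things: the reaped child's PID, 0 when WNOHANG finds nothing, or -1 on error. The sketch relies only on that behavior.

ReapStep, classify, and this synchronous reap_till are hypothetical names. Unlike the code above, the sketch returns an error instead of panicking on the "nothing to reap" case, so that it can be tested.

```rust
/// Hypothetical classification of one waitpid(-1, .., WNOHANG) result.
#[derive(Debug, PartialEq)]
enum ReapStep {
    /// Our direct child exited: stop looping.
    DirectChildExited,
    /// Some other (orphaned) process was reaped: keep looping.
    OrphanReaped(i32),
    /// 0: with WNOHANG, no child has changed state yet.
    NothingToReap,
    /// -1: the call failed (e.g. ECHILD when there are no children).
    Failed(i32),
}

fn classify(pid: i32, till: i32) -> ReapStep {
    if pid == till {
        ReapStep::DirectChildExited
    } else if pid == 0 {
        ReapStep::NothingToReap
    } else if pid < 0 {
        ReapStep::Failed(pid)
    } else {
        ReapStep::OrphanReaped(pid)
    }
}

/// Consumes simulated waitpid results, one per SIGCHLD notification, and
/// returns how many orphans were reaped before the direct child exited.
fn reap_till<I: IntoIterator<Item = i32>>(results: I, till: i32) -> Result<usize, String> {
    let mut orphans = 0;
    for pid in results {
        match classify(pid, till) {
            ReapStep::DirectChildExited => return Ok(orphans),
            ReapStep::OrphanReaped(_) => orphans += 1,
            ReapStep::NothingToReap => return Err("woken with nothing to reap".to_string()),
            ReapStep::Failed(code) => return Err(format!("waitpid failed: {}", code)),
        }
    }
    Err("notifications ended before the direct child exited".to_string())
}

fn main() {
    println!("{:?}", reap_till(vec![57, 58, 100], 100));
}
```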

Back to main

And finally, to tie it all together, let's see what the complete end of our main function looks like, including the zombie reaping code:

let sigid: signal_hook::SigId =
    unsafe { signal_hook::register(signal_hook::SIGINT, interrupt_child)? };

Zombies::new()?.reap_till(child).await?;

signal_hook::unregister(sigid);
Ok(())

And with that, we have a non-blocking, interrupt driven, user friendly grim reaper.

Conclusion

That was a detailed walkthrough of a fairly simple program. Hopefully the takeaway, however, was how simple it was to make this happen. I believe the async/.await syntax is a real game changer for Rust. While I've strongly believed in green-threaded runtimes for concurrent applications in the past, such a powerful system giving safety guarantees is very appealing. I look forward to using this in anger and comparing it against both my previous tokio callback-ridden code and the Haskell I would write.

If you want to hear more about my endeavors here, please let me know this is a topic you're interested in. You can ping me on Twitter @snoyberg.

Also, if you want to hear more from FP Complete about Rust, please consider signing up for our mailing list.

Are you interested in hearing more about commercial services at FP Complete around software, DevOps, training, and consulting? Contact us for a free consultation with one of our engineers.
