FP Complete


The other day I threw together a quick program to solve an annoyance some people on our team were expressing. We sometimes do our development on remote machines, but would still like to use local file editors. There are plenty of solutions for this (SSHFS, inotify+rsync), but none of us ever found a prebaked solution that was low-latency and robust enough to use regularly.

The program I put together is a simple client/server combo, where the client watches for local file changes and sends the updates to the server, which writes them to disk. This approach reduces latency significantly by keeping an open TCP connection, and can be tunneled over SSH for security. It’s simple, and the code is short and readable.

This program turned out to be a good demonstration of a few different common problem domains when writing practical, real world Haskell code:

This blog post series will build up from basics to being able to implement a program like the simple file mirror. This first post of three planned posts will cover communication protocols and streaming of data. You can also read:

Prereqs

This series will contain a number of self-contained code samples, presented as runnable scripts. To run one of these scripts, copy it into a file ending with .hs (e.g., practical.hs), and then run it with stack practical.hs. You will need to install the Stack build tool. Stack will take care of downloading and installing any compiler and libraries necessary, so the first time you run a script it will take a while to download and install everything.

If you’d like to test that you’re ready to proceed, try running this program:

#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc

main :: IO ()
main = putStrLn "Hello world!"

Also, this series of posts will use the classy-prelude-conduit library, which provides a lot of built-in functionality to make it easier to follow. If you’d like to kick off a build of that library now so it’s available when you want to use it, run:

$ stack --resolver nightly-2016-09-10 --install-ghc build classy-prelude-conduit

Textual versus binary data

We humans have an interesting tendency to want to communicate – with each other and our computers – in human languages. Computers don’t care: they just see binary data. Often, this overlaps just fine, specifically when dealing with the ASCII subset of the character set. But generally speaking, we need some way to distinguish between textual and binary data.

In Haskell, we do this by using two different data types: Text and ByteString. In order to convert between these two, we need to choose a character encoding, and most often we’ll choose UTF-8. We can perform this conversion with the encodeUtf8 and decodeUtf8 functions:

#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

main :: IO ()
main = do
    let someText :: Text
        someText = unlines
            [ "Hello, this is English."
            , "Hola, este es el español."
            , "שלום, זה עברית."
            ]
        binary :: ByteString
        binary = encodeUtf8 someText

        filePath :: FilePath
        filePath = "say-hello.txt"

    writeFile filePath binary
    binary2 <- readFile filePath :: IO ByteString

    let text2 = decodeUtf8 binary2
    putStrLn text2

If you’re coming from a language that doesn’t have this strong separation, it may feel clunky at first. But after years of experience with having a type-level distinction between textual and binary data, I can attest to the fact that the compiler has many times prevented me from making some mistakes that would have been terrible in practice. One example popped up when working on the simple-file-mirror tool itself: I tried to send the number of characters in a file path over the wire, instead of sending the number of bytes after encoding it.
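To make that mistake concrete, here is a minimal sketch of the difference between the two counts. It uses the plain text and bytestring libraries rather than classy-prelude so that it runs with nothing but GHC; the names charCount and byteCount are made up for this illustration and do not appear in simple-file-mirror.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString as BS
import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)

-- Hypothetical helper: number of Unicode code points in the path.
charCount :: T.Text -> Int
charCount = T.length

-- Hypothetical helper: number of bytes the path occupies once it is
-- UTF-8 encoded. This is the number a length-prefixed wire format needs.
byteCount :: T.Text -> Int
byteCount = BS.length . encodeUtf8

main :: IO ()
main = do
    -- "\241" is U+00F1 (n with tilde), written as an escape so the
    -- example does not depend on the encoding of the source file.
    let path = "espa\241ol.txt" :: T.Text
    print (charCount path) -- prints 11
    print (byteCount path) -- prints 12
```

Had the character count gone over the wire, the receiver would have stopped one byte short of the end of the path.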

The OverloadedStrings language extension lets us use string literals for all of these string-like things, including (as you saw above) file paths.

NOTE: For historical reasons which are being worked on, there is additionally a String type, which is redundant with Text but far less efficient. I mention it because you may see references to it when working with some Haskell documentation. Whenever possible, stick to Text or ByteString, depending on your actual data content.

Some simple data streaming

OK, that was a bit abstract. Let’s do something more concrete. We’re going to use the conduit library to represent streaming data. To start off simple, let’s stream the numbers 1 to 10 and print each of them:

#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

main :: IO ()
main = yieldMany [1..10] $$ mapM_C print

Our yieldMany function will take a sequence of values – in this case a list from 1 to 10 – and yield them downstream. For those familiar, this is similar to yielding in Python with generators. The idea is that we will build a pipeline of multiple components, each awaiting values from upstream and yielding values downstream.

Our mapM_C print component will apply the function print to every value it receives from upstream. The C suffix is used for disambiguating conduit functions from non-conduit functions. Finally, the $$ operator in the middle is the “connect” function, which connects a source of data to a sink of data and runs it. As you might guess, the above prints the numbers 1 to 10.

We can also put other components into our pipeline, including functions that both await values from upstream and yield them downstream. For example:

#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

main :: IO ()
main = yieldMany [1..10] $$ mapC (* 10) =$ mapM_C print

The mapC function applies a function to each value in the stream and yields it downstream, while the =$ operator connects two components together. Take a guess at what the output from this will be, and then give it a try.

NOTE: There’s a subtle but important difference between $$ and =$. $$ will connect two components and then run the pipeline to completion to get a result. =$ will connect two components without running them, so that the combination can be further connected to other components. In a single pipeline, you’ll end up with exactly one $$ usage.
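One way to see the distinction is to give the result of =$ a name and reuse it. The following is a small sketch along those lines; printSuccessors is a hypothetical name chosen for this example, and the Sink type signature is my own annotation.

```haskell
#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

-- Built with =$ only: this describes a component but runs nothing.
printSuccessors :: Sink Int IO ()
printSuccessors = mapC (+ 1) =$ mapM_C print

main :: IO ()
main = do
    -- Each $$ attaches a source to the component and runs the result.
    yieldMany [1, 2, 3] $$ printSuccessors
    yieldMany [100, 200] $$ printSuccessors
```

Because printSuccessors was only connected and never run, it can be attached to as many different sources as we like.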

We can also do more interesting consumption of a stream, like summing:

#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

main :: IO ()
main = do
    total <- yieldMany [1..10] $$ mapC (* 10) =$ sumC
    print total

    -- or more explicitly...
    total <- yieldMany [1..10] $$ mapC (* 10) =$ foldlC (+) 0
    print total

Or limit how much of the stream we want to consume:

#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

main :: IO ()
main = do
    total <- yieldMany [1..10] $$ mapC (* 10) =$ takeC 5 =$ sumC
    print total

This only scratches the surface of what we can do with conduit, but hopefully it gives enough of a basic intuition for the library to get started. If you’re interested in diving in deep on the conduit library, check out the previously linked tutorial.

Streaming chunked data

Having a stream of individual bytes turns out to be inefficient in practice. It’s much better to chunk a series of bytes into an efficient data structure like a ByteString. Let’s see what it looks like to stream data from a file to standard output:

#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit
import qualified System.IO as IO

main :: IO ()
main = do
    writeFile "some-file.txt" ("This is just some text." :: ByteString)

    -- Yes, this is clunky. We'll do something much slicker in a bit.
    IO.withBinaryFile "some-file.txt" IO.ReadMode $ \fileHandle ->
           sourceHandle fileHandle
        $$ decodeUtf8C
        =$ stdoutC

This is good, but what if we want to deal with the individual bytes in the stream instead of the chunks? For example, let’s say we want to get just the first 10 bytes of our file. The takeC function we used above counts values in the stream, so takeC 10 would take the first ten chunks of data. We instead need a function which will work on the elements of the bytestrings (the individual bytes). Fortunately, conduit provides for this with the E-suffixed element-specific functions:

#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit
import qualified System.IO as IO

main :: IO ()
main = do
    writeFile "some-file.txt" ("This is just some text." :: ByteString)

    -- Yes, this is clunky. We'll do something much slicker in a bit.
    IO.withBinaryFile "some-file.txt" IO.ReadMode $ \fileHandle ->
           sourceHandle fileHandle
        $$ takeCE 10
        =$ decodeUtf8C
        =$ stdoutC

In the simple-file-mirror program, we will be sending files over the network, and will need to limit some operations to the actual file sizes in question. Functions like takeCE will be vital for doing this.
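If the difference between takeC and takeCE still feels fuzzy, it can help to feed both of them a stream whose chunk boundaries we control. This is a sketch for illustration only: takeChunks and takeBytes are hypothetical helpers, and sinkList (which collects a stream into a list) has not been introduced in this post yet.

```haskell
#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

-- takeC counts values in the stream, which here are whole chunks.
takeChunks :: Int -> [ByteString] -> IO [ByteString]
takeChunks n chunks = yieldMany chunks $$ takeC n =$ sinkList

-- takeCE counts elements inside the chunks, which here are bytes.
takeBytes :: Int -> [ByteString] -> IO [ByteString]
takeBytes n chunks = yieldMany chunks $$ takeCE n =$ sinkList

main :: IO ()
main = do
    let chunks = ["abc", "def", "ghi"]
    takeChunks 2 chunks >>= print
    takeBytes 2 chunks >>= print
```

The first call should hand back the first two chunks untouched, while the second should stop two bytes into the first chunk.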

Managing resources

While the withBinaryFile approach above worked, it felt kind of clunky. And for more complicated control flows, opening up the file in advance won’t be an option (like when we’ll only know which file to open after the network connection tells us the file path). To allow for these cases, we’re going to introduce runResourceT, which allows us to acquire resources in an exception-safe manner. Let’s rewrite the above example with runResourceT:

#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

main :: IO ()
main = do
    writeFile "some-file.txt" ("This is just some text." :: ByteString)

    runResourceT
         $ sourceFile "some-file.txt"
        $$ takeCE 10
        =$ decodeUtf8C
        =$ stdoutC

Internally, sourceFile uses the bracketP function, which runs some initialization function (in our case, opening a file handle), registers some cleanup function (in our case, closing that file handle), and then performs an action with the resource. To demonstrate what that looks like more explicitly, let’s write a modified sourceFile function which will return some default file contents if the requested file can’t be read from.

#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit
import qualified System.IO as IO

sourceFileDef :: MonadResource m
              => FilePath
              -> Source m ByteString
sourceFileDef fp = do
    let -- tryIO catches all IO exceptions and puts them in a Left
        -- value.
        open = tryIO $ IO.openBinaryFile fp IO.ReadMode

        -- Exception occurred, so nothing to close.
        close (Left e) =
            putStrLn $ "No file to close, got an exception: " ++ tshow e
        -- No exception, so close the file handle.
        close (Right h) = hClose h

    bracketP open close $ \eitherHandle ->
        case eitherHandle of
            Left ex -> do
                yield "I was unable to open the file in question:\n"
                yield $ encodeUtf8 $ tshow ex ++ "\n"
            Right fileHandle -> sourceHandle fileHandle

main :: IO ()
main = runResourceT
     $ sourceFileDef "some-file.txt"
    $$ decodeUtf8C
    =$ stdoutC

Implementing our protocol

Let’s at least get started on our actual simple-file-mirror code. The wire protocol we’re going to use is defined in the README, but we can describe it briefly as:

So let’s get started with implementing the integer send/receive logic in conduit. There are many ways of doing this, some more efficient than others. For this program, I elected to go with the simplest approach possible (though you can see some historical more complicated/efficient versions). Let’s start with sending, which is pretty trivial:

#!/usr/bin/env stack
-- stack --resolver nightly-2016-09-10 --install-ghc runghc --package classy-prelude-conduit

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import ClassyPrelude.Conduit

sendInt :: Monad m => Int -> Producer m ByteString
sendInt i = yield $ encodeUtf8 $ tshow i ++ ":"

main :: IO ()
main =
    yieldMany sampleInts $$ awaitForever sendInt =$ stdoutC
  where
    sampleInts =
        [ 1
        , 10
        , -5
        , 0
        , 60
        ]

Here we’ve introduced a new function, awaitForever, which repeatedly applies a function as long as data exists on the stream. Take a guess at what the output of this program will be, and then try it out.

Now let’s try out the receiving side of this, which is slightly more complicated, but not too bad:

#!/usr/bin/env stack
{- stack --resolver nightly-2016-09-10 --install-ghc runghc
    --package classy-prelude-conduit
    --package word8
-}

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import ClassyPrelude.Conduit
import Data.Word8 (_colon)

sendInt :: Monad m => Int -> Producer m ByteString
sendInt i = yield $ encodeUtf8 $ tshow i ++ ":"

recvInt :: MonadThrow m => Consumer ByteString m Int
recvInt = do
    intAsText <- takeWhileCE (/= _colon) =$ decodeUtf8C =$ foldC
    dropCE 1 -- drop the colon from the stream
    case readMay $ unpack intAsText of
        Nothing -> error $ "Invalid int: " ++ show intAsText
        Just i -> return i

main :: IO ()
main =
       yieldMany sampleInts
    $$ awaitForever sendInt
    =$ peekForeverE (do i <- recvInt
                        print i)
  where
    sampleInts =
        [ 1
        , 10
        , -5
        , 0
        , 60
        ]

peekForeverE is similar to awaitForever, in that it repeatedly performs an action as long as there is data on the stream. However, it’s different in that it doesn’t grab the data off of the stream itself, leaving it to the action provided to do that, and it deals correctly with chunked data by ignoring empty chunks.

We’ve also introduced takeWhileCE, which is like takeCE, but instead of giving it a fixed size of the stream to consume, we give it a predicate, and it continues consuming as long as each byte satisfies that predicate. In our case: we consume until we get to a colon. Then we decode the UTF-8 bytes into Text, and use foldC to concatenate multiple chunks of Text into a single Text value. Then we use readMay to parse the textual value into an Int. (And yes, we could do a much better job at error handling, but using error is the simplest approach.)
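For comparison, here is roughly the same parsing logic written as a pure function over a single strict ByteString, with no streaming involved. It is only a sketch to show the shape of the format: parseInt and parseAll are hypothetical names, it uses the plain bytestring library, and it reports bad input with Nothing rather than calling error.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString.Char8 as B8
import Data.List (unfoldr)
import Text.Read (readMaybe)

-- Parse one "<decimal digits>:" value from the front of the input,
-- returning the number and whatever follows the colon.
parseInt :: B8.ByteString -> Maybe (Int, B8.ByteString)
parseInt bs =
    case B8.break (== ':') bs of
        (digits, rest)
            | Just (':', rest') <- B8.uncons rest
            , Just i <- readMaybe (B8.unpack digits)
            -> Just (i, rest')
        -- No colon found, or the text before it is not an Int.
        _ -> Nothing

-- Keep parsing values until the input is empty or malformed.
parseAll :: B8.ByteString -> [Int]
parseAll = unfoldr parseInt

main :: IO ()
main = print (parseAll "7:42:-3:") -- prints [7,42,-3]
```

The conduit version has to do more work than this because the colon, or the digits themselves, may be split across chunk boundaries, which is exactly what takeWhileCE and foldC take care of.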

Building on top of these two functions, it becomes a lot easier to send and receive complete file paths. Let’s put that code in here, together with a test suite to prove it’s all working as expected:

#!/usr/bin/env stack
{- stack --resolver nightly-2016-09-10 --install-ghc runghc
    --package classy-prelude-conduit
    --package word8
    --package hspec
-}

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import ClassyPrelude.Conduit
import Data.Word8 (_colon)
import Test.Hspec
import Test.Hspec.QuickCheck (prop)

sendInt :: Monad m => Int -> Producer m ByteString
sendInt i = yield $ encodeUtf8 $ tshow i ++ ":"

sendFilePath :: Monad m => FilePath -> Producer m ByteString
sendFilePath fp = do
    -- UTF-8 encode the filepath
    let bs = encodeUtf8 $ pack fp :: ByteString

    -- Send the number of bytes
    sendInt $ length bs

    -- Send the actual path
    yield bs

recvInt :: MonadThrow m => Consumer ByteString m Int
recvInt = do
    intAsText <- takeWhileCE (/= _colon) =$ decodeUtf8C =$ foldC
    dropCE 1 -- drop the colon from the stream
    case readMay $ unpack intAsText of
        Nothing -> error $ "Invalid int: " ++ show intAsText
        Just i -> return i

recvFilePath :: MonadThrow m => Consumer ByteString m FilePath
recvFilePath = do
    -- Get the byte count
    fpLen <- recvInt

    -- Read in the given number of bytes, decode as UTF-8 text, and
    -- then fold all of the chunks into a single Text value
    fpText <- takeCE fpLen =$= decodeUtf8C =$= foldC

    -- Unpack the text value into a FilePath
    return $ unpack fpText

main :: IO ()
main = hspec $ do
    prop "sendInt/recvInt are inverses" $ \i -> do
        res <- sendInt i $$ recvInt
        res `shouldBe` i

    prop "sendFilePath/recvFilePath are inverses" $ \fp -> do
        res <- sendFilePath fp $$ recvFilePath
        res `shouldBe` fp

We’ve used the hspec test framework library and its QuickCheck support to create a test suite which automatically generates test cases based on the types in our program. In this case, it will generate 100 random Ints and 100 random FilePaths each time it’s run, and ensure that our properties hold. This is a great way to quickly get significant test coverage for a program.

Sending the files themselves

OK, finally time to put all of this together. We’re going to add in some new functions for sending and receiving files themselves. This is a fairly simple composition of all of the work we’ve done until now. And this is the nice thing about Haskell in general, as well as the conduit library: purity and strong typing often make it possible to trivially combine smaller functions into more complicated beasts. Let’s see this all in action:

#!/usr/bin/env stack
{- stack --resolver nightly-2016-09-10 --install-ghc runghc
    --package classy-prelude-conduit
    --package word8
    --package hspec
    --package temporary
-}

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import ClassyPrelude.Conduit
import Data.Word8            (_colon)
import System.Directory      (createDirectoryIfMissing, doesFileExist,
                              removeFile)
import System.FilePath       (takeDirectory)
import System.IO             (IOMode (ReadMode), hFileSize, openBinaryFile)
import System.IO.Temp        (withSystemTempDirectory)
import Test.Hspec
import Test.Hspec.QuickCheck (prop)

sendInt :: Monad m => Int -> Producer m ByteString
sendInt i = yield $ encodeUtf8 $ tshow i ++ ":"

sendFilePath :: Monad m => FilePath -> Producer m ByteString
sendFilePath fp = do
    -- UTF-8 encode the filepath
    let bs = encodeUtf8 $ pack fp :: ByteString

    -- Send the number of bytes
    sendInt $ length bs

    -- Send the actual path
    yield bs

sendFile :: MonadResource m
         => FilePath -- ^ root directory
         -> FilePath -- ^ path relative to root
         -> Producer m ByteString
sendFile root fp = do
    -- Send the relative file path, so the other side knows where to
    -- put the contents
    sendFilePath fp

    let open = tryIO $ openBinaryFile fpFull ReadMode

        -- If the opening failed, we'll have an error message. So
        -- there's nothing to close, just do nothing!
        close (Left _err) = return ()
        -- Opening succeeded, so close the file handle.
        close (Right h) = hClose h

    -- Grab the file handle...
    bracketP open close $ \eh ->
        case eh of
            -- No file, send a -1 length to indicate file does not
            -- exist
            Left _ex -> sendInt (-1)

            -- File exists
            Right h -> do
                -- Send the size of the file
                size <- liftIO $ hFileSize h
                sendInt $ fromInteger size

                -- And stream the contents
                sourceHandle h
  where
    fpFull = root </> fp

recvInt :: MonadThrow m => Consumer ByteString m Int
recvInt = do
    intAsText <- takeWhileCE (/= _colon) =$ decodeUtf8C =$ foldC
    dropCE 1 -- drop the colon from the stream
    case readMay $ unpack intAsText of
        Nothing -> error $ "Invalid int: " ++ show intAsText
        Just i -> return i

recvFilePath :: MonadThrow m => Consumer ByteString m FilePath
recvFilePath = do
    -- Get the byte count
    fpLen <- recvInt

    -- Read in the given number of bytes, decode as UTF-8 text, and
    -- then fold all of the chunks into a single Text value
    fpText <- takeCE fpLen =$= decodeUtf8C =$= foldC

    -- Unpack the text value into a FilePath
    return $ unpack fpText

recvFile :: MonadResource m
         => FilePath -- ^ directory to store files in
         -> Sink ByteString m ()
recvFile root = do
    -- Get the relative path to store in
    fpRel <- recvFilePath

    -- Prepend with the root directory to get a complete path
    let fp = root </> fpRel

    -- Get the size of the file
    fileLen <- recvInt

    if fileLen == (-1)
        -- We use -1 to indicate the file should be removed. Go ahead
        -- and call removeFile, but ignore any IO exceptions that
        -- occur when doing so (in case the file doesn't exist
        -- locally, for example)
        then liftIO $ void $ tryIO $ removeFile fp
        else do
            -- Create the containing directory
            liftIO $ createDirectoryIfMissing True $ takeDirectory fp

            -- Stream out the specified number of bytes and write them
            -- into the file
            takeCE fileLen =$= sinkFile fp

main :: IO ()
main = hspec $ do
    prop "sendInt/recvInt are inverses" $ \i -> do
        res <- sendInt i $$ recvInt
        res `shouldBe` i

    prop "sendFilePath/recvFilePath are inverses" $ \fp -> do
        res <- sendFilePath fp $$ recvFilePath
        res `shouldBe` fp

    -- A more standard unit test, checking that sending and receiving
    -- a file does what is expected.
    it "create and delete files" $
      -- Get temporary source and destination directories
      withSystemTempDirectory "src" $ \srcDir ->
      withSystemTempDirectory "dst" $ \dstDir -> do

        let relPath = "somepath.txt"
            content = "This is the content of the file" :: ByteString

        -- Ensure that sending a file that exists makes it appear in
        -- the destination
        writeFile (srcDir </> relPath) content

        runResourceT $ sendFile srcDir relPath $$ recvFile dstDir

        content' <- readFile (dstDir </> relPath)
        content' `shouldBe` content

        -- Ensure that sending a file that doesn't exist makes it
        -- disappear in the destination
        removeFile (srcDir </> relPath)

        runResourceT $ sendFile srcDir relPath $$ recvFile dstDir

        exists <- doesFileExist (dstDir </> relPath)
        exists `shouldBe` False

Our sendFile function looks very similar to our sourceFileDef example from earlier in the post. But instead of streaming a default value, we just send a length of -1, as our protocol dictates. The recvFile function relies heavily on recvFilePath and recvInt. In the case of a -1, it removes the file in question. Otherwise, it creates the containing directory if necessary, and then composes takeCE with sinkFile to stream the correct number of bytes into the file.
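To picture what actually goes over the wire for one file, the layout produced by sendFile can be written as a small pure function. This is only a sketch of the byte layout, assuming the whole file fits in memory: frameInt and frameFile are hypothetical names, it uses the plain text and bytestring libraries, and the real code streams the contents instead of building one ByteString.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as B8
import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)

-- An integer is sent as its decimal digits followed by a colon.
frameInt :: Int -> BS.ByteString
frameInt i = B8.pack (show i ++ ":")

-- Path length, path, then either the content length and content,
-- or -1 when the file no longer exists on the sending side.
frameFile :: FilePath -> Maybe BS.ByteString -> BS.ByteString
frameFile path mcontents =
    BS.concat [frameInt (BS.length pathBytes), pathBytes, body]
  where
    pathBytes = encodeUtf8 (T.pack path)
    body = case mcontents of
        Nothing       -> frameInt (-1)
        Just contents -> BS.append (frameInt (BS.length contents)) contents

main :: IO ()
main = do
    B8.putStrLn (frameFile "a.txt" (Just "hi")) -- prints 5:a.txt2:hi
    B8.putStrLn (frameFile "a.txt" Nothing)     -- prints 5:a.txt-1:
```

Note that both lengths are byte counts taken after encoding, which is the detail the type distinction between Text and ByteString pushed me towards earlier.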

We also have a unit test covering the interaction of these two new functions. While some kind of property could perhaps be devised for testing this with QuickCheck, a more standard unit test seemed far more straightforward in this case.

Next time on Practical Haskell

This part of the tutorial covered quite a number of topics, so this is a good place to take a break. Next time, we’ll dive into the network communication aspect of things, including:

If you have feedback on how to make this tutorial series more useful, please share it in the comments below, on Twitter (@snoyberg), or in any Reddit discussions about it. I’ll try to review feedback and make changes to parts 2 and 3.
