

Introduction

At FP Complete we develop many tools for our clients to help them achieve their goals. Most of these tools are written in Haskell (and, more recently, some are in Rust), and that has helped us write them more quickly and produce more maintainable apps going forward.

In this post I would like to describe one of those tools, which should be of interest to any person or company with a present or legacy SQL Server database.

A client was migrating from SQL Server to PostgreSQL across the board. However, like many large companies, they had many databases spread across the country, on different servers, used by many people in different departments. It is not so easy to simply replace a database that many people are using directly. They still wanted to provide a SQL Server query interface to their various departments, so they asked us whether we could develop a standalone service that could pretend to be SQL Server but really talk to any JSON service behind the scenes. We did! This article takes a look at how it was done.

Requirements and Architecture

The high-level architecture of the system looks like this:

(Architecture diagram: users issue SQL Server queries to our fake SQL Server, which translates them into requests to the backend JSON service and returns the results.)

Requirements:

  1. Make a pretend SQL Server that speaks the real wire protocol.
  2. Keep the codebase clean and simple.
  3. Use a streaming architecture so that memory use stays constant.

So, a user of the system would write a regular SQL Server query, such as SELECT * FROM "customer" and that would be sent to a fake SQL server, which in turn would query the real background service, whatever that is, and return results back to the user.

Approach

In order to implement a fake SQL Server, we need:

  1. An understanding of the wire protocol that SQL Server and its clients speak.
  2. A real client program to test our implementation against.

I looked into the protocol used by SQL Server and its clients. It’s called TDS, which means “tabular data stream”. It was initially designed and developed by Sybase Inc. for their Sybase SQL Server relational database engine in 1984. Later it was adopted by Microsoft for Microsoft SQL Server. It is a fairly straightforward binary protocol, as binary protocols go. Microsoft have documentation for the TDS protocol, which you can eyeball in your own time.

Regarding a client program, today it is much easier to get access to Microsoft’s client libraries on Linux or macOS, thanks to the release of their ODBC package. You can now get it on Linux and macOS! I was surprised too! This made it easy to write a test suite for our server.

How TDS works

TDS is a binary protocol. Communication is done using messages, where each message is a sequence of so-called packets, each consisting of a header and (usually, but not always) a payload:

(Diagram: a packet consists of a header followed by a payload.)

The header is 8 bytes long and is described by the table below; the fields we don’t use are marked as such.

Header

Field  | Type   | Description
Type   | Word8  | Not used by us
Status | Word8  | 0 = normal message, 1 = end of message
Length | Word16 | Length of the packet (including the header), big-endian
SPID   | Word16 | Not used by us
Packet | Word8  | Not used by us and ignored by SQL Server
Window | Word8  | Not used, should be ignored

It turns out we only need the status and length fields!

A typical scenario:

  1. The client sends a message (a sequence of packets) to the server.
  2. The server replies with a message of its own.

For example:

  1. The client sends a prelogin message; the server acknowledges it.
  2. The client sends a login message; the server accepts the login.
  3. The client sends a query, either a simple SQL batch or an RPC call, and the server replies with a "general response" containing the resulting rows.

We dealt with both types of messages, but we’ll just look at simple batch queries in this article.


Server Architecture

My second step was to start up a Haskell project and plan the libraries that I would use for this task.

I used these Haskell packages:

  - conduit and conduit-extra, for streaming data between the socket, the parser, and the renderer (and for the TCP server itself)
  - attoparsec, for incrementally parsing the binary protocol
  - bytestring, whose Builder is used to efficiently render binary output

In diagram form, that pipeline looks like this:

(Diagram: the server pipeline built from these libraries: bytes from the socket are parsed with attoparsec, handled, rendered with a bytestring Builder, and written back to the socket via conduit.)

We’ll look at each one in detail.

Stream processing: conduit

Conduit is used to achieve streaming. Other languages have similar concepts: Rust calls them iterators, and Python has generators (data producers) and coroutines (data consumers). In Haskell it’s implemented as a normal library. It is a conceptually simple streaming API, consisting of two really key pieces:

  - await, which asks for the next value from upstream
  - yield, which sends a value downstream

We make use of these things in the server and so do all the libraries that I use for this task.

Consider this example pipeline,

pipeline =
  source .| conduit1 .| conduit2 .| sink

(Note: the .| can be read like a UNIX pipe.)

The source may be a socket, a file or a database. These kinds of I/O sources can yield chunks of bytes, or e.g. rows from a database query. In a conduit pipeline, the sink (the final thing in the pipeline) drives the computation: every time it awaits input, a domino effect runs back down the pipeline, all the way to the source, which consults some external provider (the socket, file or database).

(Diagram: the sink awaits a value, pulling data through each conduit in turn, all the way back to the source.)
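
To see that pull-driven behaviour directly, here’s a small standalone script (a sketch, not code from the real server) in which the source prints every time the sink asks it for another value:

#!/usr/bin/env stack
-- stack --resolver lts-12.12 script
import Conduit

-- A source that announces each value it is asked to produce.
source :: ConduitT () Int IO ()
source = mapM_ (\i -> do liftIO (putStrLn ("producing " ++ show i)); yield i) [1 .. 3]

-- A sink that announces each value it consumes, then asks for the next one.
sink :: ConduitT Int o IO ()
sink = do
  mi <- await
  case mi of
    Nothing -> pure ()
    Just i -> do
      liftIO (putStrLn ("consuming " ++ show i))
      sink

main :: IO ()
main = runConduit (source .| sink)

The output interleaves "producing 1", "consuming 1", "producing 2", and so on: nothing is produced until the sink asks for it.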

I used conduit-extra because you can put together a listening service in about 3 lines. That looks like this:

#!/usr/bin/env stack
-- stack --resolver lts-12.12 script
{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit.Network
import Conduit

main =
  runTCPServer
    (serverSettings 2019 "*")
    (\app -> do
       putStrLn "Someone connected!"
       runConduit (appSource app .| mapMC print .| sinkNull)
       putStrLn "They disconnected!"
       pure ())

In this example, the source conduit coming from the socket is piped into a conduit (mapMC print) that just prints each chunk coming in. Finally, that’s piped to sinkNull which consumes all its input and discards it.

Running it looks like this:

$ ./conduit-tcp-server.hs
Someone connected!
"hin"
They disconnected!

We’re going to build up a simple example server incrementally.

If you want to follow along and run these Haskell examples on your computer, they can be run conveniently as regular scripts once you have stack installed. Check out our get started page and follow the instructions to get set up.

Type-safety

Most libraries in Haskell pride themselves on type-safety and streaming libraries like Conduit are no exception. Let’s have a brief look at that. This is the type for a conduit:

data ConduitT input output monad return

It means “a conduit has an input, an output, runs in some monad, and has a final return value”.

So for example, yield has this type:

yield :: Monad m => output -> ConduitT input output monad ()

It sends an output value downstream, and returns unit ().

Whereas await has this type:

await :: Monad m => ConduitT input output monad (Maybe input)

It waits for an input value from upstream, and returns Just that value if there is anything left upstream. Otherwise it returns Nothing.

Functions like map and filter also have instructive types:

map :: Monad m => (input -> output) -> ConduitT input output monad ()
filter :: Monad m => (input -> Bool) -> ConduitT input input monad ()

Finally, plugging them together has to have the correct types:

filter (> 5) .| map show .| filter (/= "6") .| ...

This lets us plug pieces together like LEGO bricks, confident that our composition is correct. The inputs, outputs and returns all have to match up.
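
To make that concrete, here’s a tiny runnable script (in the same scripted style as the other examples, not code from the real server) that plugs the pieces from the pipeline above together and collects the results:

#!/usr/bin/env stack
-- stack --resolver lts-12.12 script
import Conduit
import qualified Data.Conduit.List as CL

-- Filter, show, filter again, then collect whatever reaches the end.
main :: IO ()
main =
  print
    (runConduitPure
       (CL.sourceList [1 :: Int .. 10] .|
        CL.filter (> 5) .|
        CL.map show .|
        CL.filter (/= "6") .|
        CL.consume))
-- Prints ["7","8","9","10"]

If any stage’s input type didn’t line up with the previous stage’s output type, this simply wouldn’t compile.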

Incremental parsing: attoparsec

I used attoparsec to parse the binary protocol. attoparsec is a parser combinator library which supports incomplete parsing of input i.e. parsing data in chunks.

Here’s a simple example that parses a complete input in one go:

#!/usr/bin/env stack
-- stack --resolver lts-12.12 script
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString as S
import qualified Data.Attoparsec.ByteString as P

main =
  case P.parseOnly myparser (S.pack [2, 97, 98]) of
    Right result -> print result
    Left err -> putStrLn err
  where
    myparser = do
      len <- P.anyWord8
      bytes <- P.take (fromIntegral len)
      return bytes

Which outputs the following:

$ ./attoparsec-example.hs
"ab"

If I create a streaming program that feeds one-byte chunks into the parser like this:

#!/usr/bin/env stack
-- stack --resolver lts-12.12 script
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString as S
import qualified Data.Attoparsec.ByteString as P

main = loop (P.parse myparser) (S.pack [2, 97, 98])
  where
    loop eater input =
      do putStrLn ("Chunk " ++ show (S.unpack chunk))
         case eater chunk of
           P.Done _ value -> print value
           P.Fail _ _ err -> putStrLn err
           P.Partial next -> do
             putStrLn "Waiting for more ..."
             loop next remaining
      where (chunk, remaining) = S.splitAt 1 input
    myparser = do
      len <- P.anyWord8
      bytes <- P.take (fromIntegral len)
      return bytes

We create a loop with an “eater”. The eater eats chunks of bytes. It produces either a done/fail result, or the next eater that is ready for the next chunk.

The output is:

$ ./attoparsec-feeding.hs
Chunk [2]
Waiting for more ...
Chunk [97]
Waiting for more ...
Chunk [98]
"ab"

Note however that myparser didn’t change. It doesn’t care how the input is chunked. It’s patient. The end result is the same. This works very well with conduit, and naturally, conduit has integration with attoparsec from the conduit-extra package.

The above code can be rewritten in conduit as:

#!/usr/bin/env stack
-- stack --resolver lts-12.12 script
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString as S
import qualified Data.Attoparsec.ByteString as P
import Conduit
import Data.Conduit.List
import Data.Conduit.Attoparsec

main = do
  result <- runConduit (sourceList chunks .| sinkParserEither myparser)
  case result of
    Left err -> print err
    Right val -> print val
  where
    chunks = [S.pack [2], S.pack [97], S.pack [98]]
    myparser = do
      len <- P.anyWord8
      bytes <- P.take (fromIntegral len)
      return bytes

Producing:

$ ./attoparsec-conduit.hs
"ab"

Looking at this example and the one above it, it’s easy to imagine how sinkParserEither operates: it consumes chunks from upstream and incrementally feeds them to myparser until the parser produces a result or fails.

Efficient binary writing: bytestring

Finally, to generate messages for the TDS protocol, I can efficiently generate binary streams using the bytestring Builder abstraction. Essentially, this abstraction lets you append strings of bytes to a buffer efficiently. Additionally, these can be written to a socket or file incrementally, in a streaming fashion, and are therefore also memory efficient.

Specifically for our use-case, it makes it trivial to output a binary format that involves word8s and word16s, and to distinguish little-endian from big-endian encodings.

For example to build our example above, we’d use <> to combine chunks together:

#!/usr/bin/env stack
-- stack --resolver lts-12.12 script
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString.Builder as BB

main = L.putStr (BB.toLazyByteString (BB.word8 2 <> "ab"))

Producing:

$ ./builder-example.hs
ab

Or it can be written as a conduit:

#!/usr/bin/env stack
-- stack --resolver lts-12.12 script
{-# LANGUAGE OverloadedStrings #-}
import           System.IO
import           Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import qualified Data.ByteString.Lazy as L
import qualified Data.Conduit.ByteString.Builder as CB
import qualified Data.ByteString.Builder as BB

main =
  runConduitRes
    (CL.sourceList [BB.word8 2 <> "ab"] .| CB.builderToByteString .|
     CB.sinkHandle stdout)

That is, produce a source of a builder. Feed it to a conduit which converts builders to streams of bytestrings, then feed that to a sink that writes to stdout.
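
As a quick aside on endianness (a standalone sketch, not code from the real server): the packet header’s Length field is big-endian, while the row values we’ll render later are little-endian, and Builder has helpers for both:

#!/usr/bin/env stack
-- stack --resolver lts-12.12 script
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as L

main :: IO ()
main = do
  -- 92 (the Length of the example packet below) as two bytes, both ways round.
  print (L.unpack (BB.toLazyByteString (BB.word16BE 92))) -- [0,92], i.e. 00 5C
  print (L.unpack (BB.toLazyByteString (BB.word16LE 92))) -- [92,0], i.e. 5C 00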

Implementation: the parser

With all that in mind, the message parser looks like this:

messageParser :: Parser (Packet ClientMessage)
messageParser = do
  header <- headerParser
  let payloadLength = fromIntegral (headerLength header - headerSize)
  message <-
    case headerType header of
      0x01 -> sqlbatchParser payloadLength
      0x12 -> preloginParser payloadLength
      0x0E -> transactionParser
      0x03 -> rpcParser payloadLength
      0x10 -> loginParser
      0x06 -> attentionParser
      _ -> do
        payload <- Atto.take payloadLength
        pure (UnknownMessage payload)
  pure (Packet header message)
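
The headerParser and the Header type themselves aren’t shown in this post. A minimal sketch that is consistent with the header table above and with the accessors messageParser uses (the record field names here are my own assumptions) might look like this:

-- A sketch only: the field names and exact Header shape are assumptions.
data Header = Header
  { headerType   :: !Word8   -- message type: 0x01 = SQL batch, 0x12 = prelogin, ...
  , headerStatus :: !Word8   -- 0 = normal packet, 1 = end of message
  , headerLength :: !Word16  -- length of the whole packet (including the header), big-endian
  } deriving (Show)

headerSize :: Word16
headerSize = 8

headerParser :: Parser Header
headerParser = do
  typ     <- Atto.anyWord8
  status  <- Atto.anyWord8
  len     <- anyWord16BE
  _spid   <- anyWord16BE   -- not used by us
  _packet <- Atto.anyWord8 -- not used by us
  _window <- Atto.anyWord8 -- not used by us
  pure (Header typ status len)
  where
    anyWord16BE :: Parser Word16
    anyWord16BE = do
      hi <- Atto.anyWord8
      lo <- Atto.anyWord8
      pure (fromIntegral hi * 256 + fromIntegral lo)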

For example, sqlbatchParser is:

sqlbatchParser :: Int -> Parser ClientMessage
sqlbatchParser messageLen = do
  start <- parserPosition
  _headers <- allHeadersParser
  end <- parserPosition
  text <- Atto.take (messageLen - (end - start))
  pure (SQLBatchMessage (T.decodeUtf16LE text))

We actually ignore the headers and are just interested in the SQL query. We grab that from the stream and decode it as UTF16 little-endian, which is the specified text format for TDS.

An example looks like this, in hex editor format:

01 01 00 5C 00 00 01 00            . . . \ . . . .
16 00 00 00 12 00 00 00            . . . . . . . .
02 00 00 00 00 00 00 00            . . . . . . . .
00 01 00 00 00 00 0A 00            . . . . . . . .
73 00 65 00 6C 00 65 00            s . e . l . e .
63 00 74 00 20 00 27 00            c . t .   . ' .
66 00 6F 00 6F 00 27 00            f . o . o . ' .
20 00 61 00 73 00 20 00              . a . s .   .
27 00 62 00 61 00 72 00            ' . b . a . r .
27 00 0A 00 20 00 20 00            ' . . .   .   .
20 00 20 00 20 00 20 00              .   .   .   .
20 00 20 00                          .   .

Or in tabular format:

Part    | Contents | Meaning
Type    | 01 | SQL Batch request
Status  | 01 | Last packet
Length  | 00 5C | 92
SPID    | 00 00 | 0
Packet  | 01 | 1
Window  | 00 | 0
Headers | 16 00 00 00 12 00 00 00 02 00 00 00 00 00 00 00 00 01 00 00 00 00 | Not important
SQL     | 0A 00 73 00 65 00 6C 00 65 00 63 00 74 00 20 00 27 00 66 00 6F 00 6F 00 27 00 20 00 61 00 73 00 20 00 27 00 62 00 61 00 72 00 27 00 0A 00 20 00 20 00 20 00 20 00 20 00 20 00 20 00 20 00 | "\nselect 'foo' as 'bar'\n "

The other parsers follow the same patterns. Remember that everything in attoparsec is incremental, so we don’t have to worry about boundaries and packets being spread over multiple chunks. The parser is fed until it’s satisfied.
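
To see the UTF-16LE decoding concretely, here’s a tiny standalone script (not part of the server) that decodes a few of the character pairs from the hex dump above:

#!/usr/bin/env stack
-- stack --resolver lts-12.12 script
import qualified Data.ByteString as S
import qualified Data.Text.Encoding as T
import qualified Data.Text.IO as T

-- 73 00 65 00 6C 00 65 00 63 00 74 00 from the dump above is "select" in UTF-16LE.
main :: IO ()
main =
  T.putStrLn
    (T.decodeUtf16LE
       (S.pack [0x73, 0x00, 0x65, 0x00, 0x6C, 0x00, 0x65, 0x00, 0x63, 0x00, 0x74, 0x00]))
-- Prints: select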

Implementation: rendering

As it happens, for rendering the server’s replies there are really three server messages of interest: login acceptance, prelogin acknowledgement, and the so-called "general response", which is what we’re interested in for the purpose of returning results from queries. The GeneralResponse consists of a vector of response tokens, Vector ResponseToken. But for this article, let’s just focus on Row.

data ResponseToken = ... | Row !(Vector Value)

Specifically, the implementation to render a response token is:

renderResponseToken :: ResponseToken -> Builder
renderResponseToken token =
  case token of
    ...
    Row vs -> word8 0xD1 <> foldMap renderValue (V.toList vs)

For returning rows for a query, we have Value:

data Value
  = TextValue !Text
  | BoolValue !Bool
  | DoubleValue !Double
  | Int32Value !Int32
  | Int16Value !Int16

Which is easy to render as a builder:

renderValue :: Value -> Builder
renderValue value =
  case value of
    TextValue t -> word16LE (fromIntegral (T.length t * 2)) <> byteString (T.encodeUtf16LE t)
    DoubleValue d -> doubleLE d
    BoolValue b -> word8 (if b then 1 else 0)
    Int16Value i -> int16LE i
    Int32Value i -> int32LE i

Remember the properties of Builder? It builds the output incrementally: the conduit consuming it requests one chunk at a time and writes each chunk to the socket. We’ll look at the memory use of the server later and confirm that it’s O(1) in the number of rows.
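
As a sanity check, here’s a standalone sketch (not code from the real server) that inlines just the TextValue case of renderValue and prints the bytes it produces:

#!/usr/bin/env stack
-- stack --resolver lts-12.12 script
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as L
import qualified Data.Text as T
import qualified Data.Text.Encoding as T

-- The TextValue case from renderValue, inlined: a little-endian Word16 byte
-- count followed by the UTF-16LE encoded text.
renderText :: T.Text -> BB.Builder
renderText t =
  BB.word16LE (fromIntegral (T.length t * 2)) <> BB.byteString (T.encodeUtf16LE t)

main :: IO ()
main = print (L.unpack (BB.toLazyByteString (renderText "bar")))
-- Prints [6,0,98,0,97,0,114,0]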

Implementation: ferrying messages

To handle a connection we have two conduits:

  1. One to provide the incoming stream, also known as the source.
  2. One to provide the outgoing stream, also known as the sink.

We simply need to consume the incoming stream and dispatch on each message in a loop. Let’s show a more digestible version of that using the simple example we’ve been working with.

#!/usr/bin/env stack
-- stack --resolver lts-12.12 script
{-# LANGUAGE OverloadedStrings #-}
import           Data.Conduit.Network
import qualified Data.ByteString as S
import qualified Data.Attoparsec.ByteString as P
import           Conduit
import           Data.Conduit.Attoparsec

main =
  runTCPServer
    (serverSettings 2019 "*")
    (\app -> do
       putStrLn "Someone connected!"
       runConduit (appSource app .| conduitParserEither parser .| handlerSink app))
  where
    handlerSink app = do
      mnext <- await
      case mnext of
        Nothing -> liftIO (putStrLn "Connection closed.")
        Just eithermessage ->
          case eithermessage of
            Left err -> liftIO (print err)
            Right (position, message) -> do
              liftIO (print message)
              liftIO (runConduit (yield "Thanks!\n" .| appSink app))
              handlerSink app
    parser = do
      len <- P.anyWord8
      bytes <- P.take (fromIntegral len)
      return bytes

Here’s what’s going on:

  1. We run a server and then immediately begin a conduit which consumes chunks from the socket.
  2. We pipe those chunks into the attoparsec parser, yielding (Either ParseError (PositionRange, ByteString)) where the ByteString is the parsed thing we want.
  3. We feed that into our handlerSink which consumes those parse results one at a time.
  4. If await produces Nothing, that’s the end of the stream.
  5. If the eithermessage is Left, then we have a parse error. We probably want to end here and print the error. So that’s what we do.
  6. If it’s Right, then we print out the message, send back "Thanks" and continue our sink loop.

Here’s what the behaviour looks like:

$ cat > writer.hs
main = putStr "\5Hello\5World"
chris@precision:~$ stack ghc -- writer.hs -o writer -v0
chris@precision:~$ ./writer | nc localhost 2019
Thanks!
Thanks!
C-c

The server reports:

$ ./conduit-atto.hs
Someone connected!
"Hello"
"World"
Connection closed.

Just as we’d expected! The fake SQL Server behaves in the same way, but using the parsers of the packets we’ve looked at briefly, and the renderers we’ve also looked at.

It’s worth taking a moment to digest the accomplishment of being able to handle a binary protocol this trivially. That’s thanks to the abstractions provided by attoparsec and conduit.

Testing opportunities

Testing was easy because we’re using high-level abstractions: the conduit handling code is decoupled from the attoparsec parsing code and the rendering code.

For testing random outputs, we can implement an instance of QuickCheck’s Arbitrary,

instance Test.QuickCheck.Arbitrary Value where
  arbitrary = oneof [text, int32, int16, double, bool]
    where
      text = (TextValue . T.pack) <$> Test.QuickCheck.arbitrary
      bool = (BoolValue) <$> Test.QuickCheck.arbitrary
      double = DoubleValue <$> Test.QuickCheck.arbitrary
      int32 = Int32Value <$> Test.QuickCheck.arbitrary
      int16 = Int16Value <$> Test.QuickCheck.arbitrary

to generate a random Value for a row column:

> Test.QuickCheck.generate (Test.QuickCheck.arbitrary :: Test.QuickCheck.Gen Value)
BoolValue True
> Test.QuickCheck.generate (Test.QuickCheck.arbitrary :: Test.QuickCheck.Gen Value)
Int16Value 10485
> Test.QuickCheck.generate (Test.QuickCheck.arbitrary :: Test.QuickCheck.Gen Value)
TextValue "2130FcfSI:r*^228185R|239yk246D0~204Z255d138^P"

With this, we can set up a test suite that generates a vector of values, Vector Value, runs our server, connects to it, makes a request, makes the server return that vector of values, and then checks that the ODBC library’s returned row matches what we wanted the server to return. Easy!
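
For example, a whole random row could be generated like this (a sketch; the helper name randomRow and the choice of five columns are just for illustration):

import qualified Data.Vector as V
import qualified Test.QuickCheck as QC

-- Generate a row of five random column values using the Arbitrary instance above.
randomRow :: IO (V.Vector Value)
randomRow = V.fromList <$> QC.generate (QC.vectorOf 5 QC.arbitrary)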

Exploring memory use

Let’s take a look at the memory use of this server. Here’s a simple server that accepts a number of rows to yield, and reads that many rows from a CSV file:

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}
import qualified Data.Map.Strict as M
import           Data.Map.Strict (Map)
import           Data.ByteString (ByteString)
import qualified Data.Conduit.List as CL
import           Data.CSV.Conduit
import qualified Data.Conduit.Binary as CB
import           Data.Conduit.Network
import qualified Data.Attoparsec.ByteString as P
import           Conduit
import           Data.Conduit.Attoparsec

main =
  runTCPServer
    (serverSettings 2019 "*")
    (\app -> do
       putStrLn "Someone connected!"
       runConduit
         (appSource app .| conduitParserEither parser .| handlerSink app))
  where
    handlerSink app = do
      mnext <- await
      case mnext of
        Nothing -> liftIO (putStrLn "Connection closed.")
        Just eithermessage ->
          case eithermessage of
            Left err -> liftIO (print err)
            Right (position, lineCount) -> do
              liftIO (print lineCount)
              liftIO
                (runConduitRes
                   (CB.sourceFile "fake-db.csv" .| intoCSV defCSVSettings .|
                    CL.mapMaybe
                      (M.lookup "Name" :: Map ByteString ByteString -> Maybe ByteString) .|
                    CL.map (<> "\n") .|
                    CL.isolate lineCount .|
                    appSink app))
              handlerSink app
    parser = do
      len <- P.anyWord8
      return (fromIntegral len * 1000)

Here it simply reads a word8 value from the input and interprets it as "n thousand" rows. We then load up fake-db.csv in a streaming fashion, convert the bytes into CSV rows, look up the "Name" field for each row (if any), append a newline to each name, and then isolate takes lineCount results.

Let’s compile this with runtime options enabled:

stack ghc ./sql-dummy.hs --resolver lts-12.12 -- -rtsopts

And run it with statistics output. Here is what happens when we ask for 1,000 rows:

$ ./sql-dummy +RTS -s
Someone connected!
1000
       9,149,904 bytes allocated in the heap
          36,848 bytes copied during GC
         120,112 bytes maximum residency (2 sample(s))
          35,680 bytes maximum slop
               2 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0         7 colls,     0 par    0.000s   0.000s     0.0001s    0.0001s
  Gen  1         2 colls,     0 par    0.000s   0.001s     0.0004s    0.0006s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time    0.022s  (  4.387s elapsed)
  GC      time    0.001s  (  0.001s elapsed)
  EXIT    time    0.000s  (  0.000s elapsed)
  Total   time    0.023s  (  4.388s elapsed)

  %GC     time       2.4%  (0.0% elapsed)

  Alloc rate    422,940,926 bytes per MUT second

  Productivity  96.2% of total user, 100.0% of total elapsed

And here is what happens when we ask for 2,000 rows, via:

$ printf '\x02' | nc 127.0.0.1 2019

We get:

$ ./sql-dummy +RTS -s
Someone connected!
2000
      18,125,240 bytes allocated in the heap
          43,656 bytes copied during GC
         120,112 bytes maximum residency (2 sample(s))
          35,680 bytes maximum slop
               2 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0        16 colls,     0 par    0.000s   0.001s     0.0000s    0.0001s
  Gen  1         2 colls,     0 par    0.000s   0.001s     0.0003s    0.0006s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time    0.029s  (  6.247s elapsed)
  GC      time    0.001s  (  0.001s elapsed)
  EXIT    time    0.000s  (  0.000s elapsed)
  Total   time    0.030s  (  6.248s elapsed)

  %GC     time       2.5%  (0.0% elapsed)

  Alloc rate    628,824,590 bytes per MUT second

  Productivity  96.6% of total user, 100.0% of total elapsed

Here are the differences:

  - Total allocation roughly doubled, from 9,149,904 bytes to 18,125,240 bytes, as you’d expect for twice as many rows.
  - Maximum residency stayed at 120,112 bytes, and total memory in use stayed at 2 MB.

That means that we never had to exceed the size of the largest row in the CSV for our total resident memory usage. Perfect for a server that needs to be memory efficient and shuffle lots of data around!

Circling back

Let’s go back and look at our original goals:

  1. To make a pretend SQL Server. We looked at the TDS protocol and some examples of how that works.
  2. To have clean, simple codebase. We looked through the great libraries that plug together nicely to make this happen.
  3. To have a streaming architecture so that memory use is constant. We looked at a real code sample of how this works and the results we see.

The development experience was smooth because we were able to focus on the things that matter, like the protocol at hand, and not boundaries, buffer sizes, scale problems or memory issues.

Go FP Complete

If you’re a Haskell company that needs consulting work like this, get in touch!
