FP Complete

FIXME This module needs to be updated to discuss the UnliftIO.Async module instead of async itself.

Software Transactional Memory, or STM, is a technique for storing mutable variables in Haskell. Unlike other mutable variables in Haskell, or mutable variables in most languages, it provides transactional capabilities. This is best understood by looking at a typical failure case in other mutable variable techniques:

runServer (|request| => {
  from := accounts.lookup(request.from)
  to := accounts.lookup(request.to)
  accounts.set(request.from, from - request.amt)
  accounts.set(request.to, to + request.amt)
})

This kind of pseudo-code is vulnerable to a major race condition, where two threads both read an account’s current value at the same time, and then update the value, the second thread overwriting changes by the first. This kind of problem is usually solved by locking, which brings on its own concern about deadlocks. Instead, let’s see how this is accomplished with STM:

runServer $ request -> atomically $ do
  let fromVar = lookup (from request) accounts
      toVar = lookup (to request) accounts
  origFrom <- readTVar fromVar
  writeTVar fromVar (origFrom - amt request)
  origTo <- readTVar toVar
  writeTVar toVar (origTo + amt request)

Notice that we’re doing the same style of updates as the bad example. However, with STM, this is safe. Within atomically, all reads and writes to a transactional variable—or TVar—are tracked. At the end of the transaction, the runtime checks if any of those variables have been updated since we looked at them.

If not, it commits the transaction, updating all of the variables at once in a consistent state. If some variables were changed by another thread in the interim, our results are thrown away, and the transaction is retried. Thanks to Haskell’s purity, we know that no side effects occur within the atomically call, making this a safe operation.

Basics

STM comes with a few basic types and operations, and builds a rich ecosystem from them.

These types and functions, along with many more, are exposed from the Control.Concurrent.STM module in the stm package.

EXERCISE Fill out the implementation of the following program so that it gives the output provided below.

#!/usr/bin/env stack
-- stack --resolver lts-12.21 script
import Control.Concurrent.STM
import Control.Monad (replicateM_)

makeCounter :: IO (IO Int)
makeCounter = do
  var <- newTVarIO 1
  return undefined

main :: IO ()
main = do
  counter <- makeCounter
  replicateM_ 10 $ counter >>= print

Should print:

1
2
3
4
5
6
7
8
9
10

Failure, retry, and alternative

One of the most powerful concepts in STM is the ability to retry. As a motivating example: let’s say we have two TVars, representing the bank accounts for Alice and Bob. Alice makes $5 every second (pretty nice!), and wants to give Bob $20. Our code might initially look like:

#!/usr/bin/env stack
-- stack --resolver lts-12.21 script
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad (forever)
import Say

main :: IO ()
main = do
  aliceVar <- newTVarIO 0
  bobVar <- newTVarIO 0

  _ <- forkIO $ payAlice aliceVar

  atomically $ do
    currentAlice <- readTVar aliceVar
    writeTVar aliceVar (currentAlice - 20)
    currentBob <- readTVar bobVar
    writeTVar bobVar (currentBob + 20)

  finalAlice <- atomically $ readTVar aliceVar
  finalBob <- atomically $ readTVar bobVar

  sayString $ "Final Alice: " ++ show finalAlice
  sayString $ "Final Bob: " ++ show finalBob

payAlice :: TVar Int -> IO ()
payAlice aliceVar = forever $ do
  threadDelay 1000000
  atomically $ do
    current <- readTVar aliceVar
    writeTVar aliceVar (current + 5)
  sayString "Paid Alice"

There are no race conditions, thanks to STM. But this program is still buggy, at least at the logic bug level. The issue is that we allow Alice to give money she doesn’t have! Let’s look at our output:

Final Alice: -20
Final Bob: 20

Instead, we want to check that Alice’s balance is at least $20 before we let her transfer the money. In order to do this, we just need to use one new helper function:

check :: Bool -> STM ()
check b = if b then return () else retry

As the implementation indicates, this function will do nothing if b is true, but will retry if it’s false. This is the second bit of magic in STM. Since STM needs to track all of the variables it has looked at in order to handle transactions, it knows exactly what led to its current state. If you call retry, you’re saying to STM, “I don’t like this result. Run me again when one of the variables I looked at changed, and I’ll decide if things are OK now.”

EXERCISE Use the check function to modify the program above so Alice’s bank balance is never negative. The output should look something like this:

Paid Alice
Paid Alice
Paid Alice
Paid Alice
Final Alice: 0
Final Bob: 20

On top of this retry behavior, STM also implements an Alternative instance which allows us to try a number of different transactions until one of them succeeds. For example, let’s say both Alice and Bob are trying to give Charlie $20. We can wait to see who sends the money first.

#!/usr/bin/env stack
-- stack --resolver lts-12.21 script
import Control.Applicative ((<|>))
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad (forever, void)
import Say

main :: IO ()
main = do
  aliceVar <- newTVarIO 0
  bobVar <- newTVarIO 0
  charlieVar <- newTVarIO 0

  payThread aliceVar 1000000 5
  payThread bobVar   1500000 8

  atomically $ transfer 20 aliceVar charlieVar
           <|> transfer 20 bobVar   charlieVar

  finalAlice <- atomically $ readTVar aliceVar
  finalBob <- atomically $ readTVar bobVar
  finalCharlie <- atomically $ readTVar charlieVar

  sayString $ "Final Alice: " ++ show finalAlice
  sayString $ "Final Bob: " ++ show finalBob
  sayString $ "Final Charlie: " ++ show finalCharlie

payThread :: TVar Int -> Int -> Int -> IO ()
payThread var interval amount = void $ forkIO $ forever $ do
  threadDelay interval
  atomically $ do
    current <- readTVar var
    writeTVar var (current + amount)

transfer :: Int -> TVar Int -> TVar Int -> STM ()
transfer amount fromVar toVar = do
  currentFrom <- readTVar fromVar
  check (currentFrom >= amount)
  writeTVar fromVar (currentFrom - amount)
  currentTo <- readTVar toVar
  writeTVar toVar (currentTo + amount)

The runtime system will try to transfer money from Alice. If that fails, it will try to transfer money from Bob. If that fails, it will wait until either Alice or Bob’s account balance changes, and then try again.

QUESTION Would it be possible to get the desired behavior if transfer called atomically itself and returned IO () instead of STM ()? If so, write the program. If not, why not?

Other helper functions

You may have already discovered in the previous exercises that there are a number of other helper functions to work with TVars. One example is:

modifyTVar :: TVar a -> (a -> a) -> STM ()

If you’re accustomed to dealing with thread-safe code, you may expect this to use a special implementation internally to perform some locking. But remember: with STM, locking is unnecessary. Instead, this is nothing more than a convenience function, with the very simple implementation:

modifyTVar :: TVar a -> (a -> a) -> STM ()
modifyTVar var f = do
    x <- readTVar var
    writeTVar var (f x)

EXERCISE This modify function is lazy. Can you change it to be strict?

Why newTVarIO?

One of the helper functions available is:

newTVarIO :: a -> IO (TVar a)

It may seem like this should have the obvious implementation of atomically . newTVar. However, this implementation would be bad for two reasons. The first is that it’s inefficient: it requires all of the machinery for running a transaction, when by its nature we know that creating a new TVar will never fail.

The second is more subtle. Let’s say we’re going to follow the common (albeit arguably evil) practice of creating a global mutable variable. You’ll end up with some code that looks like this:

#!/usr/bin/env stack
-- stack --resolver lts-12.21 script
import Control.Concurrent.STM
import Control.Monad (replicateM_)
import System.IO.Unsafe (unsafePerformIO)

callCount :: TVar Int
callCount = unsafePerformIO $ atomically $ newTVar 0
{-# NOINLINE callCount #-}

someFunction :: IO ()
someFunction = do
  count <- atomically $ do
    modifyTVar callCount (+ 1)
    readTVar callCount
  putStrLn $ "someFunction call count: " ++ show count

main :: IO ()
main = replicateM_ 10 someFunction

Besides the nauseating call to unsafePerformIO, everything looks fine. Unfortunately, running this application fails:

Main.hs: Control.Concurrent.STM.atomically was nested

The issue is that, in order to properly implement STM, you cannot embed one call to atomically inside another call to atomically. “But wait,” you exclaim, “the types prevent that from ever happening! You can’t run an IO action inside an STM action!” The problem is that unsafePerformIO let us do just this. callCount starts as a thunk which, when evaluated, calls atomically. And the first time it’s evaluated is at modifyTVar callCount (+ 1), which is also inside atomically!

For these two reasons, the newTVarIO function exists. This isn’t a promotion of global variables, but simply an explanation: if you’re going to use them, do them correctly.

There’s also a readTVarIO function available. This one is present purely for performance reasons, as reading a TVar is always a non-failing operation.

EXERCISE Fix up the code above so that it doesn’t throw an exception.

QUESTION Is it possible to use readTVarIO in this program? Is it safe? Would it still be safe if we introduced some concurrency?

Other variable types

TVars are the core variable type in STM. However, as a convenient, the stm library provides a number of other variable types built on top of it. We’ll demonstrate a few here.

TMVars

Channels and queues

There are three related variable types in stm:

In addition to these types, the stm-chans library provides a number of additional channel and queue types, including variants which can be closed. Let’s use one of these to explore the basic APIs a bit, and implement a concurrent URL downloader in the process.

NOTE This example takes advantage of the wonderful async library, which goes hand-in-hand with STM. Once you’ve finished this tutorial, it’s strongly advised to go and read about async to get the rest of the story with concurrency in Haskell. We’re also using the http-conduit library for HTTP requests.

#!/usr/bin/env stack
-- stack --resolver lts-12.21 script
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Exception (finally)
import Control.Monad (forever, void)
import qualified Data.ByteString.Lazy as BL
import Data.Foldable (for_)
import Network.HTTP.Simple

main :: IO ()
main = do
  queue <- newTBMQueueIO 16
  concurrently_
    (fillQueue queue `finally` atomically (closeTBMQueue queue))
    (replicateConcurrently_ 8 (drainQueue queue))

fillQueue :: TBMQueue (String, String) -> IO ()
fillQueue queue = do
  contents <- getContents
  for_ (lines contents) $ line ->
    case words line of
      [url, file] -> atomically $ writeTBMQueue queue (url, file)
      _ -> error $ "Invalid line: " ++ show line

drainQueue :: TBMQueue (String, String) -> IO ()
drainQueue queue =
    loop
  where
    loop = do
      mnext <- atomically $ readTBMQueue queue
      case mnext of
        Nothing -> return ()
        Just (url, file) -> do
          req <- parseRequest url
          res <- httpLBS req
          BL.writeFile file $ getResponseBody res

And this should be executed like this:

$ stack program.hs
https://www.google.com google.txt
https://www.snoyman.com snoyman.txt
^Ctrl-D

You will find the content of the above website in the respective files.

Exceptions

You can throw and catch exceptions inside an STM block if desired. The semantics are the same as catching and handling exception inside IO itself. Inside of throwIO and catch, you just use throwSTM and catchSTM.

The join trick

There’s a nifty little trick you can use when writing STM code. A common pattern is to want to perform some IO at the end of a transaction. Since you can’t run it inside the transaction itself, you instead run it right after the transaction. For example:

addFunds :: TVar Int -> Int -> IO ()
addFunds var amt = do
  new <- atomically $ do
    orig <- readTVar var
    let new = orig + amt
    writeTVar var new
    return new
  putStrLn $ "New amount: " ++ show new

Having to break up your logic like that feels wrong, so instead of simply returning the new value, we can instead return an IO action to be run after the block:

addFunds :: TVar Int -> Int -> IO ()
addFunds var amt = do
  action <- atomically $ do
    orig <- readTVar var
    let new = orig + amt
    writeTVar var new
    return $ putStrLn $ "New amount: " ++ show new
  action

And then we can use the join function to clean things up a bit further:

addFunds :: TVar Int -> Int -> IO ()
addFunds var amt = join $ atomically $ do
  orig <- readTVar var
  let new = orig + amt
  writeTVar var new
  return $ putStrLn $ "New amount: " ++ show new

UnliftIO

There are a number of functions in the stm package which are specialized to IO, such as newTVarIO. If you work with monad transformers a lot, and would like something generalized to MonadIO, you can instead import the UnliftIO.STM module from the unliftio library. It’s fully compatible with stm itself, just providing lifting (and in some cases unlifting) where needed.

Concurrency

STM is most useful in concurrent applications. The best next step is to read up on the async library.

Subscribe to our blog via email

Email subscriptions come from our Atom feed and are handled by Blogtrottr. You will only receive notifications of blog posts, and can unsubscribe any time.