When deciding which language to use to solve challenges that require heavy concurrent algorithms, it's hard to not consider Haskell. Its immutable and persistent data structures reduce the introduction of accidental complexity, and the GHC runtime facilitates the creation of thousands of (green) threads without having to worry as much about the memory and performance costs.
The epitome of Haskell's concurrent API is the async package, which provides higher-order functions (e.g. race, mapConcurrently, etc.) that allow us to run IO sub-routines and combine their results in various ways while executing concurrently. It also offers the type Concurrently, which allows developers to give normal sub-routines concurrent properties, and also provides Applicative and Alternative instances that help in the creation of values from composing smaller sub-routines.
In this blog post, we will discuss some of the drawbacks of using the Concurrently type when composing sub-routines. Then we will show how we can overcome these shortcomings by taking advantage of the structural nature of the Applicative and Alternative typeclasses, re-shaping and optimizing the execution of a tree of sub-routines.
And, if you simply want to get these performance advantages in your Haskell code today, you can cut to the chase and begin using the new Conc datatype we've introduced in unliftio 0.2.9.0.
The drawbacks of Concurrently
Getting started with Concurrently is easy. We can wrap an IO a sub-routine with the Concurrently constructor, and then we can compose async values using the map (<$>), apply (<*>), and alternative (<|>) operators. An example might be:

    myPureFunction :: String -> String -> String -> String
    myPureFunction a b c = a ++ " " ++ b ++ " " ++ c

    myComputation :: Concurrently String
    myComputation =
      myPureFunction
        <$> Concurrently fetchStringFromAPI1
        <*> (     Concurrently fetchStringFromAPI2_Region1
              <|> Concurrently fetchStringFromAPI2_Region2
              <|> Concurrently fetchStringFromAPI2_Region3
              <|> Concurrently fetchStringFromAPI2_Region4 )
        <*> Concurrently fetchStringFromAPI3

Let's talk a bit about the drawbacks of this approach. How many threads do you think we need to make sure all these calls execute concurrently? Try to come up with a number and an explanation and then continue reading.
I am guessing you are expecting this code to spawn six (6) threads, correct? One for each IO sub-routine that we are using. However, with the existing implementation of Applicative and Alternative in Concurrently, we will spawn at least ten (10) threads. Let's explore these instances to have a better understanding of what is going on:

    instance Applicative Concurrently where
      pure = Concurrently . return
      Concurrently fs <*> Concurrently as =
        Concurrently $ (\(f, a) -> f a) <$> concurrently fs as

    instance Alternative Concurrently where
      Concurrently as <|> Concurrently bs =
        Concurrently $ either id id <$> race as bs

First, let us expand the alternative calls in our example:

    Concurrently fetchStringFromAPI2_Region1
      <|> Concurrently fetchStringFromAPI2_Region2
      <|> Concurrently fetchStringFromAPI2_Region3
      <|> Concurrently fetchStringFromAPI2_Region4

    --- is equivalent to (note that <|> associates to the left)

    Concurrently (
      either id id <$>
        race {- 2 threads -}
          (either id id <$>
            race {- 2 threads -}
              (either id id <$>
                race {- 2 threads -}
                  fetchStringFromAPI2_Region1
                  fetchStringFromAPI2_Region2)
              fetchStringFromAPI2_Region3)
          fetchStringFromAPI2_Region4
    )

Next, let us expand the applicative calls:

    Concurrently (myPureFunction <$> fetchStringFromAPI1)
      <*> Concurrently fetchStringFromAPI2
      <*> Concurrently fetchStringFromAPI3

    --- is equivalent to

    Concurrently (
      (\(f, a) -> f a) <$>
        concurrently {- 2 threads -}
          ( (\(f, a) -> f a) <$>
              concurrently {- 2 threads -}
                (myPureFunction <$> fetchStringFromAPI1)
                fetchStringFromAPI2
          )
          fetchStringFromAPI3
    )

As you can see, we always spawn two threads for each pair of sub-routines we compose. Suppose we have 7 sub-routines we want to compose via Applicative or Alternative. Composing them takes 6 binary operations, so with this implementation we would spawn at least 12 new threads when at most 8 (one per sub-routine plus a single bookkeeping thread) should do the job. For each composition we do, an extra thread is spawned just to deal with bookkeeping.
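The arithmetic can be checked with a small model. The sketch below is not part of async; the Shape type and its functions are hypothetical names that only record how sub-routines are combined, assuming (as in the instances shown earlier) that every binary composition forks two threads.

```haskell
-- Shape of a composition; each Leaf stands in for one wrapped IO sub-routine.
data Shape
  = Leaf              -- a single sub-routine
  | Ap Shape Shape    -- combined with (<*>), i.e. one call to concurrently
  | Or Shape Shape    -- combined with (<|>), i.e. one call to race

-- Number of sub-routines in the composition.
leaves :: Shape -> Int
leaves Leaf     = 1
leaves (Ap l r) = leaves l + leaves r
leaves (Or l r) = leaves l + leaves r

-- Threads spawned when every binary composition forks both of its sides.
pairwiseThreads :: Shape -> Int
pairwiseThreads Leaf     = 0
pairwiseThreads (Ap l r) = 2 + pairwiseThreads l + pairwiseThreads r
pairwiseThreads (Or l r) = 2 + pairwiseThreads l + pairwiseThreads r

-- Shape of myComputation: API1, four regional alternatives for API2, API3.
example :: Shape
example = Ap (Ap Leaf (Or (Or (Or Leaf Leaf) Leaf) Leaf)) Leaf

main :: IO ()
main = do
  print (leaves example)           -- 6
  print (pairwiseThreads example)  -- 10
```

In this model the thread count only depends on the number of binary compositions, not on how they are nested.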
Another drawback to consider: what happens if one of the values in the composition is a pure call? Given this code:

    pure foo <|> bar

We spawn a new thread (unnecessarily) to wait for foo, even though it has already been computed and should always win. As we mentioned before, Haskell is an excellent choice for concurrency because it makes spawning threads cheap; however, these threads don't come for free, and we should strive to avoid redundant thread creation.
Introducing the Conc type
To address the issues mentioned above, we implemented a new type called Conc in our unliftio package. It has the same purpose as Concurrently, but it offers some extra guarantees:
- There is only a single bookkeeping thread for all Applicative and Alternative compositions.
- If we have pure calls in an Applicative or an Alternative composition, we will not spawn a new thread for them.
- We optimize the code for trivial cases. For example, we do not spawn a thread when evaluating a single Conc value (instead of a composition of Conc values).
- We can compose more than just IO sub-routines: any monadic type that implements MonadUnliftIO is accepted.
- Child threads are always launched in an unmasked state, not in the inherited masking state of the parent thread.
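For orientation, here is a minimal usage sketch. It assumes the conc and runConc functions exported by UnliftIO.Async in unliftio 0.2.9.0 or later; the three fetch actions are hypothetical stand-ins for real network calls.

```haskell
import Control.Applicative ((<|>))
import Control.Concurrent (threadDelay)
import UnliftIO.Async (conc, runConc)

-- Hypothetical stand-ins for real network calls.
fetchSlowRegion :: IO String
fetchSlowRegion = threadDelay 200000 >> pure "slow region"

fetchFastRegion :: IO String
fetchFastRegion = pure "fast region"

fetchCount :: IO Int
fetchCount = pure 42

main :: IO ()
main = do
  -- Both sides of (<*>) run concurrently; (<|>) keeps whichever
  -- alternative finishes first.
  result <- runConc $
    (,) <$> conc fetchCount
        <*> (conc fetchSlowRegion <|> conc fetchFastRegion)
  print result
```

With the delays chosen here, the fast alternative is expected to win the race.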
The Conc type is defined as follows:

    data Conc m a where
      Action :: m a -> Conc m a
      Apply  :: Conc m (v -> a) -> Conc m v -> Conc m a
      LiftA2 :: (x -> y -> a) -> Conc m x -> Conc m y -> Conc m a
      Pure   :: a -> Conc m a
      Alt    :: Conc m a -> Conc m a -> Conc m a
      Empty  :: Conc m a

    instance MonadUnliftIO m => Applicative (Conc m) where
      pure   = Pure
      (<*>)  = Apply
      liftA2 = LiftA2
      -- There is no dedicated constructor for (*>); it is expressed via LiftA2.
      a *> b = LiftA2 (\_ x -> x) a b

    instance MonadUnliftIO m => Alternative (Conc m) where
      empty = Empty
      (<|>) = Alt

If you are familiar with Free types, this will look eerily familiar. We are going to represent our concurrent computations as data so that we can later transform or evaluate them as we see fit. In this setting, our first example would look something like the following:

    myComputation :: Conc IO String
    myComputation =
      myPureFunction
        <$> conc fetchStringFromAPI1
        <*> (     conc fetchStringFromAPI2_Region1
              <|> conc fetchStringFromAPI2_Region2
              <|> conc fetchStringFromAPI2_Region3
              <|> conc fetchStringFromAPI2_Region4 )
        <*> conc fetchStringFromAPI3

    --- is equivalent to

    Apply
      (Apply
        (Action (myPureFunction <$> fetchStringFromAPI1))
        (Alt
          (Alt
            (Alt (Action fetchStringFromAPI2_Region1)
                 (Action fetchStringFromAPI2_Region2))
            (Action fetchStringFromAPI2_Region3))
          (Action fetchStringFromAPI2_Region4)))
      (Action fetchStringFromAPI3)

You may notice we keep the tree structure of the Concurrently implementation. However, given that we are dealing with a pure data structure, we can transform our Conc value into something that is easier to evaluate. Indeed, thanks to the Applicative interface, we don't need to evaluate any of the IO sub-routines to do these transformations (magic!).
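To make the "no evaluation needed" point concrete, here is a small sketch. It uses a cut-down, hypothetical Tree type (fixed to IO, with fewer constructors than Conc) rather than the real unliftio type, and shows that a tree can be inspected purely by pattern matching, without running any of the wrapped actions.

```haskell
{-# LANGUAGE GADTs #-}

import Data.IORef (newIORef, readIORef, writeIORef)

-- A cut-down, hypothetical version of the Conc tree, fixed to IO.
data Tree a where
  Action :: IO a -> Tree a
  Pure   :: a -> Tree a
  Apply  :: Tree (v -> a) -> Tree v -> Tree a
  Alt    :: Tree a -> Tree a -> Tree a

-- Count the IO sub-routines stored in a tree. Nothing is executed here:
-- the function only pattern matches on constructors.
countActions :: Tree a -> Int
countActions (Action _)  = 1
countActions (Pure _)    = 0
countActions (Apply f x) = countActions f + countActions x
countActions (Alt l r)   = countActions l + countActions r

main :: IO ()
main = do
  ran <- newIORef False
  let action = writeIORef ran True >> pure "hello"
      tree   = Apply (Apply (Pure (++)) (Alt (Action action) (Action action)))
                     (Action action)
  print (countActions tree)  -- 3
  readIORef ran >>= print    -- False: no action was run while inspecting
```

The same idea extends from counting to rewriting: any function from one tree to another can run before a single thread is spawned.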
We have additional (internal) types that flatten all our alternatives and applicative values:

    data Flat a
      = FlatApp !(FlatApp a)
      | FlatAlt !(FlatApp a) !(FlatApp a) ![FlatApp a]

    data FlatApp a where
      FlatPure   :: a -> FlatApp a
      FlatAction :: IO a -> FlatApp a
      FlatApply  :: Flat (v -> a) -> Flat v -> FlatApp a
      FlatLiftA2 :: (x -> y -> a) -> Flat x -> Flat y -> FlatApp a

These types are equivalent to our Conc type, but they differ from it in a few ways:
- The Flat type separates Conc values created via Applicative from the ones created via Alternative.
- The FlatAlt constructor flattens an Alternative tree into a list (helping us spawn all of them at once and facilitating the usage of a single bookkeeping thread).
  - Note that we represent this as an "at least two" list, similar to the representation of a non-empty list in the semigroups package.
- The Flat and FlatApp types are not polymorphic in their monadic context, given that they rely directly on IO. We can transform the m parameter of our Conc m a type to IO via the MonadUnliftIO constraint.
The first example of our blog post, when flattened, would look something like the following:

    FlatApp
      (FlatApply
        (FlatApp
          (FlatApply
            (FlatApp (FlatAction (myPureFunction <$> fetchStringFromAPI1)))
            (FlatAlt (FlatAction fetchStringFromAPI2_Region1)
                     (FlatAction fetchStringFromAPI2_Region2)
                     [ FlatAction fetchStringFromAPI2_Region3
                     , FlatAction fetchStringFromAPI2_Region4 ])))
        (FlatApp (FlatAction fetchStringFromAPI3)))

Using a flatten function that transforms a Conc value into a Flat value, we can later evaluate the concurrent sub-routine tree in a way that is optimal for our use case.
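The flattening of alternatives can be sketched in isolation. The code below is a simplified illustration, not the unliftio implementation: AltTree and FlatAlts are hypothetical types that keep only the Alternative part of the tree, and flattenAlt collapses any nesting of Alt into a flat list.

```haskell
-- Hypothetical, simplified tree that only knows about alternatives.
data AltTree a
  = Leaf a
  | Alt (AltTree a) (AltTree a)

-- Either a single value, or an "at least two" list of alternatives.
data FlatAlts a
  = One a
  | Many a a [a]
  deriving (Show, Eq)

-- Collapse nested Alt nodes into a flat, left-to-right list of leaves.
flattenAlt :: AltTree a -> FlatAlts a
flattenAlt tree =
  case go tree [] of
    [x]        -> One x
    (x:y:rest) -> Many x y rest
    []         -> error "impossible: a tree always has at least one leaf"
  where
    -- Accumulate leaves as a difference list to avoid repeated appends.
    go :: AltTree a -> [a] -> [a]
    go (Leaf x)  = (x :)
    go (Alt l r) = go l . go r

main :: IO ()
main =
  print (flattenAlt (Alt (Alt (Alt (Leaf "r1") (Leaf "r2")) (Leaf "r3")) (Leaf "r4")))
  -- Many "r1" "r2" ["r3","r4"]
```

Once the alternatives sit in one list, an evaluator can fork all of them from a single place instead of racing them pair by pair.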
So, given that the Conc API reduces the number of threads created via Alternative, our implementation should always work best, correct? Sadly, it is not all peachy. To ensure that we get the result of the first thread that finishes in an Alternative composition, we make use of the STM API. This approach works great when we want to gather values from multiple concurrent threads. However, the STM monad doesn't scale too well when composing lots of reads, making this approach prohibitive if you are composing tens of thousands of Conc values.
Considering this limitation, we only use STM when an Alternative function is involved; otherwise, we rely on MVars to compose the results of multiple threads via Applicative. We can do this without breaking a sweat because we can change the evaluator of the sub-routine tree created by Conc on the fly.
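The two evaluation styles can be contrasted with a rough sketch. This is not how unliftio evaluates a Conc tree; bothMVar and firstSTM are hypothetical helpers, they ignore exceptions and masking entirely, and the STM half assumes the stm package that ships with GHC.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
  (atomically, newEmptyTMVarIO, orElse, putTMVar, readTMVar, retry)

-- Applicative-style composition: run both actions and wait for both results.
-- Each result travels through its own MVar; no STM is involved.
bothMVar :: IO a -> IO b -> IO (a, b)
bothMVar left right = do
  leftVar  <- newEmptyMVar
  rightVar <- newEmptyMVar
  _ <- forkIO (left  >>= putMVar leftVar)
  _ <- forkIO (right >>= putMVar rightVar)
  (,) <$> takeMVar leftVar <*> takeMVar rightVar

-- Alternative-style composition: start every action at once and return the
-- first result. The single transaction reads one TMVar per alternative, which
-- is the kind of read-heavy STM usage the text warns about.
-- With an empty list no result can ever arrive, so pass at least one action.
firstSTM :: [IO a] -> IO a
firstSTM actions = do
  started <- mapM start actions
  winner  <- atomically (foldr (orElse . readTMVar . fst) retry started)
  mapM_ (killThread . snd) started  -- stop the alternatives that lost
  pure winner
  where
    start action = do
      var <- newEmptyTMVarIO
      tid <- forkIO (action >>= atomically . putTMVar var)
      pure (var, tid)

main :: IO ()
main = do
  pair <- bothMVar (pure (1 :: Int)) (pure "two")
  print pair  -- (1,"two")
  winner <- firstSTM [threadDelay 200000 >> pure "slow", pure "fast"]
  putStrLn winner
```

Because the tree is plain data, an evaluator is free to pick the MVar style for purely Applicative sub-trees and fall back to the STM style only where an alternative appears.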
Conclusions
We showcased how we can model the composition of computations using an Applicative and Alternative tree and then, taking advantage of these APIs, transform this computation tree into something more approachable to execute concurrently. We also took advantage of this sub-routines-as-data approach to change the evaluator from MVar-based to STM-based composition.