Writing a Concurrency Testing Library (Part 1)

Date
Tags: concurrency, dejafu, haskell, minifu, programming
Target Audience: Haskell programmers.
Epistemic Status: This is building up to a simplified form of dejafu, which is based on published research.
Attention Conservation Notice: This is setting up the basic types and functions we'll need, no fancy testing in this post.

Welcome to the first part of a tutorial on writing your very own concurrency testing library for Haskell. Before we get into the details, let’s just clarify what I mean by a “concurrency testing library”. The goal is a function which, given some concurrent Haskell like so:

example = do
  a <- newEmptyMVar
  forkIO (putMVar a 1)
  forkIO (putMVar a 2)
  takeMVar a

will tell us the possible results of that computation:

λ> test example
[1, 2]

We’re going to build this from the ground up, using the concurrency library, as it provides a typeclass abstraction over forking, MVars, STM, and suchlike.

You may have come across my dejafu library before. If not, don’t worry, but you may want to check it out as we’re going to be building something very similar.

Let’s get down to business

Ok, with the preliminaries over, let’s get coding! All the code written in this series is on GitHub, with one tag for each post. The code for this post is under the “post-01” tag.

The goal in this post is to be able to implement a function which can execute simple thread-and-MVar computations (like the example from the beginning) with a stateful scheduler. Firstly, let’s say what we know:

That sounds rather like something based on continuations or a free monad. Furthermore, we’re going to need mutable state to implement all of this, as we’re modelling a DSL with mutable references, and doing that purely is a huge pain.

Let’s write down some types. Because we’re writing a mini-dejafu, I’m calling this project “minifu”. So we want a function:

import qualified Control.Concurrent.Classy as C
import Data.List.NonEmpty (NonEmpty(..))

newtype ThreadId = ThreadId Int
  deriving (Eq, Ord)

type Scheduler s = NonEmpty ThreadId -> s -> (ThreadId, s)

minifu :: C.MonadConc m => Scheduler s -> s -> MiniFu m a -> m (Maybe a, s)

For some suitable MiniFu monad transformer. Now we’re going to take the standard way of constructing a free monad, and have a data structure representing our class of interest (MonadConc), with one constructor for every function. Because we’re only talking about threads and MVars in this post, it will be a fairly small type:

{-# LANGUAGE GADTs #-}

data PrimOp m where
  Fork         :: MiniFu m () -> (ThreadId -> PrimOp m) -> PrimOp m
  NewEmptyMVar :: (MVar m a -> PrimOp m)                -> PrimOp m
  PutMVar      :: MVar m a -> a -> PrimOp m             -> PrimOp m
  TakeMVar     :: MVar m a -> (a -> PrimOp m)           -> PrimOp m
  Stop         :: m ()                                  -> PrimOp m

newtype MVarId = MVarId Int
  deriving (Eq, Ord)

data MVar m a = MVar
  { mvarId  :: MVarId
  , mvarRef :: C.CRef m (Maybe a)
  }

The Stop action is what is going to let us communicate the final result out of the computation. I’ve also defined an MVar type. Our MVars are going to be implemented as a CRef (the concurrency library’s counterpart to an IORef) holding a Maybe value, along with an identifier. These identifiers will come into play when we look at threads blocking.
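To make the representation concrete, here is a small sketch that uses a plain IORef from base rather than a CRef. SimpleMVar and its functions are hypothetical illustrations, not part of minifu. Instead of blocking, they report failure: a False from tryPutSimple or a Nothing from tryTakeSimple marks exactly the situation in which a MiniFu thread will need to block.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Hypothetical stand-in for MiniFu's MVar (not part of minifu): an
-- identifier plus a mutable cell, where Nothing means "empty" and
-- Just means "full".
data SimpleMVar a = SimpleMVar
  { simpleId  :: Int
  , simpleRef :: IORef (Maybe a)
  }

-- A new MVar starts out empty.
newEmptySimple :: Int -> IO (SimpleMVar a)
newEmptySimple i = SimpleMVar i <$> newIORef Nothing

-- Put a value if the MVar is empty. Returns False if it is already
-- full, which is where a real putMVar would block.
tryPutSimple :: SimpleMVar a -> a -> IO Bool
tryPutSimple v x = do
  old <- readIORef (simpleRef v)
  case old of
    Just _  -> pure False
    Nothing -> do
      writeIORef (simpleRef v) (Just x)
      pure True

-- Take the value if the MVar is full. Returns Nothing if it is
-- empty, which is where a real takeMVar would block.
tryTakeSimple :: SimpleMVar a -> IO (Maybe a)
tryTakeSimple v = do
  old <- readIORef (simpleRef v)
  case old of
    Just x -> do
      writeIORef (simpleRef v) Nothing
      pure (Just x)
    Nothing -> pure Nothing
```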

Given this setup, the MiniFu type is very simple:

{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import qualified Control.Monad.Cont as K

newtype MiniFu m a = MiniFu { runMiniFu :: K.Cont (PrimOp m) a }
  deriving (Functor, Applicative, Monad)

We’re not actually going to write a MonadConc instance for MiniFu yet, because there are a bunch of constraints which we can’t really satisfy. But we can still define the functions of interest:

fork :: MiniFu m () -> MiniFu m ThreadId
fork ma = MiniFu (K.cont (Fork ma))

newEmptyMVar :: MiniFu m (MVar m a)
newEmptyMVar = MiniFu (K.cont NewEmptyMVar)

putMVar :: MVar m a -> a -> MiniFu m ()
putMVar v a = MiniFu (K.cont (\k -> PutMVar v a (k ())))

takeMVar :: MVar m a -> MiniFu m a
takeMVar v = MiniFu (K.cont (TakeMVar v))

Hey, not bad! Now we can slap a MiniFu m Int type signature on our example from the start (and rename the forkIO calls) and it compiles!

example :: MiniFu m Int
example = do
  a <- newEmptyMVar
  fork (putMVar a 1)
  fork (putMVar a 2)
  takeMVar a

Take a moment to make sure you’re happy with this section before moving on to the next. MiniFu is going to be a layered application: this is the basic layer which defines the functions we can test; the next layer executes a MiniFu computation; the layers above that will implement the systematic testing behaviour.
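It may not be obvious why Cont gives us something we can execute one step at a time, so here is a toy version. This sketch is not part of minifu. Op, Toy, emit, reify, and steps are hypothetical names, and Op has a single primitive instead of PrimOp’s five constructors. Running the continuation monad with a final continuation turns the do-block into a plain data structure. Walking that structure then visits one primitive operation at a time, which is what the execution layer will do with PrimOp.

```haskell
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import Control.Monad.Cont (Cont, cont, runCont)

-- Hypothetical toy primitive operations (much smaller than PrimOp):
-- each constructor stores what to do next.
data Op
  = Emit String Op  -- emit a message, then carry on with the next Op
  | Done            -- the computation has finished

-- Hypothetical analogue of MiniFu: a continuation monad whose final
-- answer type is the operation tree.
newtype Toy a = Toy (Cont Op a)
  deriving (Functor, Applicative, Monad)

-- A primitive action, defined the same way as putMVar in the post.
emit :: String -> Toy ()
emit msg = Toy (cont (\k -> Emit msg (k ())))

-- Supply the final continuation, turning a Toy computation into a tree.
reify :: Toy a -> Op
reify (Toy c) = runCont c (\_ -> Done)

-- Walk the tree one primitive operation at a time.
steps :: Op -> [String]
steps (Emit msg next) = msg : steps next
steps Done            = []

program :: Toy ()
program = do
  emit "hello"
  mapM_ (emit . show) [1 :: Int, 2, 3]
  emit "bye"
```

Here steps (reify program) evaluates to ["hello", "1", "2", "3", "bye"].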

Implementing minifu

Recall the type of minifu:

minifu :: C.MonadConc m => Scheduler s -> s -> MiniFu m a -> m (Maybe a, s)

So, what does it need to do? It needs to set up the execution environment: in this case that’s specifying that the provided computation is the main thread. Then it needs to repeatedly call the scheduler, executing one PrimOp of the chosen thread at a time, until either the main thread terminates or everything is blocked.

In the best functional programming practice, minifu is going to do the minimum it can and call another function to do the rest. So what minifu is actually going to do is to extract the continuation and set up the mechanism to communicate the final result back:

minifu sched s (MiniFu ma) = do
  out <- C.newCRef Nothing
  s'  <- run sched s (K.runCont ma (Stop . C.writeCRef out . Just))
  a   <- C.readCRef out
  pure (a, s')
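The result comes back through out, rather than as a return value, because execution may never reach the main thread’s Stop (for example, if every thread is blocked). Here is a toy version of the same trick in IO. It is a sketch rather than minifu code: Op, Tick, tick, runWithFuel, and threeTicks are hypothetical, and a step budget stands in for a deadlock.

```haskell
import Control.Monad.Cont (Cont, cont, runCont)
import Data.IORef (newIORef, readIORef, writeIORef)

-- Hypothetical toy primitive operations: Tick does nothing observable,
-- and Stop runs a final action (used to publish the result).
data Op
  = Tick Op
  | Stop (IO ())

tick :: Cont Op ()
tick = cont (\k -> Tick (k ()))

-- Run a computation, allowing at most `fuel` Ticks. The result is
-- written into an IORef by the final continuation, so if the fuel runs
-- out before Stop is reached, the result is still Nothing.
runWithFuel :: Int -> Cont Op a -> IO (Maybe a)
runWithFuel fuel ma = do
  out <- newIORef Nothing
  let go :: Int -> Op -> IO ()
      go _ (Stop final) = final
      go n (Tick next)
        | n <= 0    = pure ()  -- give up: out is never written
        | otherwise = go (n - 1) next
  go fuel (runCont ma (Stop . writeIORef out . Just))
  readIORef out

threeTicks :: Cont Op Int
threeTicks = tick >> tick >> tick >> pure 42
```

With enough fuel, runWithFuel 3 threeTicks gives Just 42. With runWithFuel 2 threeTicks, the budget runs out before Stop and the result stays Nothing, just as minifu returns Nothing for a deadlocked computation.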

Before we move on to the implementation of run, let’s first look at two concerns we’ll have along the way: getting unique names (for threads and MVars) and representing threads.

Names

Each thread gets a unique ThreadId, and each MVar gets a unique MVarId. As both are just wrappers around an Int, we can use the same source for both:

type IdSource = Int

initialIdSource :: IdSource
initialIdSource = 0

nextThreadId :: IdSource -> (ThreadId, IdSource)
nextThreadId n = (ThreadId n, n + 1)

nextMVarId :: IdSource -> (MVarId, IdSource)
nextMVarId n = (MVarId n, n + 1)

This is as simple as it gets, but it’s good enough for now.
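As a quick check that sharing one counter is safe, here is a sketch which threads the IdSource through the State monad from mtl. IdSource, nextThreadId, and nextMVarId are copied from above. The allocate helper and the Show instances are hypothetical additions for illustration. Every allocation bumps the counter, so a thread id and an MVar id never share a number.

```haskell
import Control.Monad.State (evalState, state)

newtype ThreadId = ThreadId Int deriving (Eq, Show)
newtype MVarId = MVarId Int deriving (Eq, Show)

type IdSource = Int

initialIdSource :: IdSource
initialIdSource = 0

nextThreadId :: IdSource -> (ThreadId, IdSource)
nextThreadId n = (ThreadId n, n + 1)

nextMVarId :: IdSource -> (MVarId, IdSource)
nextMVarId n = (MVarId n, n + 1)

-- Hypothetical helper (not in minifu): allocate a thread id, an MVar
-- id, and another thread id, threading one IdSource through with State.
allocate :: (ThreadId, MVarId, ThreadId)
allocate = flip evalState initialIdSource $ do
  t1 <- state nextThreadId
  v1 <- state nextMVarId
  t2 <- state nextThreadId
  pure (t1, v1, t2)
```

Here allocate evaluates to (ThreadId 0, MVarId 1, ThreadId 2). minifu itself passes the IdSource around by hand, which amounts to the same thing.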

Threads

What is a thread? Well, it has a continuation, which is some value of type PrimOp m, and it might be blocked. We want to know if a thread is blocked for two reasons: we don’t want the scheduler to schedule a blocked thread, and we want to be able to tell if the computation is deadlocked. Threads can only block on reading from or writing to MVars (currently), so let’s use a Maybe MVarId to indicate whether the thread is blocked:

data Thread m = Thread
  { threadK     :: PrimOp m
  , threadBlock :: Maybe MVarId
  }

When we create a thread, it’s initially unblocked:

thread :: PrimOp m -> Thread m
thread k = Thread
  { threadK     = k
  , threadBlock = Nothing
  }

And finally we need a way to construct our initial collection of threads:

import Data.Map (Map)
import qualified Data.Map as M

type Threads m = Map ThreadId (Thread m)

initialise :: PrimOp m -> (Threads m, IdSource)
initialise k =
  let (tid, idsrc) = nextThreadId initialIdSource
  in (M.singleton tid (thread k), idsrc)

And now back to the implementation of minifu.

Implementing run

The run function is responsible for taking the first continuation, creating the collection of threads, and repeatedly calling the scheduler and stepping the chosen thread, until the computation is done.

It has this type:

run :: C.MonadConc m => Scheduler s -> s -> PrimOp m -> m s

As with minifu, we shall keep it simple, and delegate most of the work to yet another function:

import Data.List.NonEmpty (nonEmpty)
import Data.Maybe (isNothing)

run sched s0 = go s0 . initialise where
  go s (threads, ids)
    | initialThreadId `M.member` threads = case runnable threads of
      Just tids ->
        let (chosen, s') = sched tids s
        in go s' =<< stepThread chosen (threads, ids)
      Nothing -> pure s
    | otherwise = pure s

  runnable = nonEmpty . M.keys . M.filter (isNothing . threadBlock)

  initialThreadId = fst (nextThreadId initialIdSource)

Let’s break down that go function a bit:

  1. We check if the initial thread still exists. If not, we return.
  2. We check if the collection of runnable threads is nonempty. If not, we return.
  3. We call the scheduler to pick a thread from the runnable ones.
  4. We call the (not yet defined) stepThread function to execute one step of that thread.
  5. We go around the loop again.
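Steps 1 and 2 are also where deadlock shows up. If the main thread still exists but nothing is runnable, every remaining thread is blocked on an MVar and no step can make progress. So run gives up, and the result is left as Nothing. The sketch below isolates the runnable filter on a cut-down thread type. ToyThread, runnableList, someBlocked, and allBlocked are hypothetical; the real Thread also carries a continuation.

```haskell
import Data.List.NonEmpty (NonEmpty, nonEmpty, toList)
import qualified Data.Map as M
import Data.Maybe (isNothing)

newtype ThreadId = ThreadId Int deriving (Eq, Ord, Show)
newtype MVarId = MVarId Int deriving (Eq, Ord, Show)

-- Hypothetical cut-down thread: only the blocking state is kept.
newtype ToyThread = ToyThread { toyBlock :: Maybe MVarId }

-- Same definition as runnable in the post, over the cut-down type:
-- Nothing means no thread can run.
runnable :: M.Map ThreadId ToyThread -> Maybe (NonEmpty ThreadId)
runnable = nonEmpty . M.keys . M.filter (isNothing . toyBlock)

-- The runnable ids as a plain list (empty when deadlocked).
runnableList :: M.Map ThreadId ToyThread -> [ThreadId]
runnableList = maybe [] toList . runnable

-- The main thread (0) is waiting on an MVar, but thread 2 can run.
someBlocked :: M.Map ThreadId ToyThread
someBlocked = M.fromList
  [ (ThreadId 0, ToyThread (Just (MVarId 1)))
  , (ThreadId 2, ToyThread Nothing)
  ]

-- Every thread is waiting on the same MVar: a deadlock.
allBlocked :: M.Map ThreadId ToyThread
allBlocked = M.fromList
  [ (ThreadId 0, ToyThread (Just (MVarId 1)))
  , (ThreadId 2, ToyThread (Just (MVarId 1)))
  ]
```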

Not too bad, hey? Finally (really finally) we just have one function to go, stepThread. Can you see what the type will be?

It’s going to start like this:

{-# LANGUAGE ScopedTypeVariables #-}

-- "forall m." (with ScopedTypeVariables) brings m into scope, so the
-- local signatures below, including go's, refer to this same m.
stepThread :: forall m. C.MonadConc m => ThreadId -> (Threads m, IdSource) -> m (Threads m, IdSource)
stepThread tid (threads, idsrc) = case M.lookup tid threads of
    Just thrd -> go (threadK thrd)
    Nothing -> pure (threads, idsrc)
  where
    -- Modify the thread being stepped.
    adjust :: (Thread m -> Thread m) -> Threads m -> Threads m
    adjust f = M.adjust f tid

    -- Replace the thread's continuation.
    goto :: PrimOp m -> Threads m -> Threads m
    goto k = adjust (\thrd -> thrd { threadK = k })

    -- Set (or clear) what the thread is blocked on.
    block :: Maybe MVarId -> Threads m -> Threads m
    block mv = adjust (\thrd -> thrd { threadBlock = mv })

    -- Unblock every thread which is blocked on the given MVar.
    unblock :: MVarId -> Threads m -> Threads m
    unblock v = fmap (\thrd ->
      if threadBlock thrd == Just v
      then thrd { threadBlock = Nothing }
      else thrd)

    go :: PrimOp m -> m (Threads m, IdSource)
    -- go ...

I’ve introduced a few helper functions, which will crop up a lot. That go function will have a case for every constructor of PrimOp m, and it’s going to look a bit hairy, so we’ll take it one constructor at a time. Let’s do the constructors in order.

First, we can fork threads:

    go (Fork (MiniFu ma) k) =
      let (tid', idsrc') = nextThreadId idsrc
          thrd' = thread (K.runCont ma (\_ -> Stop (pure ())))
      in pure (goto (k tid') (M.insert tid' thrd' threads), idsrc')

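Forking is pretty straightforward. We simply get the next available ThreadId from the IdSource and create a thread with the provided continuation, which stops once the forked computation finishes. We insert it into the Threads m map and move the forking thread on to its own continuation, passing it the new ThreadId.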

Next up is NewEmptyMVar:

    go (NewEmptyMVar k) = do
      ref <- C.newCRef Nothing
      let (mvid, idsrc') = nextMVarId idsrc
      pure (goto (k (MVar mvid ref)) threads, idsrc')

Remember that we’re implementing our MVar type using the CRef type of the underlying MonadConc. As the MVar starts out empty, the CRef starts out holding Nothing.

The PutMVar and TakeMVar actions are almost the same, so let’s tackle them together:

    go (PutMVar (MVar mvid ref) a k) = do
      old <- C.readCRef ref
      case old of
        Just _ -> pure (block (Just mvid) threads, idsrc)
        Nothing -> do
          C.writeCRef ref (Just a)
          pure (goto k (unblock mvid threads), idsrc)

    go (TakeMVar (MVar mvid ref) k) = do
      old <- C.readCRef ref
      case old of
        Just a -> do
          C.writeCRef ref Nothing
          pure (goto (k a) (unblock mvid threads), idsrc)
        Nothing -> pure (block (Just mvid) threads, idsrc)

In both cases, we start out by reading the value of the reference. Remember that Nothing indicates emptiness, and Just indicates the presence of a value. So, for PutMVar if there already is a value (and for TakeMVar if there isn’t a value), we block the thread. Its continuation is left unchanged, so when it is later unblocked it will retry the same operation. In the other case, we update the value in the reference, putting in the new value (or taking out the old), unblock all the relevant threads, and go to the continuation.
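All the blocked threads are woken and simply retry, so a thread which gets unblocked is not guaranteed to succeed. Here is a pure toy model of that behaviour with a single shared MVar. Action, T, World, and step are hypothetical names. Since there is only one MVar, unblocking every blocked thread is the same as unblocking the threads blocked on that MVar.

```haskell
import qualified Data.Map as M

-- Hypothetical toy model (not minifu code) with one shared MVar.
-- What each thread still wants to do to the MVar.
data Action = Put Int | Take
  deriving (Eq, Show)

-- A thread's pending action (Nothing once finished) and whether it is
-- currently blocked on the MVar.
data T = T { pending :: Maybe Action, blocked :: Bool }
  deriving (Eq, Show)

-- The MVar's contents, plus all threads keyed by id.
type World = (Maybe Int, M.Map Int T)

-- Execute one step of thread tid. If the operation can't proceed, mark
-- the thread blocked but keep its pending action, so it retries later.
-- Otherwise update the MVar, finish the thread, and wake every blocked
-- thread (with one MVar, that is every thread blocked on it).
step :: Int -> World -> World
step tid (mvar, ts) =
  case (M.lookup tid ts, mvar) of
    (Just (T (Just (Put x)) _), Nothing) -> (Just x, finish ts)
    (Just (T (Just Take) _), Just _)     -> (Nothing, finish ts)
    (Just (T (Just _) _), _)             -> (mvar, M.adjust (\t -> t { blocked = True }) tid ts)
    _                                    -> (mvar, ts)
  where
    finish, unblockAll :: M.Map Int T -> M.Map Int T
    finish = unblockAll . M.adjust (\t -> t { pending = Nothing }) tid
    unblockAll = fmap (\t -> t { blocked = False })
```

Start with two takers (threads 1 and 2) and a putter (thread 3) on an empty MVar, then step them in the order 1, 2, 3, 1, 2. Both takers block, the put wakes them both, and thread 1 takes the value. Thread 2 retries and is left blocked again.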

These implementations are not atomic. But that’s fine: despite MiniFu testing concurrent programs, there’s no concurrency going on within MiniFu itself. We can do as much or as little as we want during one atomic “step” of our program. This will turn out to be very useful when we implement STM in a few posts’ time.

Finally, we have Stop:

    go (Stop mx) = do
      mx
      pure (M.delete tid threads, idsrc)

And we’re done! That’s it! All we need now is a scheduler, and we can execute our example!

A Simple Scheduler

Our example is nondeterministic, so we want a scheduler which will let us see that. It would be no good for us to implement something which always made the same decisions, as we’d only see one result! So until we implement the systematic testing behaviour, let’s just use a simple random scheduler.

import qualified System.Random as R

randomSched :: R.RandomGen g => Scheduler g
randomSched (t:|ts) g =
  let (i, g') = R.randomR (0, length ts) g
  in ((t:ts) !! i, g')

There’s no deep magic here: we’re just picking a random element from a nonempty list. Finally, we can construct a little demo:

demo :: IO ()
demo = do
  g <- R.newStdGen
  print . fst =<< minifu randomSched g example

Which we can run in ghci like so:

λ> demo
Just 1
λ> demo
Just 1
λ> demo
Just 1
λ> demo
Just 2
λ> demo
Just 1

Success!

A random scheduler is fine for demonstration purposes, but not so great for testing. Different seeds may lead to the same execution, which makes it hard to know how many executions of a test are enough. It can be a useful technique, but for us this is only the beginning.
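For comparison, a deterministic scheduler fits the same Scheduler type. The round-robin sketch below is hypothetical: neither roundRobin nor the schedule helper is in minifu. It assumes the runnable ids arrive in ascending order, which holds for the runnable function above because M.keys returns keys in ascending order. A given program always gets the same schedule from it, so it would show only one result of our example. A systematic tester instead has to explore the different choices a scheduler could make.

```haskell
import Data.List.NonEmpty (NonEmpty(..))
import qualified Data.List.NonEmpty as NE

newtype ThreadId = ThreadId Int deriving (Eq, Ord, Show)

type Scheduler s = NonEmpty ThreadId -> s -> (ThreadId, s)

-- Hypothetical round-robin scheduler (not in minifu). The state is the
-- thread chosen last time, if any: pick the first runnable thread with
-- a larger id, wrapping around to the first runnable thread otherwise.
-- Assumes the runnable ids are in ascending order.
roundRobin :: Scheduler (Maybe ThreadId)
roundRobin tids prev =
  let later  = maybe (NE.toList tids) (\p -> NE.filter (> p) tids) prev
      chosen = case later of
        (t:_) -> t
        []    -> NE.head tids
  in (chosen, Just chosen)

-- Hypothetical helper: offer the same runnable set to a scheduler n
-- times, threading its state through and collecting its choices.
schedule :: Int -> Scheduler s -> NonEmpty ThreadId -> s -> [ThreadId]
schedule n sched tids s
  | n <= 0    = []
  | otherwise = let (t, s') = sched tids s
                in t : schedule (n - 1) sched tids s'
```

Offered the runnable set of threads 0, 1, and 2 four times, starting from Nothing, it picks threads 0, 1, 2, and then 0 again.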

Next time…

Next time we’ll look at implementing exceptions, both synchronous and asynchronous.

I hope you enjoyed this post; any feedback is welcome. As I mentioned at the start, this is on GitHub: you can get the code we ended up with at the “post-01” tag.

Before next time, I have some homework for you! You have seen how to implement MVars, so now try implementing CRefs! Here are the functions you should have a go at writing:

data CRef m a = -- ...

newCRef :: a -> MiniFu m (CRef m a)

readCRef :: CRef m a -> MiniFu m a

writeCRef :: CRef m a -> a -> MiniFu m ()

atomicModifyCRef :: CRef m a -> (a -> (a, b)) -> MiniFu m b

Don’t worry about any of the relaxed memory stuff implemented in dejafu; just do sequential consistency (and if you don’t know what that means: it means to do the obvious). I’ll put up a solution (and maybe do a little refactoring) before the next post.


Thanks to José Manuel Calderón Trilla for reading an earlier draft of this post.