A Multithreaded Runtime for Deja Fu

Tags: concurrency, dejafu, haskell, programming, research
Target Audience: People interested in the dejafu internals.
Epistemic Status: This isn't actually the implementation dejafu uses now, but it's close enough (and simple enough) to help understanding.

The de­jafu situ­ation cur­rently looks some­thing like this:

  1. We have a type­class ab­stracting over con­cur­rency.
    • There’s an im­ple­ment­a­tion of this type­class using IO.
    • There’s also an im­ple­ment­a­tion of this type­class using a custom monad trans­former called ConcT.
  2. Com­pu­ta­tions of type Mon­adRef r n => ConcT r n a can be ex­ecuted with a given sched­uler, to pro­duce a result and an ex­e­cu­tion trace.

  3. Un­like IO, the threads in a ConcT r n com­pu­ta­tion are ex­ecuted in a single-step fashion based on the de­cisions of the sched­uler.

  4. To im­ple­ment this single-step ex­e­cu­tion, all threads are ex­ecuted in a single “real” thread.

It’s the third point that gives de­jafu the ability to sys­tem­at­ic­ally ex­plore dif­ferent ex­e­cu­tions. If ex­e­cu­tion were not single-step, then it wouldn’t be pos­sible in gen­eral to con­text switch between ar­bit­rary con­cur­rency ac­tions.

The fourth point greatly sim­pli­fies the im­ple­ment­a­tion, but also causes prob­lems: GHC Haskell has a no­tion of “bound threads”, which are Haskell threads bound to a par­tic­ular OS thread. Bound threads are ab­so­lutely es­sen­tial to use FFI calls which rely on thread-­local state. Deja Fu cannot sup­port bound threads if it ex­ecutes everything in a single thread!

How can we ad­dress this?


PULSE is a concurrency-testing tool for Erlang. It works by code instrumentation: a call to the PULSE scheduler process is inserted around every communication operation. The scheduler process tells processes when they can run. Execution is not serialised into a single thread: the distinct Erlang processes still exist, but only one of them may run at a time.

We can do the same thing in Haskell.
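To make that concrete before diving into Mini Fu, here's a minimal sketch of the PULSE idea in plain IO. The Worker type and the spawnWorker, stepWorker, and demo names are made up for this illustration, and a "step" is just appending to a log. Each worker is a real thread, but it only moves when the scheduler hands it a token:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- | Hypothetical worker handle: the scheduler fills 'go' to allow one step,
-- and the worker fills 'done' once that step has finished.
data Worker = Worker { go :: MVar (), done :: MVar () }

-- | Start a real thread which performs the given number of steps, each one
-- gated on the 'go' MVar.
spawnWorker :: IORef [String] -> String -> Int -> IO Worker
spawnWorker logRef name steps = do
  w <- Worker <$> newEmptyMVar <*> newEmptyMVar
  _ <- forkIO $ forM_ [1 .. steps] $ \i -> do
    takeMVar (go w)                          -- wait for permission
    modifyIORef logRef ((name ++ show i) :)  -- the "step" itself
    putMVar (done w) ()                      -- report back to the scheduler
  pure w

-- | Let one worker run exactly one step, and wait for it to finish.
stepWorker :: Worker -> IO ()
stepWorker w = putMVar (go w) () >> takeMVar (done w)

-- | Run two workers under a fixed schedule and return the observed order.
demo :: IO [String]
demo = do
  logRef <- newIORef []
  a <- spawnWorker logRef "a" 2
  b <- spawnWorker logRef "b" 2
  mapM_ stepWorker [a, b, b, a]
  reverse <$> readIORef logRef
```

With the schedule [a, b, b, a], demo returns ["a1","b1","b2","a2"]: both threads exist the whole time, but the order of their steps is entirely decided by the caller of stepWorker.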

Mini Fu

Let’s look at a much sim­pli­fied ver­sion of de­jafu to try this idea out.

{-# LAN­GUAGE Gen­er­al­ized­New­ty­pe­De­riving #-}
{-# LAN­GUAGE Ex­ist­en­tialQuan­ti­fic­a­tion #-}
{-# LAN­GUAGE TypeFam­ilies #-}

im­port qual­i­fied Con­trol.­Con­cur­rent as C
im­port qual­i­fied Con­trol.­Mon­ad.­Cont as K
im­port Data.L­ist.NonEmpty (NonEmpty(..), nonEmpty)
im­port qual­i­fied Data.Map as M
im­port Data.Maybe (is­Nothing)
im­port qual­i­fied Sys­tem.Random as R

class Monad m => Mon­ad­Conc m where
  type ThreadId m :: *
  type MVar     m :: * -> *

  fork   :: m () -> m (ThreadId m)
  forkOS :: m () -> m (ThreadId m)

  newEmptyMVar :: m (MVar m a)
  put­MVar  :: MVar m a -> a -> m ()
  takeMVar :: MVar m a -> m a

new­MVar :: Mon­ad­Conc m => a -> m (MVar m a)
new­MVar a = do
  v <- newEmptyMVar
  put­MVar v a
  pure v

There’s a straight­for­ward im­ple­ment­a­tion for IO:

in­stance Mon­ad­Conc IO where
  type ThreadId IO = C.ThreadId
  type MVar     IO = C.MVar

  fork   = C.forkIO
  forkOS = C.forkOS

  newEmptyMVar = C.newEmptyMVar
  put­MVar  = C.put­MVar
  takeMVar = C.takeMVar

The testing im­ple­ment­a­tion is a little hair­ier. Be­cause we want to be able to single-step it, we’ll use con­tinu­ations:

new­type ConcT m a = ConcT { run­ConcT :: K.Cont (Ac­tion m) a }
  de­riving (Functor, Ap­plic­ative, Monad)

new­type CT­ThreadId = CT­ThreadId Int
  de­riving (Eq, Ord)

data CT­MVar m a = CT­MVar { mvarID :: Int, mvarRef :: MVar m (Maybe a) }

data Ac­tion m
  = Fork   (ConcT m ()) (CT­ThreadId -> Ac­tion m)
  | ForkOS (ConcT m ()) (CT­ThreadId -> Ac­tion m)
  | forall a. NewEmptyMVar (CT­MVar m a -> Ac­tion m)
  | forall a. Put­MVar  (CT­MVar m a) a (Ac­tion m)
  | forall a. TakeMVar (CT­MVar m a)   (a -> Ac­tion m)
  | Stop (m ())

in­stance Mon­ad­Conc (ConcT m) where
  type ThreadId (ConcT m) = CT­ThreadId
  type MVar     (ConcT m) = CT­MVar m

  fork   ma = ConcT (K.cont (Fork   ma))
  forkOS ma = ConcT (K.cont (ForkOS ma))

  newEmptyMVar = ConcT (K.cont NewEmptyMVar)
  put­MVar  mvar a = ConcT (K.cont (\k -> Put­MVar mvar a (k ())))
  takeMVar mvar   = ConcT (K.cont (TakeMVar mvar))

Let’s talk about the Ac­tion type a bit be­fore moving on. The gen­eral struc­ture is Name [<args> ...] (<res­ult> -> Ac­tion m), where m is some Mon­ad­Conc. For MVars, we’re just re-using the MVar type of the un­der­lying monad (de­jafu proper re-uses the IORefs of the un­der­lying mon­ad). For ThreadIds we’re using Ints. And we’re going to get the final result out of the com­pu­ta­tion with the Stop ac­tion.
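If the continuation trick is unfamiliar, here's a cut-down sketch of how a do-block turns into a chain of actions which can be executed one at a time. Everything in it (the three-constructor Action, Prog, say, ask, step, trace) is invented for the example, and the continuation monad is written by hand rather than imported from Control.Monad.Cont so the snippet only needs base:

```haskell
-- | A tiny action type: every constructor carries its own continuation.
data Action
  = Say String Action    -- emit a message, then continue
  | Ask (Int -> Action)  -- receive a number, then continue
  | Stop                 -- nothing left to do

-- | A hand-written continuation monad with Action as the answer type.
newtype Prog a = Prog { runProg :: (a -> Action) -> Action }

instance Functor Prog where
  fmap f (Prog c) = Prog (\k -> c (k . f))

instance Applicative Prog where
  pure a = Prog (\k -> k a)
  Prog cf <*> Prog ca = Prog (\k -> cf (\f -> ca (k . f)))

instance Monad Prog where
  Prog c >>= f = Prog (\k -> c (\a -> runProg (f a) k))

say :: String -> Prog ()
say msg = Prog (\k -> Say msg (k ()))

ask :: Prog Int
ask = Prog Ask

example :: Prog ()
example = do
  say "start"
  n <- ask
  say ("got " ++ show n)

-- | Execute exactly one action: return any output and the rest of the program.
step :: Int -> Action -> (Maybe String, Maybe Action)
step _ (Say msg k) = (Just msg, Just k)
step n (Ask k)     = (Nothing, Just (k n))
step _ Stop        = (Nothing, Nothing)

-- | Keep stepping until Stop, answering every Ask with the same number.
trace :: Int -> Prog () -> [String]
trace n p = loop (runProg p (\_ -> Stop)) where
  loop act = case step n act of
    (out, next) -> maybe id (:) out (maybe [] loop next)
```

Here trace 7 example gives ["start","got 7"]. The point is that step can be called by anything, in any order, on any number of suspended programs, which is what a scheduler needs.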

Im­ple­menting Mini Fu

Let’s keep things simple and not sup­port most of the fancy scheduling stuff de­jafu does. Our sched­uler is just going to be a stateful func­tion from run­nable threads to a single thread:

type Sched­uler s = NonEmpty CT­ThreadId -> s -> (CT­ThreadId, s)
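As an example of something fitting this type, here's a sketch of a round-robin scheduler. The roundRobin and schedule names are hypothetical; CTThreadId and Scheduler are repeated from above (with a Show instance added) so the snippet stands alone:

```haskell
import Data.List.NonEmpty (NonEmpty(..))
import qualified Data.List.NonEmpty as NE

newtype CTThreadId = CTThreadId Int
  deriving (Eq, Ord, Show)

type Scheduler s = NonEmpty CTThreadId -> s -> (CTThreadId, s)

-- | Round-robin: the state is the thread chosen last time (if any). Pick the
-- smallest runnable ID above it, wrapping around to the smallest ID overall.
roundRobin :: Scheduler (Maybe CTThreadId)
roundRobin tids prev =
  let later  = filter (\t -> Just t > prev) (NE.toList tids)
      chosen = if null later then minimum tids else minimum later
  in (chosen, Just chosen)

-- | Feed a scheduler a sequence of runnable sets and collect its choices.
schedule :: Scheduler s -> s -> [NonEmpty CTThreadId] -> [CTThreadId]
schedule _ _ [] = []
schedule sched s (r:rs) =
  let (chosen, s') = sched r s
  in chosen : schedule sched s' rs
```

Starting from Nothing, with threads 0, 1, and 2 runnable for two steps and then only 0 and 2 runnable for three more, the choices are 0, 1, 2, 0, 2.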

So now our ex­e­cu­tion func­tion is going to look like this:

minifu :: Mon­ad­Conc m => Sched­uler s -> s -> ConcT m a -> m (Maybe a, s)
minifu sched s (ConcT ma) = do
  out <- new­MVar Nothing
  s'  <- run sched s (K.run­Cont ma (\a -> Stop (takeMVar out >> put­MVar out (Just a))))
  a   <- takeMVar out
  pure (a, s')

The real meat is the run func­tion:

run :: Mon­ad­Conc m => Sched­uler s -> s -> Ac­tion m -> m s
run sched s0 a0 = go s0 ini­tial­Id­Source =<< ini­tial­Threads a0 where
  go s ids threads
    | ini­tial­ThreadId `M.member` threads = case run­nable threads of
      Just tids ->
        let (chosen, s') = sched tids s
        in un­curry (go s') =<< loop­Step­Thread ids chosen threads
      Nothing -> pure s
    | oth­er­wise = pure s

  run­nable = nonEmpty . M.keys . M.filter (is­Nothing . blockedOn)

Like in dejafu proper, execution is going to stop if the main thread terminates, even if there are other threads. Threads are going to live in a map keyed by ThreadId.

type Threads m = M.Map CT­ThreadId (Thread m)

ini­tial­Threads :: Mon­ad­Conc m => Ac­tion m -> m (Threads m)
ini­tial­Threads a0 = do
  t <- fork­Thread False a0
  pure (M.singleton ini­tial­ThreadId t)

ini­tial­ThreadId :: CT­ThreadId
ini­tial­ThreadId = CT­ThreadId 0

Each thread in our program-under-test is going to be executed in an actual thread. Like PULSE, we'll introduce communication (in the form of MVars) around concurrency actions to ensure that we get single-step execution. A thread is therefore going to have three components: the MVar (if any) it's currently blocked on, an MVar to signal that it should execute one step, and an MVar to communicate what the thread did.

data Thread m = Thread
  { blockedOn   :: Maybe Int
  , signalStep  :: MVar m IdSource
  , awaitResult :: MVar m (IdSource, ThreadResult m)
  }

data Thread­Result m
  = Busi­nes­sAs­U­sual
  | Killed
  | Up­dated Int
  | Blocked Int
  | Forked (Thread m)

The Id­Source is used to gen­erate new unique thread and MVar IDs:

type Id­Source = (Int, Int)

ini­tial­Id­Source :: Id­Source
ini­tial­Id­Source = (1, 0)

nextThreadId :: Id­Source -> (CT­ThreadId, Id­Source)
nextThreadId (t, m) = (CT­ThreadId t, (t + 1, m))

next­M­VarId :: Id­Source -> (Int, Id­Source)
next­M­VarId (t, m) = (m, (t, m + 1))

Forking a thread is going to set up these MVars and the small bit of logic to en­sure things happen as we like:

fork­Thread :: Mon­ad­Conc m => Bool -> Ac­tion m -> m (Thread m)
fork­Thread isOS act = do
  signal <- newEmptyMVar
  await  <- newEmptyMVar
  _ <- (if isOS then forkOS else fork) (runThread signal await act)
  pure (Thread Nothing signal await)

runThread :: Mon­ad­Conc m => MVar m Id­Source -> MVar m (Id­Source, Thread­Result m) -> Ac­tion m -> m ()
runThread signal await = go where
  go act = do
    ids <- takeMVar signal
    (act', ids', res) <- run­Step­Thread ids act
    put­MVar await (ids', res)
    maybe (pure ()) go act'

The final pieces of the puzzle are the two *StepThread functions, which execute one action of our chosen thread. These are a little trickier than in normal dejafu.

Firstly, loop­Step­Thread, which tells the thread that was chosen by the sched­uler to step:

loop­Step­Thread :: Mon­ad­Conc m => Id­Source -> CT­ThreadId -> Threads m -> m (Id­Source, Threads m)
loop­Step­Thread ids tid threads = case M.lookup tid threads of
  Just thread -> do
    put­MVar (sig­nal­Step thread) ids
    (ids', res) <- takeMVar (awaitResult thread)
    let resf = case res of
          Busi­nes­sAs­U­sual -> id
          Killed -> M.de­lete tid
          Up­dated i -> fmap (\t -> if blockedOn t == Just i then t { blockedOn = Nothing } else t)
          Blocked i -> M.in­sert tid (thread { blockedOn = Just i })
          Forked thread' -> M.in­sert (fst (nextThreadId ids)) thread'
    pure (ids', resf threads)
  Nothing -> pure (ids, threads)

Fi­nally run­Step­Thread, which ex­ecutes an ac­tion:

run­Step­Thread :: Mon­ad­Conc m => Id­Source -> Ac­tion m -> m (Maybe (Ac­tion m), Id­Source, Thread­Result m)
run­Step­Thread ids (Fork (ConcT ma) k) = do
  t <- prim­Fork False ma
  let (tid', ids') = nextThreadId ids
  pure (Just (k tid'), ids', Forked t)
run­Step­Thread ids (ForkOS (ConcT ma) k) = do
  t <- prim­Fork True ma
  let (tid', ids') = nextThreadId ids
  pure (Just (k tid'), ids', Forked t)
run­Step­Thread ids (NewEmptyMVar k) = do
  v <- newEmptyMVar
  put­MVar v Nothing
  let (mvid, ids') = next­M­VarId ids
  let mvar = CT­MVar mvid v
  pure (Just (k mvar), ids', Busi­nes­sAs­U­sual)
run­Step­Thread ids k0@(Put­MVar (CT­MVar mvid v) a k) = do
  old <- takeMVar v
  case old of
    Just _  -> put­MVar v old      >> pure (Just k0, ids, Blocked mvid)
    Nothing -> put­MVar v (Just a) >> pure (Just k, ids,  Up­dated mvid)
run­Step­Thread ids k0@(TakeMVar (CT­MVar mvid v) k) = do
  old <- takeMVar v
  case old of
    Nothing -> put­MVar v old     >> pure (Just k0,    ids, Blocked mvid)
    Just a  -> put­MVar v Nothing >> pure (Just (k a), ids, Up­dated mvid)
runStepThread ids (Stop ma) = do
  ma  -- run the final action (for the main thread, this stores the result)
  pure (Nothing, ids, Killed)

prim­Fork :: Mon­ad­Conc m => Bool -> K.Cont (Ac­tion m) () -> m (Thread m)
prim­Fork isOS ma = fork­Thread isOS (K.run­Cont ma (\_ -> Stop (pure ())))

This looks pretty hor­rible, but each case is fairly small, so just look at those.
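The two MVar cases are the ones worth staring at. The emulated MVar is a Maybe inside a real MVar which is always full, so a step never actually blocks the real thread: it either changes the cell or reports that it would have blocked. Stripped of the monad and the IDs, the logic is roughly this (Outcome, putStep, and takeStep are names made up for this sketch):

```haskell
-- | What a single step did: nothing (it would have blocked), or it changed
-- the cell and produced a result for the continuation.
data Outcome r = Blocked | Updated r
  deriving (Eq, Show)

-- | One attempt at putMVar on the emulated cell.
putStep :: a -> Maybe a -> (Maybe a, Outcome ())
putStep _ cell@(Just _) = (cell, Blocked)       -- already full: try again later
putStep a Nothing       = (Just a, Updated ())  -- empty: fill it

-- | One attempt at takeMVar on the emulated cell.
takeStep :: Maybe a -> (Maybe a, Outcome a)
takeStep Nothing  = (Nothing, Blocked)          -- empty: try again later
takeStep (Just a) = (Nothing, Updated a)        -- full: empty it, return value
```

In the Blocked case, runStepThread hands back the same action (k0) so the thread retries it when it's next scheduled, and loopStepThread marks the thread as blocked until some other thread reports Updated for that MVar.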

Now we can run it (with a random sched­uler for fun) and see that it works:

test :: Mon­ad­Conc m => m Int
test = do
  a <- newEmptyMVar
  b <- new­MVar 2
  c <- new­MVar 3
  forkOS (put­MVar a b)
  forkOS (put­MVar a c)
  forkOS (takeMVar b >> put­MVar b 14)
  forkOS (takeMVar c >> put­MVar c 15)
  takeMVar =<< takeMVar a

ran­dom­Sched :: Sched­uler R.StdGen
ran­dom­Sched (t:|ts) g =
  let (i, g') = R.ran­domR (0, length ts) g
  in ((t:ts) !! i, g')

main :: IO ()
main = do
  g <- R.newStdGen
  print . fst =<< minifu ran­dom­Sched g test


λ> main
Just 14
λ> main
Just 2
λ> main
Just 14
λ> main
Just 2
λ> main
Just 14
λ> main
Just 14
λ> main
Just 15
λ> main
Just 15

That wasn’t so bad!

Next Steps to Deja Fu

Mini Fu is much smaller than Deja Fu, but it demon­strates the key con­cepts. To get a mul­ti­th­readed runtime into de­jafu, I think the main change to this stuff is to figure out how thread com­mu­nic­a­tion is going to work: in de­jafu proper, ac­tions can change the con­tinu­ation of an ar­bit­rary thread (eg, throwing an ex­cep­tion to a thread will call its ex­cep­tion hand­ler).

The over­head of this method com­pared to the single-­threaded ap­proach must be meas­ured. It would be great to sup­port bound threads, but not at the cost of everything else be­coming much worse! If the over­head is bad, per­haps a hy­brid ap­proach could be used: un­bound threads in the pro­gram-un­der­-test are ex­ecuted as they are cur­rently, whereas bound threads get the fancy mul­ti­th­readed im­ple­ment­a­tion. It would com­plicate things, but pos­sibly elim­inate the over­head in the common case.

Fi­nally, when the main thread ter­min­ates, any still-run­ning ones should ter­minate as well, so the Thread re­cord will need to con­tain the ThreadId m of the un­der­lying monad, so kill­Thread can be used.
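In plain IO, that cleanup could look something like the sketch below. The Worker record and the spawnWorker, shutdown, and demo names are hypothetical, and the workers just sleep in a loop to stand in for threads which are still running when the main thread finishes:

```haskell
import Control.Concurrent (ThreadId, forkFinally, killThread, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (AsyncException(ThreadKilled), fromException)
import Control.Monad (forever, replicateM)

-- | Hypothetical thread record which remembers the underlying ThreadId.
data Worker = Worker
  { workerTid  :: ThreadId
  , workerExit :: MVar Bool  -- filled on exit: True if killed by killThread
  }

-- | Start a thread which never finishes on its own.
spawnWorker :: IO Worker
spawnWorker = do
  exit <- newEmptyMVar
  tid  <- forkFinally (forever (threadDelay 100000)) $ \r ->
    putMVar exit $ case r of
      Left e  -> fromException e == Just ThreadKilled
      Right _ -> False
  pure (Worker tid exit)

-- | Kill every remaining worker and wait until each has actually exited.
shutdown :: [Worker] -> IO [Bool]
shutdown ws = do
  mapM_ (killThread . workerTid) ws
  mapM (takeMVar . workerExit) ws

demo :: IO [Bool]
demo = replicateM 3 spawnWorker >>= shutdown
```

Running demo should give [True,True,True]. In Mini Fu the same thing would need a killThread in the MonadConc class, which the simplified version here doesn't have.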