Writing a Concurrency Testing Library (Part 2): Exceptions

Date
Tags concurrency, dejafu, haskell, minifu, programming
Target Audience Haskell programmers.
Epistemic Status This is building up to a simplified form of dejafu, which is based on published research.
Attention Conservation Notice Still no fancy testing.

Wel­come back to my series on im­ple­menting a con­cur­rency testing lib­rary for Haskell. This is part 2 of the series, and today we’ll im­ple­ment ex­cep­tions. If you missed part 1, you can read it here.

As be­fore, all code is avail­able on GitHub. The code for this post is under the “post-02” tag.


Did you do last time’s home­work task? It was to im­ple­ment this in­ter­face:

data CRef m a = -- ...

new­CRef :: a -> MiniFu m (CRef m a)

read­CRef :: CRef m a -> MiniFu m a

write­CRef :: CRef m a -> a -> MiniFu m ()

atom­ic­Modi­fyCRef :: CRef m a -> (a -> (a, b)) -> MiniFu m b

Here are my solu­tions, avail­able at the “home­work-01” tag:

  1. (2070bdf) Add the CRef type, the PrimOp con­struct­ors, and the wrapper func­tions
  2. (188eec5) Im­ple­ment the primops

I also made some changes, avail­able at the “pre-02” tag:

  1. (7ce6e41) Add a helper for primops which don’t create any iden­ti­fiers
  2. (2419796) Move some defin­i­tions into an in­ternal module
  3. (9c49f9d) Change the type of the block helper to MVarId -> Threads m -> Threads m
  4. (dab­d84b) Im­ple­ment read­MVar

Now on to the show…

Syn­chronous ex­cep­tions

We can’t im­ple­ment ex­cep­tions with what we have already. We’re going to need some new primops. I think you’re get­ting a feel for how this works now, so I won’t drag this out. Here we go:

im­port qual­i­fied Con­trol.Ex­cep­tion as E

data PrimOp m where
  -- ...
  Throw :: E.Ex­cep­tion e => e -> PrimOp m
  Catch :: E.Ex­cep­tion e => MiniFu m a -> (e -> MiniFu m a) -> (a -> PrimOp m)
        -> PrimOp m
  PopH  :: PrimOp m -> PrimOp m

throw :: E.Ex­cep­tion e => e -> MiniFu m a
throw e = MiniFu (K.cont (\_ -> Throw e))

catch :: E.Ex­cep­tion e => MiniFu m a -> (e -> MiniFu m a) -> MiniFu m a
catch act h = MiniFu (K.cont (Catch act h))

Throwing an ex­cep­tion with throw jumps back to the closest en­closing catch with an ex­cep­tion handler of the ap­pro­priate type, killing the thread if there is none. The PopH primop will pop the top ex­cep­tion handler from the stack. We’ll in­sert those as ap­pro­priate when en­tering a catch.

Be­fore we can ac­tu­ally im­ple­ment these primops, we need to give threads a place to store their ex­cep­tion hand­lers. You might have guessed it when I said “stack”: we’ll just give every thread a list of them. This re­quires chan­ging our Thread type and thread func­tion:

data Thread m = Thread
  { threadK     :: PrimOp m
  , thread­B­lock :: Maybe MVarId
  , threadExc   :: [Handler m]                              -- <- new
  }

data Handler m where
  Handler :: E.Ex­cep­tion e => (e -> PrimOp m) -> Handler m

thread :: PrimOp m -> Thread m
thread k = Thread
  { threadK     = k
  , thread­B­lock = Nothing
  , threadExc   = []                                        -- <- new
  }

As Ex­cep­tion is a sub­class of Type­able, given some ex­cep­tion value we’re able to look for the first matching hand­ler:

raise :: E.Ex­cep­tion e => e -> Thread m -> Maybe (Thread m)
raise exc thrd = go (threadExc thrd) where
  go (Handler h:hs) = case h <$> E.fromEx­cep­tion exc' of
    Just pop -> Just (thrd { threadK = pop, thread­B­lock = Nothing, threadExc = hs })
    Nothing  -> go hs
  go [] = Nothing

  exc' = E.to­Ex­cep­tion exc

If raise re­turns a Just, then a handler was found and entered. Oth­er­wise, no handler ex­ists and the thread should be re­moved from the Threads col­lec­tion. This can be ex­pressed rather nicely as M.up­date . raise.

Now we have enough sup­port to im­ple­ment the primops:

step­Thread {- ... -}
  where
    -- ...
    go (Throw e) =
      simple (M.up­date (raise e) tid)
    go (Catch (MiniFu ma) h k) = simple . ad­just $ \thrd -> thrd
      { threadK   = K.run­Cont ma (PopH . k)
      , threadExc =
        let h' exc = K.run­Cont (run­MiniFu (h exc)) k
        in Handler h' : threadExc thrd
      }
    go (PopH k) = simple . ad­just $ \thrd -> thrd
      { threadK   = k
      , threadExc = tail (threadExc thrd)
      }

Let’s break that down:

It’s im­portant that the ex­cep­tion con­tinu­ation doesn’t use PopH to re­move it­self: that hap­pens in raise when an ex­cep­tion is thrown. When writing this sec­tion I real­ised I’d made that mis­take in de­jafu (#139)!

So what?

So now we can use syn­chronous ex­cep­tions! Here’s an in­cred­ibly con­trived ex­ample:

{-# LAN­GUAGE Scoped­TypeVari­ables #-}

im­port Con­trol.­Monad (join)

ex­ample_­sync :: MiniFu m Int
ex­ample_­sync = do
  a <- newEmptyMVar
  fork (put­MVar a (pure 1))
  fork (put­MVar a (throw E.NonTer­min­a­tion))
  fork (put­MVar a (throw E.Al­loc­a­tion­Lim­i­tEx­ceeded))
  catch
    (catch
      (join (read­MVar a))
      (\(_ :: E.Al­loc­a­tion­Lim­i­tEx­ceeded) -> pure 2))
    (\(_ :: E.NonTer­min­a­tion) -> pure 3)

de­mo_­sync :: IO ()
de­mo_­sync = do
  g <- R.newStdGen
  print . fst =<< minifu ran­dom­Sched g ex­ample_­sync

If we run this a few times in ghci, we can see the dif­ferent ex­cep­tions being thrown and caught (res­ulting in dif­ferent out­puts):

λ> de­mo_­sync
Just 1
λ> de­mo_­sync
Just 3
λ> de­mo_­sync
Just 3
λ> de­mo_­sync
Just 2

Mon­ad­Throw and Mon­ad­Catch

Mon­ad­Conc has a bunch of su­per­classes, and we can now im­ple­ment two of them!

im­port qual­i­fied Con­trol.­Mon­ad.Catch as EM

in­stance EM.Mon­ad­Throw (MiniFu m) where
  throwM = -- 'throw' from above

in­stance EM.Mon­ad­Catch (MiniFu m) where
  catch = -- 'catch' from above

The ex­cep­tions package provides the Mon­ad­Throw, Mon­ad­Catch, and Mon­ad­Mask type­classes, so we can talk about ex­cep­tions in a wider con­text than just IO. We’ll get on to Mon­ad­Mask when we look at asyn­chronous ex­cep­tions.

In­com­plete­ness!

It is with ex­cep­tions that we hit the first thing we can’t do in MiniFu.

When in IO, we can catch ex­cep­tions from pure code:

λ> im­port Con­trol.Ex­cep­tion
λ> eval­uate un­defined `catch` \e -> put­StrLn ("Got " ++ show (e :: SomeEx­cep­tion))
Got Pre­lude.un­defined
Call­Stack (from Has­Call­Stack):
  er­ror, called at lib­rar­ies/­base/GH­C/Er­r.h­s:79:14 in base:GH­C.Err
  un­defined, called at <in­ter­act­ive>:5:10 in in­ter­act­ive:Ghci2

But we can’t do that in MiniFu, as there’s no suit­able eval­uate func­tion.

Should there be an eval­uate in the Mon­ad­Conc class? I’m un­con­vinced, as it’s not really a con­cur­rency op­er­a­tion.

Should we con­strain the m in MiniFu m to be a Mon­adIO, which would let us call eval­uate? Per­haps, that would cer­tainly be a way to do it, and I’m cur­rently in­vest­ig­ating the ad­vant­ages of an IO base monad for de­jafu (al­though ori­gin­ally for a dif­ferent reas­on).

Asyn­chronous ex­cep­tions

Asyn­chronous ex­cep­tions are like syn­chronous ex­cep­tions, ex­cept for two de­tails:

  1. They are thrown to a thread iden­ti­fied by ThreadId. We can do this already with raise.
  2. Raising the ex­cep­tion may be blocked due to the target thread’s masking state. We need to do some extra work to im­ple­ment this.

When a thread is masked, at­tempting to de­liver an asyn­chronous ex­cep­tion to it will block. There are three masking states:

So we’ll add the cur­rent masking state to our Thread type, de­faulting to Un­masked, and also ac­count for blocking on an­other thread:

data Thread m = Thread
  { threadK     :: PrimOp m
  , thread­B­lock :: Maybe (Either ThreadId MVarId)           -- <- new
  , threadExc   :: [Handler m]
  , thread­Mask  :: E.Mask­ing­State                           -- <- new
  }

thread :: PrimOp m -> Thread m
thread k = Thread
  { threadK     = k
  , thread­B­lock = Nothing
  , threadExc   = []
  , thread­Mask  = E.Un­masked                                -- <- new
  }

We’ll also need a primop to set the masking state:

data PrimOp m where
  -- ...
  Mask :: E.Mask­ing­State -> PrimOp m -> PrimOp m

Which has a fairly straight­for­ward im­ple­ment­a­tion:

step­Thread {- ... -}
  where
    -- ...
    go (Mask ms k) = simple . ad­just $ \thrd -> thrd
      { threadK    = k
      , thread­Mask = ms
      }

Fi­nally, we need to make sure that if an ex­cep­tion is raised, and we jump into an ex­cep­tion hand­ler, the masking state gets reset to what it was when the handler was cre­ated. This means we need a small change to the Catch primop:

step­Thread {- ... -}
  where
    -- ...
    go (Catch (MiniFu ma) h k) = simple . ad­just $ \thrd -> thrd
      { threadK   = K.run­Cont ma (PopH . k)
      , threadExc =
        let ms0 = thread­Mask thrd                           -- <- new
            h' exc = flip K.run­Cont k $ do
              K.cont (\c -> Mask ms0 (c ()))                -- <- new
              run­MiniFu (h exc)
        in Handler h' : threadExc thrd
      }

Al­right, now we have enough back­ground to ac­tu­ally im­ple­ment the user­-­fa­cing op­er­a­tions.

Throwing

To throw an asyn­chronous ex­cep­tion, we’re going to need a new primop:

data PrimOp m where
  -- ...
  ThrowTo :: E.Ex­cep­tion e => ThreadId -> e -> PrimOp m -> PrimOp m

Which has a cor­res­ponding wrapper func­tion:

throwTo :: E.Ex­cep­tion e => ThreadId -> e -> MiniFu m ()
throwTo tid e = MiniFu (K.cont (\k -> ThrowTo tid e (k ())))

Let’s think about the im­ple­ment­a­tion of the ThrowTo primop. It first needs to check if the target thread is in­ter­rupt­ible and, if so, raises the ex­cep­tion in that thread; if not, it blocks the cur­rent thread. A thread is in­ter­rupt­ible if its masking state is Un­masked, or Masked­In­ter­rupt­ible and it’s cur­rently blocked.

Let’s en­cap­su­late that lo­gic:

im­port Data.Maybe (is­Just)

isIn­ter­rupt­ible :: Thread m -> Bool
isIn­ter­rupt­ible thrd =
  thread­Mask thrd == E.Un­masked ||
  (thread­Mask thrd == E.Masked­In­ter­rupt­ible && is­Just (thread­B­lock thrd))

Given that, the im­ple­ment­a­tion of ThrowTo is straight­for­ward:

step­Thread {- ... -}
  where
    -- ...
    go (ThrowTo threadid e k) = simple $ case M.lookup threadid threads of
      Just t
        | isIn­ter­rupt­ible t -> goto k . M.up­date (raise e) threadid
        | oth­er­wise         -> block (Left threadid)
      Nothing -> goto k

First, check if the thread ex­ists. Then check if it’s in­ter­rupt­ible: if it is, raise the ex­cep­tion, oth­er­wise block. If the thread doesn’t exist any more, just con­tinue.

Now we just need to handle un­blocking threads which are blocked in ThrowTo. For that, we’ll go back to the run func­tion and add a pass to un­block threads if the cur­rent one is in­ter­rupt­ible after it pro­cesses its ac­tion:

run :: C.Mon­ad­Conc m => Sched­uler s -> s -> PrimOp m -> m s
run sched s0 = go s0 . ini­tialise where
  go s (threads, idsrc)
    | ini­tial­ThreadId `M.member` threads = case run­nable threads of
      Just tids -> do
        let (chosen, s') = sched tids s
        (threads', id­src') <- step­Thread chosen (threads, idsrc)
        let threads'' = if (isIn­ter­rupt­ible <$> M.lookup chosen threads') /= Just False
                        then un­block (Left chosen) threads'
                        else threads'
            -- ^- new
        go s' (threads'', id­src')
      Nothing -> pure s
    | oth­er­wise = pure s

  run­nable = nonEmpty . M.keys . M.filter (is­Nothing . thread­B­lock)

  ini­tial­ThreadId = fst (nextThreadId ini­tial­Id­Source)

So after step­ping a thread, we un­block every thread blocked on it if it either doesn’t ex­ist, of if it does exist and is in­ter­rupt­ible. It’s much more ro­bust to do this once here than every­where in step­Thread which might cause the thread to be­come in­ter­rupt­ible.

Masking and Mon­ad­Mask

There are two op­er­a­tions at the pro­gram­mer’s dis­posal to change the masking state of a thread, mask and un­in­ter­rupt­ibleMask. Here’s what the MiniFu types will look like:

{-# LAN­GUAGE RankN­Types #-}

mask                :: ((forall x. MiniFu m x -> MiniFu m x) -> MiniFu m a) -> MiniFu m a
un­in­ter­rupt­ibleMask :: ((forall x. MiniFu m x -> MiniFu m x) -> MiniFu m a) -> MiniFu m a

Each takes an ac­tion to run, and runs it as either Masked­In­ter­rupt­ible or MaskedUn­in­ter­rupt­ible. The ac­tion is provided with a poly­morphic call­back to run a sub­com­pu­ta­tion with the ori­ginal masking state.

This is going to need, you guessed it, a new primop! We could modify the Mask primop to do this job as well, but I think it’s a little clearer to have two sep­arate ones:

data PrimOp m where
  -- ...
  In­Mask :: E.Mask­ing­State -> ((forall x. MiniFu m x -> MiniFu m x) -> MiniFu m a)
         -> (a -> PrimOp m) -> PrimOp m

And here’s the im­ple­ment­a­tions of our masking func­tions:

mask ma = MiniFu (K.cont (In­Mask E.Masked­In­ter­rupt­ible ma))
un­in­ter­rupt­ibleMask ma = MiniFu (K.cont (In­Mask E.MaskedUn­in­ter­rupt­ible ma))

We can now fulfil an­other re­quire­ment of Mon­ad­Conc: a Mon­ad­Mask in­stance!

in­stance Mon­ad­Mask (MiniFu m) where
  mask = -- 'mask' from above
  un­in­ter­rupt­ibleMask = -- 'un­in­ter­rupt­ibleMask' from above

The very last piece of the puzzle for ex­cep­tion hand­ling in MiniFu is to im­ple­ment this In­Mask primop. Its type looks quite in­tense, but the im­ple­ment­a­tion is really not that bad. There are three parts:

step­Thread {- ... -}
  where
    -- ...
    go (In­Mask ms ma k) = simple . ad­just $ \thrd -> thrd
      { threadK =
        let ms0 = thread­Mask thrd

            -- (1) we need to con­struct the poly­morphic ar­gu­ment func­tion
            umask :: MiniFu m x -> MiniFu m x
            umask (MiniFu mx) = MiniFu $ do
              K.cont (\c -> Mask ms0 (c ()))
              x <- mx
              K.cont (\c -> Mask ms (c ()))
              pure x

        -- (2) we need to run the inner con­tinu­ation, re­set­ting the masking state
        -- when done
        in K.run­Cont (run­MiniFu (ma umask)) (Mask ms0 . k)

      -- (3) we need to change the masking state
      , thread­Mask = ms
      }

The ex­plicit type sig­na­ture on umask is needed be­cause we’re using GADTs, which im­plies Mono­Loc­al­Binds, which pre­vents the poly­morphic type from being in­ferred. We could achieve the same ef­fect by turning on NoMono­Loc­al­Binds.

Demo

Now we have asyn­chronous ex­cep­tions, check it out:

ex­ample_a­sync :: MiniFu m String
ex­ample_a­sync = do
  a <- newEmptyMVar
  tid <- fork (put­MVar a "hello from the other thread")
  throwTo tid E.ThreadKilled
  read­MVar a

de­mo_a­sync :: IO ()
de­mo_a­sync = do
  g <- R.newStdGen
  print . fst =<< minifu ran­dom­Sched g ex­ample_a­sync

See:

λ> de­mo_a­sync
Just "hello from the other thread"
λ> de­mo_a­sync
Just "hello from the other thread"
λ> de­mo_a­sync
Nothing

Next time…

We have come to the end of part 2! Again, I hope you en­joyed this post, any feed­back is wel­come. This is all on GitHub, and you can see the code we ended up with at the “post-02” tag.

Once again, I have some home­work for you. Your task, should you choose to ac­cept it, is to im­ple­ment:

try­Put­MVar :: MVar m a -> a -> MiniFu m Bool

tryTakeMVar :: MVar m a -> MiniFu m (Maybe a)

tryRead­MVar :: MVar m a -> MiniFu m (Maybe a)

Solu­tions will be up in a few days, as be­fore, at the “home­work-02” tag.

Stay tuned be­cause next time we’re going to im­ple­ment STM: all of it in one go. Then we can fi­nally get on to the test­ing.


Thanks to Will Sewell for reading an earlier draft of this post.