Welcome back to my series on implementing a concurrency testing library for Haskell. This is part 2 of the series, and today we’ll implement exceptions. If you missed part 1, you can read it here.
As before, all code is available on GitHub. The code for this post is under the “post-02” tag.
Did you do last time’s homework task? It was to implement this interface:
data CRef m a = -- ...
newCRef :: a -> MiniFu m (CRef m a)
readCRef :: CRef m a -> MiniFu m a
writeCRef :: CRef m a -> a -> MiniFu m ()
atomicModifyCRef :: CRef m a -> (a -> (a, b)) -> MiniFu m b
Here are my solutions, available at the “homework-01” tag:
- (2070bdf) Add the CRef type, the PrimOp constructors, and the wrapper functions
- (188eec5) Implement the primops
I also made some changes, available at the “pre-02” tag:
- (7ce6e41) Add a helper for primops which don’t create any identifiers
- (2419796) Move some definitions into an internal module
- (9c49f9d) Change the type of the block helper to MVarId -> Threads m -> Threads m
- (dabd84b) Implement readMVar
Now on to the show…
Synchronous exceptions
We can’t implement exceptions with what we have already. We’re going to need some new primops. I think you’re getting a feel for how this works now, so I won’t drag this out. Here we go:
import qualified Control.Exception as E

data PrimOp m where
  -- ...
  Throw :: E.Exception e => e -> PrimOp m
  Catch :: E.Exception e => MiniFu m a -> (e -> MiniFu m a) -> (a -> PrimOp m)
        -> PrimOp m
  PopH  :: PrimOp m -> PrimOp m

throw :: E.Exception e => e -> MiniFu m a
throw e = MiniFu (K.cont (\_ -> Throw e))

catch :: E.Exception e => MiniFu m a -> (e -> MiniFu m a) -> MiniFu m a
catch act h = MiniFu (K.cont (Catch act h))
Throwing an exception with throw jumps back to the closest enclosing catch with an exception handler of the appropriate type, killing the thread if there is none. The PopH primop will pop the top exception handler from the stack. We’ll insert those as appropriate when entering a catch.
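To see why throw can simply ignore its continuation, here is a minimal, self-contained sketch. It does not use MiniFu: the hand-rolled Cont type stands in for the K module, Op is a toy replacement for PrimOp, and the names say, throwStr, render, and programTrace are made up for illustration.

```haskell
-- A hand-rolled continuation monad (hypothetical stand-in for the K module).
newtype Cont r a = Cont { runCont :: (a -> r) -> r }

instance Functor (Cont r) where
  fmap f (Cont c) = Cont (\k -> c (k . f))

instance Applicative (Cont r) where
  pure a = Cont (\k -> k a)
  Cont cf <*> Cont ca = Cont (\k -> cf (\f -> ca (k . f)))

instance Monad (Cont r) where
  Cont c >>= f = Cont (\k -> c (\a -> runCont (f a) k))

-- A toy primop type (hypothetical; far simpler than PrimOp).
data Op = Stop | Say String Op | Throw String

-- An ordinary primop: record the message, then carry on with the continuation.
say :: String -> Cont Op ()
say s = Cont (\k -> Say s (k ()))

-- Like 'throw' in the post: the continuation is discarded entirely.
throwStr :: String -> Cont Op a
throwStr e = Cont (\_ -> Throw e)

-- Flatten a chain of primops into a list of observable events.
render :: Op -> [String]
render Stop      = []
render (Say s k) = s : render k
render (Throw e) = ["uncaught: " ++ e]

program :: Cont Op ()
program = say "before" >> throwStr "boom" >> say "after"

programTrace :: [String]
programTrace = render (runCont program (\_ -> Stop))
```

Because throwStr never calls its continuation, the say "after" step never makes it into the resulting Op value. In MiniFu the same thing happens with Throw, and it is then up to the interpreter to find a handler.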
Before we can actually implement these primops, we need to give threads a place to store their exception handlers. You might have guessed it when I said “stack”: we’ll just give every thread a list of them. This requires changing our Thread type and thread function:
data Thread m = Thread
  { threadK     :: PrimOp m
  , threadBlock :: Maybe MVarId
  , threadExc   :: [Handler m] -- <- new
  }

data Handler m where
  Handler :: E.Exception e => (e -> PrimOp m) -> Handler m

thread :: PrimOp m -> Thread m
thread k = Thread
  { threadK     = k
  , threadBlock = Nothing
  , threadExc   = [] -- <- new
  }
As Exception is a subclass of Typeable, given some exception value we’re able to look for the first matching handler:
raise :: E.Exception e => e -> Thread m -> Maybe (Thread m)
raise exc thrd = go (threadExc thrd) where
  go (Handler h:hs) = case h <$> E.fromException exc' of
    Just pop -> Just (thrd { threadK = pop, threadBlock = Nothing, threadExc = hs })
    Nothing  -> go hs
  go [] = Nothing

  exc' = E.toException exc
If raise returns a Just, then a handler was found and entered. Otherwise, no handler exists and the thread should be removed from the Threads collection. This can be expressed rather nicely as M.update . raise.
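The lookup can be tried out in isolation. The following is only a sketch under simplifying assumptions: handlers return a String instead of a PrimOp, and the names findHandler, onArith, and onAny are hypothetical. The matching itself uses toException and fromException from Control.Exception, as raise does.

```haskell
{-# LANGUAGE GADTs #-}

import qualified Control.Exception as E

-- Simplified stand-in for the post's Handler: it produces a String
-- rather than a PrimOp, so it can be tested without MiniFu.
data Handler where
  Handler :: E.Exception e => (e -> String) -> Handler

-- Walk the handler stack from the top. On the first handler whose
-- exception type matches, return its result and the remaining stack.
findHandler :: E.Exception e => e -> [Handler] -> Maybe (String, [Handler])
findHandler exc = go
  where
    exc' = E.toException exc

    go :: [Handler] -> Maybe (String, [Handler])
    go (Handler h : hs) = case h <$> E.fromException exc' of
      Just r  -> Just (r, hs)
      Nothing -> go hs
    go [] = Nothing

-- Two example handlers (hypothetical): one specific, one catch-all.
onArith :: E.ArithException -> String
onArith _ = "arith"

onAny :: E.SomeException -> String
onAny _ = "any"
```

With the stack [Handler onArith, Handler onAny], throwing E.DivideByZero selects onArith, while E.NonTermination skips it and falls through to onAny. With only Handler onArith on the stack, E.NonTermination finds nothing, which corresponds to the thread being killed.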
Now we have enough support to implement the primops:
stepThread {- ... -}
  where
    -- ...
    go (Throw e) =
      simple (M.update (raise e) tid)
    go (Catch (MiniFu ma) h k) = simple . adjust $ \thrd -> thrd
      { threadK   = K.runCont ma (PopH . k)
      , threadExc =
        let h' exc = K.runCont (runMiniFu (h exc)) k
        in Handler h' : threadExc thrd
      }
    go (PopH k) = simple . adjust $ \thrd -> thrd
      { threadK   = k
      , threadExc = tail (threadExc thrd)
      }
Let’s break that down:
- Throw just re-uses our raise function to either jump to the exception handler or kill the thread.
- Catch changes the continuation of the thread to run the enclosed action, then do a PopH action, then run the outer action. It also adds an exception continuation, which just runs the exception handler, then runs the outer action.
- PopH just removes the head exception continuation.
It’s important that the exception continuation doesn’t use PopH to remove itself: that happens in raise when an exception is thrown. When writing this section I realised I’d made that mistake in dejafu (#139)!
So what?
So now we can use synchronous exceptions! Here’s an incredibly contrived example:
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Monad (join)

example_sync :: MiniFu m Int
example_sync = do
  a <- newEmptyMVar
  fork (putMVar a (pure 1))
  fork (putMVar a (throw E.NonTermination))
  fork (putMVar a (throw E.AllocationLimitExceeded))
  catch
    (catch
      (join (readMVar a))
      (\(_ :: E.AllocationLimitExceeded) -> pure 2))
    (\(_ :: E.NonTermination) -> pure 3)

demo_sync :: IO ()
demo_sync = do
  g <- R.newStdGen
  print . fst =<< minifu randomSched g example_sync
If we run this a few times in ghci, we can see the different exceptions being thrown and caught (resulting in different outputs):
λ> demo_sync
Just 1
λ> demo_sync
Just 3
λ> demo_sync
Just 3
λ> demo_sync
Just 2
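For comparison, the same nesting of handlers can be written in plain IO with Control.Exception. This is only an illustrative sketch, not part of MiniFu, and the name nestedHandlers is made up; it takes the action as a parameter so that each outcome can be triggered deterministically rather than by a scheduler.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}

import qualified Control.Exception as E

-- Run an action under the same two handlers as example_sync:
-- the inner one catches AllocationLimitExceeded, the outer one
-- catches NonTermination.
nestedHandlers :: IO Int -> IO Int
nestedHandlers act =
  E.catch
    (E.catch act (\(_ :: E.AllocationLimitExceeded) -> pure 2))
    (\(_ :: E.NonTermination) -> pure 3)
```

Here nestedHandlers (pure 1) gives 1, nestedHandlers (E.throwIO E.AllocationLimitExceeded) gives 2, and nestedHandlers (E.throwIO E.NonTermination) gives 3, since the inner handler does not match and the exception carries on to the outer one.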
MonadThrow and MonadCatch
MonadConc has a bunch of superclasses, and we can now implement two of them!
import qualified Control.Monad.Catch as EM

instance EM.MonadThrow (MiniFu m) where
  throwM = -- 'throw' from above

instance EM.MonadCatch (MiniFu m) where
  catch = -- 'catch' from above
The exceptions package provides the MonadThrow, MonadCatch, and MonadMask typeclasses, so we can talk about exceptions in a wider context than just IO. We’ll get on to MonadMask when we look at asynchronous exceptions.
Incompleteness!
It is with exceptions that we hit the first thing we can’t do in MiniFu.
When in IO, we can catch exceptions from pure code:
λ> import Control.Exception
λ> evaluate undefined `catch` \e -> putStrLn ("Got " ++ show (e :: SomeException))
Got Prelude.undefined
CallStack (from HasCallStack):
error, called at libraries/base/GHC/Err.hs:79:14 in base:GHC.Err
undefined, called at <interactive>:5:10 in interactive:Ghci2
But we can’t do that in MiniFu, as there’s no suitable evaluate function.
Should there be an evaluate in the MonadConc class? I’m unconvinced, as it’s not really a concurrency operation.
Should we constrain the m in MiniFu m to be a MonadIO, which would let us call evaluate? Perhaps. That would certainly be a way to do it, and I’m currently investigating the advantages of an IO base monad for dejafu (although originally for a different reason).
Asynchronous exceptions
Asynchronous exceptions are like synchronous exceptions, except for two details:
- They are thrown to a thread identified by ThreadId. We can do this already with raise.
- Raising the exception may be blocked due to the target thread’s masking state. We need to do some extra work to implement this.
When a thread is masked, attempting to deliver an asynchronous exception to it will block. There are three masking states:
- Unmasked: asynchronous exceptions are unmasked.
- MaskedInterruptible: asynchronous exceptions are masked, but blocked operations may still be interrupted.
- MaskedUninterruptible: asynchronous exceptions are masked, and blocked operations may not be interrupted.
So we’ll add the current masking state to our Thread type, defaulting to Unmasked, and also account for blocking on another thread:
data Thread m = Thread
  { threadK     :: PrimOp m
  , threadBlock :: Maybe (Either ThreadId MVarId) -- <- new
  , threadExc   :: [Handler m]
  , threadMask  :: E.MaskingState -- <- new
  }

thread :: PrimOp m -> Thread m
thread k = Thread
  { threadK     = k
  , threadBlock = Nothing
  , threadExc   = []
  , threadMask  = E.Unmasked -- <- new
  }
We’ll also need a primop to set the masking state:
data PrimOp m where
  -- ...
  Mask :: E.MaskingState -> PrimOp m -> PrimOp m
Which has a fairly straightforward implementation:
stepThread {- ... -}
  where
    -- ...
    go (Mask ms k) = simple . adjust $ \thrd -> thrd
      { threadK    = k
      , threadMask = ms
      }
Finally, we need to make sure that if an exception is raised, and we jump into an exception handler, the masking state gets reset to what it was when the handler was created. This means we need a small change to the Catch primop:
stepThread {- ... -}
  where
    -- ...
    go (Catch (MiniFu ma) h k) = simple . adjust $ \thrd -> thrd
      { threadK   = K.runCont ma (PopH . k)
      , threadExc =
        let ms0 = threadMask thrd -- <- new
            h' exc = flip K.runCont k $ do
              K.cont (\c -> Mask ms0 (c ())) -- <- new
              runMiniFu (h exc)
        in Handler h' : threadExc thrd
      }
Alright, now we have enough background to actually implement the user-facing operations.
Throwing
To throw an asynchronous exception, we’re going to need a new primop:
data PrimOp m where
  -- ...
  ThrowTo :: E.Exception e => ThreadId -> e -> PrimOp m -> PrimOp m
Which has a corresponding wrapper function:
throwTo :: E.Exception e => ThreadId -> e -> MiniFu m ()
throwTo tid e = MiniFu (K.cont (\k -> ThrowTo tid e (k ())))
Let’s think about the implementation of the ThrowTo primop. It first needs to check if the target thread is interruptible: if so, it raises the exception in that thread; if not, it blocks the current thread. A thread is interruptible if its masking state is Unmasked, or if its masking state is MaskedInterruptible and it’s currently blocked.
Let’s encapsulate that logic:
import Data.Maybe (isJust)

isInterruptible :: Thread m -> Bool
isInterruptible thrd =
  threadMask thrd == E.Unmasked ||
  (threadMask thrd == E.MaskedInterruptible && isJust (threadBlock thrd))
Given that, the implementation of ThrowTo is straightforward:
stepThread {- ... -}
  where
    -- ...
    go (ThrowTo threadid e k) = simple $ case M.lookup threadid threads of
      Just t
        | isInterruptible t -> goto k . M.update (raise e) threadid
        | otherwise         -> block (Left threadid)
      Nothing -> goto k
First, check if the thread exists. Then check if it’s interruptible: if it is, raise the exception, otherwise block. If the thread doesn’t exist any more, just continue.
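The three-way decision can be checked on its own with a small model. This is a sketch with assumptions: ToyThread, Outcome, toyInterruptible, and throwToOutcome are hypothetical names, threads are keyed by plain Int in an association list rather than a Map of ThreadId, and the function only reports what should happen instead of updating any state.

```haskell
import qualified Control.Exception as E

-- Hypothetical, cut-down thread record: only the fields the decision needs.
data ToyThread = ToyThread
  { toyMask    :: E.MaskingState
  , toyBlocked :: Bool
  }

-- What a throwTo should do, as a plain value.
data Outcome = Deliver | BlockThrower | TargetGone
  deriving (Eq, Show)

-- Same rule as isInterruptible: unmasked, or interruptibly masked and blocked.
toyInterruptible :: ToyThread -> Bool
toyInterruptible t =
  toyMask t == E.Unmasked ||
  (toyMask t == E.MaskedInterruptible && toyBlocked t)

-- Decide the outcome of throwing to the given target thread.
throwToOutcome :: Int -> [(Int, ToyThread)] -> Outcome
throwToOutcome target threads = case lookup target threads of
  Just t
    | toyInterruptible t -> Deliver
    | otherwise          -> BlockThrower
  Nothing -> TargetGone
```

For example, a target that is MaskedInterruptible and not blocked yields BlockThrower, the same target once blocked yields Deliver, and a MaskedUninterruptible target yields BlockThrower whether or not it is blocked.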
Now we just need to handle unblocking threads which are blocked in ThrowTo. For that, we’ll go back to the run function and add a pass to unblock threads if the current one is interruptible after it processes its action:
run :: C.MonadConc m => Scheduler s -> s -> PrimOp m -> m s
run sched s0 = go s0 . initialise where
  go s (threads, idsrc)
    | initialThreadId `M.member` threads = case runnable threads of
      Just tids -> do
        let (chosen, s') = sched tids s
        (threads', idsrc') <- stepThread chosen (threads, idsrc)
        let threads'' = if (isInterruptible <$> M.lookup chosen threads') /= Just False
                        then unblock (Left chosen) threads'
                        else threads'
        -- ^- new
        go s' (threads'', idsrc')
      Nothing -> pure s
    | otherwise = pure s

  runnable = nonEmpty . M.keys . M.filter (isNothing . threadBlock)

  initialThreadId = fst (nextThreadId initialIdSource)
So after stepping a thread, we unblock every thread blocked on it if it either doesn’t exist, or if it does exist and is interruptible. It’s much more robust to do this once here than at every place in stepThread which might cause the thread to become interruptible.
Masking and MonadMask
There are two operations at the programmer’s disposal to change the masking state of a thread, mask and uninterruptibleMask. Here’s what the MiniFu types will look like:
{-# LANGUAGE RankNTypes #-}
mask :: ((forall x. MiniFu m x -> MiniFu m x) -> MiniFu m a) -> MiniFu m a
uninterruptibleMask :: ((forall x. MiniFu m x -> MiniFu m x) -> MiniFu m a) -> MiniFu m a
Each takes an action to run, and runs it as either MaskedInterruptible or MaskedUninterruptible. The action is provided with a polymorphic callback to run a subcomputation with the original masking state.
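The behaviour we are aiming to reproduce can be observed in plain IO, where Control.Exception provides mask and getMaskingState. The sketch below is only an illustration of that behaviour, not MiniFu code, and maskAndRestore is a made-up name.

```haskell
import qualified Control.Exception as E

-- Record the masking state outside the mask, inside it, and inside
-- the restore callback that mask hands to the action.
maskAndRestore :: IO (E.MaskingState, E.MaskingState, E.MaskingState)
maskAndRestore = do
  outer <- E.getMaskingState
  (inner, restored) <- E.mask $ \restore -> do
    i <- E.getMaskingState
    r <- restore E.getMaskingState
    pure (i, r)
  pure (outer, inner, restored)
```

When this is called from an unmasked thread, inner should be MaskedInterruptible and restored should be the same as outer: the callback puts back whatever state was in effect when mask was entered, rather than unconditionally unmasking.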
This is going to need, you guessed it, a new primop! We could modify the Mask primop to do this job as well, but I think it’s a little clearer to have two separate ones:
data PrimOp m where
  -- ...
  InMask :: E.MaskingState -> ((forall x. MiniFu m x -> MiniFu m x) -> MiniFu m a)
         -> (a -> PrimOp m) -> PrimOp m
And here are the implementations of our masking functions:
mask ma = MiniFu (K.cont (InMask E.MaskedInterruptible ma))
uninterruptibleMask ma = MiniFu (K.cont (InMask E.MaskedUninterruptible ma))
We can now fulfil another requirement of MonadConc: a MonadMask instance!
instance EM.MonadMask (MiniFu m) where
  mask = -- 'mask' from above
  uninterruptibleMask = -- 'uninterruptibleMask' from above
The very last piece of the puzzle for exception handling in MiniFu is to implement this InMask primop. Its type looks quite intense, but the implementation is really not that bad. There are three parts:
stepThread {- ... -}
  where
    -- ...
    go (InMask ms ma k) = simple . adjust $ \thrd -> thrd
      { threadK =
        let ms0 = threadMask thrd

            -- (1) we need to construct the polymorphic argument function
            umask :: MiniFu m x -> MiniFu m x
            umask (MiniFu mx) = MiniFu $ do
              K.cont (\c -> Mask ms0 (c ()))
              x <- mx
              K.cont (\c -> Mask ms (c ()))
              pure x

        -- (2) we need to run the inner continuation, resetting the masking state
        -- when done
        in K.runCont (runMiniFu (ma umask)) (Mask ms0 . k)

      -- (3) we need to change the masking state
      , threadMask = ms
      }
The explicit type signature on umask is needed because we’re using GADTs, which implies MonoLocalBinds, which prevents the polymorphic type from being inferred. We could achieve the same effect by turning on NoMonoLocalBinds.
Demo
Now we have asynchronous exceptions, check it out:
example_async :: MiniFu m String
example_async = do
  a <- newEmptyMVar
  tid <- fork (putMVar a "hello from the other thread")
  throwTo tid E.ThreadKilled
  readMVar a

demo_async :: IO ()
demo_async = do
  g <- R.newStdGen
  print . fst =<< minifu randomSched g example_async
See:
λ> demo_async
Just "hello from the other thread"
λ> demo_async
Just "hello from the other thread"
λ> demo_async
Nothing
Next time…
We have come to the end of part 2! Again, I hope you enjoyed this post; any feedback is welcome. This is all on GitHub, and you can see the code we ended up with at the “post-02” tag.
Once again, I have some homework for you. Your task, should you choose to accept it, is to implement:
tryPutMVar :: MVar m a -> a -> MiniFu m Bool
tryTakeMVar :: MVar m a -> MiniFu m (Maybe a)
tryReadMVar :: MVar m a -> MiniFu m (Maybe a)
Solutions will be up in a few days, as before, at the “homework-02” tag.
Stay tuned because next time we’re going to implement STM: all of it in one go. Then we can finally get on to the testing.
Thanks to Will Sewell for reading an earlier draft of this post.