Stack can perform actions concurrently while building. I wanted to understand how it achieves that.
IORef
As a preliminary: Haskell provides mutable references in the `IO` monad. For example, the code below uses a mutable variable (of type `IORef Int`) created by the action `newIORef :: a -> IO (IORef a)`:
```haskell
module Main (main) where

import Control.Monad ( replicateM_ )
import Data.IORef ( IORef, newIORef, readIORef, writeIORef )

main :: IO ()
main = do
  count <- newIORef (0 :: Int)
  replicateM_ 10 (incrCount count)

incrCount :: IORef Int -> IO ()
incrCount count = do
  n <- readIORef count
  writeIORef count (n + 1)
  print n
```
The action `incrCount count` is replicated 10 times. Each time, it reads the `Int` referenced by `count`, writes back that value plus one, and prints the value it read.
That code executes in a single thread. It outputs the integers 0 to 9, in order, to the standard output channel.
replicateConcurrently_ and TVar
As a second preliminary, the code below does something similar, but in more than one thread:
```haskell
module Main (main) where

-- From the async package:
import Control.Concurrent.Async ( replicateConcurrently_ )
-- From the stm package:
import Control.Concurrent.STM
         ( TVar, atomically, newTVarIO, readTVar, writeTVar )

main :: IO ()
main = do
  count <- newTVarIO (0 :: Int)
  replicateConcurrently_ 10 (incrCount count)

incrCount :: TVar Int -> IO ()
incrCount count = do
  n <- atomically $ do
    n <- readTVar count
    writeTVar count (n + 1)
    pure n
  print n
```
Two key aspects of the code are `replicateConcurrently_` and `atomically`. `replicateConcurrently_ :: Int -> IO a -> IO ()` performs the given action the given number of times (here, 10), each in its own thread, and ignores the results.
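`atomically :: STM a -> IO a` performs a series of `STM` actions (see further below) as a single atomic transaction. Atomicity is a guarantee that either all of the actions will be performed or none of them will be. In addition, no other thread can observe the transaction's intermediate state.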
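The all-or-nothing guarantee can be seen directly. The sketch below (the names `abortedTransfer`, `from` and `to` are made up for illustration) writes to two `TVar`s in one transaction and then throws an exception with `throwSTM` from the stm package. Because the transaction aborts, neither write survives.

```haskell
import Control.Concurrent.STM
  (TVar, atomically, newTVarIO, readTVar, readTVarIO, throwSTM, writeTVar)
import Control.Exception (ErrorCall (..), try)

-- Hypothetical example: two writes in one transaction, followed by an
-- exception. The transaction aborts, so neither write is kept.
abortedTransfer :: TVar Int -> TVar Int -> IO (Either ErrorCall ())
abortedTransfer from to = try $ atomically $ do
  a <- readTVar from
  writeTVar from (a - 10)
  b <- readTVar to
  writeTVar to (b + 10)
  throwSTM (ErrorCall "abort after both writes")

main :: IO ()
main = do
  from <- newTVarIO 100
  to <- newTVarIO 0
  r <- abortedTransfer from to
  print r
  balances <- (,) <$> readTVarIO from <*> readTVarIO to
  print balances  -- (100,0): both writes were discarded
```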
`STM` is a monad that supports atomic memory transactions (STM is an abbreviation of Software Transactional Memory).
`newTVar :: a -> STM (TVar a)` creates a transactional variable, that is, a shared memory location that supports atomic memory transactions. `newTVarIO :: a -> IO (TVar a)` does the same, but in the `IO` monad.
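One practical difference: because `newTVar` runs inside `STM`, a new transactional variable can be created from values read in the same transaction. A minimal sketch (the helper `scaledCopy` is hypothetical):

```haskell
import Control.Concurrent.STM
  (TVar, atomically, newTVar, newTVarIO, readTVar, readTVarIO)

-- Hypothetical helper: create a TVar whose initial value is computed
-- from another TVar, within a single transaction.
scaledCopy :: Int -> TVar Int -> IO (TVar Int)
scaledCopy k source = atomically $ do
  x <- readTVar source
  newTVar (k * x)

main :: IO ()
main = do
  a <- newTVarIO 1          -- created directly in IO
  b <- scaledCopy 10 a      -- created inside a transaction
  readTVarIO b >>= print    -- 10
```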
The code executes in more than one thread. Because each read-and-increment is performed atomically, no two threads can read the same value of the `Int`, so no increment is lost and each thread prints a different number. The code outputs the integers 0 to 9 to the standard output channel, but not necessarily in order.
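To check the claim that every thread sees a different value, the sketch below uses `replicateConcurrently` (the result-keeping variant of `replicateConcurrently_` in the async package), so each thread returns the value it read instead of printing it. The function name `distinctCounts` is made up for this example.

```haskell
import Control.Concurrent.Async (replicateConcurrently)
import Control.Concurrent.STM (atomically, newTVarIO, readTVar, writeTVar)
import Data.List (sort)

-- Same transaction as incrCount, but each thread returns the value it
-- read, so the results can be inspected.
distinctCounts :: Int -> IO [Int]
distinctCounts threads = do
  count <- newTVarIO (0 :: Int)
  replicateConcurrently threads $ atomically $ do
    n <- readTVar count
    writeTVar count (n + 1)
    pure n

main :: IO ()
main = do
  ns <- distinctCounts 10
  print ns         -- some permutation of [0..9]
  print (sort ns)  -- [0,1,2,3,4,5,6,7,8,9]
```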
Stack
Stack’s module `Control.Concurrent.Execute` has two key functions: `runActions` (exported) and `runActions'` (not exported). `runActions` is:
```haskell
runActions :: Int -- ^ threads
           -> Bool -- ^ keep going after one task has failed
           -> [Action]
           -> (TVar Int -> TVar (Set ActionId) -> IO ()) -- ^ progress updated
           -> IO [SomeException]
runActions threads keepGoing actions withProgress = do
  es <- ExecuteState
    <$> newTVarIO (sortActions actions) -- esActions
    <*> newTVarIO [] -- esExceptions
    <*> newTVarIO Set.empty -- esInAction
    <*> newTVarIO 0 -- esCompleted
    <*> pure keepGoing -- esKeepGoing
  _ <- async $ withProgress (esCompleted es) (esInAction es)
  if threads <= 1
    then runActions' es
    else replicateConcurrently_ threads $ runActions' es
  readTVarIO $ esExceptions es
```
Ignoring the `withProgress` function and the `threads <= 1` case for now, the `runActions` action creates a number of transactional variables in the `IO` monad and packages them as a value of type `ExecuteState`. It then performs `replicateConcurrently_ threads (runActions' es)`. That is, the same action (`runActions' es`) is performed in the given number of threads, all sharing the same transactional variables. Finally, it returns the list of exceptions referenced by `esExceptions`.
`runActions'` is where all the action is:
```haskell
runActions' :: ExecuteState -> IO ()
runActions' ExecuteState {..} = loop
 where
  breakOnErrs :: STM (IO ()) -> STM (IO ())
  breakOnErrs inner = do
    errs <- readTVar esExceptions
    if null errs || esKeepGoing
      then inner
      else pure $ pure ()

  withActions :: ([Action] -> STM (IO ())) -> STM (IO ())
  withActions inner = do
    as <- readTVar esActions
    if null as
      then pure $ pure ()
      else inner as

  loop :: IO ()
  loop = join $ atomically $ breakOnErrs $ withActions $ \as ->
    case break (Set.null . actionDeps) as of
      (_, []) -> do
        inAction <- readTVar esInAction
        if Set.null inAction
          then do
            unless esKeepGoing $
              modifyTVar esExceptions (toException InconsistentDependenciesBug:)
            pure $ pure ()
          else retry
      (xs, action:ys) -> do
        inAction <- readTVar esInAction
        case actionConcurrency action of
          ConcurrencyAllowed -> pure ()
          ConcurrencyDisallowed -> unless (Set.null inAction) retry
        let as' = xs ++ ys
            remaining = Set.union
              (Set.fromList $ map actionId as')
              inAction
        writeTVar esActions as'
        modifyTVar esInAction (Set.insert $ actionId action)
        pure $ mask $ \restore -> do
          eres <- try $ restore $ actionDo action ActionContext
            { acRemaining = remaining
            , acDownstream = downstreamActions (actionId action) as'
            , acConcurrency = actionConcurrency action
            }
          atomically $ do
            modifyTVar esInAction (Set.delete $ actionId action)
            modifyTVar esCompleted (+1)
            case eres of
              Left err -> modifyTVar esExceptions (err:)
              Right () ->
                let dropDep a = a { actionDeps = Set.delete (actionId action) $ actionDeps a }
                in  modifyTVar esActions $ map dropDep
          restore loop
```
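`runActions' ExecuteState {..}` is equated with `loop`, which in turn is equated with `join $ atomically $ breakOnErrs $ withActions ...`. (The `{..}` pattern, from the `RecordWildCards` language extension, brings the fields of `ExecuteState`, such as `esActions` and `esKeepGoing`, into scope.)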
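We can see from the type of `breakOnErrs` that the application of `atomically` has type `IO (IO ())`. The application of `join :: Monad m => m (m a) -> m a` therefore has type `IO ()`. In other words, the transaction does not do the work itself: it atomically decides which `IO` action to perform next, and `join` then performs that action after the transaction has committed.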
If there are exceptions and `esKeepGoing` is `False`, then `breakOnErrs` returns the do-nothing action `pure () :: IO ()` as the result of the `STM` action (`pure $ pure ()`). Otherwise it equates to the `withActions ...` `STM` action.
If there are no actions (in the list referenced by the transactional variable `esActions`), then that `STM` action likewise returns the do-nothing `IO` action. Otherwise, it applies its function (of type `[Action] -> STM (IO ())`) to the list of actions.
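The idiom used by `loop`, `breakOnErrs` and `withActions`, in which a transaction decides what to do and returns an `IO` action rather than doing it, can be seen in isolation below. This is not Stack's code: `takeJob` and its queue of job names are hypothetical, but the empty-queue case returns `pure (pure ())` just as `withActions` does.

```haskell
import Control.Concurrent.STM
  (TVar, atomically, newTVarIO, readTVar, readTVarIO, writeTVar)
import Control.Monad (join)

-- Hypothetical example: the transaction inspects shared state and
-- returns the IO action to run next; join runs it after the commit.
takeJob :: TVar [String] -> IO ()
takeJob queue = join $ atomically $ do
  jobs <- readTVar queue
  case jobs of
    [] -> pure (pure ())                  -- nothing to do: return a no-op
    (j : rest) -> do
      writeTVar queue rest                -- claim the job inside the transaction
      pure (putStrLn ("running " ++ j))   -- but run it outside

main :: IO ()
main = do
  queue <- newTVarIO ["a", "b"]
  takeJob queue               -- prints "running a"
  takeJob queue               -- prints "running b"
  takeJob queue               -- queue is empty: prints nothing
  readTVarIO queue >>= print  -- []
```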
That function first splits the list of actions with `break (Set.null . actionDeps) as`. The first part holds the actions before the first one with no outstanding dependencies (so every action in it depends on other actions). The second part starts with that first action without dependencies, or is empty if there is none.
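A small illustration of that split, using a hypothetical `Toy` record in place of Stack's `Action` (only an id and a set of outstanding dependencies). Note that only the first ready action (`'a'`) is picked out; another ready action (`'d'`) stays in the second part and waits for a later transaction.

```haskell
import Data.Set (Set)
import qualified Data.Set as Set

-- Hypothetical stand-in for Stack's Action: an id and the ids of the
-- actions it still depends on.
data Toy = Toy { toyId :: Char, toyDeps :: Set Char }
  deriving Show

toys :: [Toy]
toys =
  [ Toy 'c' (Set.fromList "ab")
  , Toy 'b' (Set.fromList "a")
  , Toy 'a' Set.empty
  , Toy 'd' Set.empty
  ]

main :: IO ()
main = do
  let (blocked, rest) = break (Set.null . toyDeps) toys
  print (map toyId blocked)  -- "cb": every action before the first ready one
  print (map toyId rest)     -- "ad": the first ready action, then the others
```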
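If all the actions depend on others (`(_, [])`) and no actions are ‘in action’, then something has gone wrong: no running action can complete and release a dependency. Unless `esKeepGoing` is `True`, an `InconsistentDependenciesBug` exception is recorded, and the do-nothing action is returned. Otherwise (some actions are ‘in action’), the `STM` action to which `atomically` is applied is tried again (`retry`). `retry` abandons the transaction and blocks the thread until one of the transactional variables the transaction has read changes, for example when an action that is ‘in action’ completes.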
Otherwise (`(xs, action:ys)`), `action` is the first action that has no outstanding dependencies.
If that action does not allow concurrency (`actionConcurrency action` is `ConcurrencyDisallowed`) and at least one action is ‘in action’, then the `STM` action to which `atomically` is applied is likewise tried again.
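The `retry` in both of these cases blocks the thread rather than spinning. The sketch below (with a hypothetical `waitUntilIdle`, and a `Set Int` standing in for `esInAction`) waits with `unless (Set.null running) retry`, the same test that guards `ConcurrencyDisallowed`, until another thread empties the set.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (TVar, atomically, newTVarIO, readTVar, retry, writeTVar)
import Control.Monad (unless)
import Data.Set (Set)
import qualified Data.Set as Set

-- Hypothetical helper: block until no action is 'in action'. The
-- transaction is re-run only after the TVar it read has changed.
waitUntilIdle :: TVar (Set Int) -> IO ()
waitUntilIdle inAction = atomically $ do
  running <- readTVar inAction
  unless (Set.null running) retry

main :: IO ()
main = do
  inAction <- newTVarIO (Set.fromList [1, 2])
  _ <- forkIO $ do
    threadDelay 100000                         -- 0.1 s
    atomically $ writeTVar inAction Set.empty  -- the 'actions' finish
  waitUntilIdle inAction                       -- blocks until then
  putStrLn "idle"
```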
The action is removed from the list of actions referenced by `esActions` (`writeTVar esActions as'`, where `as' = xs ++ ys`). The action is inserted into the set of actions referenced by `esInAction` (`modifyTVar esInAction (Set.insert $ actionId action)`).
The `STM` action then returns an `IO` action, which `join` performs once the transaction has committed. That `IO` action first does the action (`actionDo`) with a context (`ActionContext`) that sets the remaining actions as the `xs`, the `ys` and the other actions ‘in action’; the downstream actions as the `xs` and `ys` that depend on the action; and the concurrency of the action as its `actionConcurrency`.

Then, atomically, the action is deleted from the set of actions referenced by `esInAction`, and the `Int` referenced by `esCompleted` is incremented. If the action succeeded, it is deleted as a dependency of the actions referenced by `esActions`. If it did not succeed, the exception is added to the front of the list of exceptions referenced by `esExceptions`. Finally, the `IO` action performs `loop` again, so the thread goes on to look for more work.
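The `dropDep` step can be shown on its own. The sketch below reuses a hypothetical `Toy` record (an id and a set of outstanding dependencies) and a made-up `dropDone` function. Because dependencies are dropped only on success, an action that depends on a failed one never becomes ready; with `esKeepGoing` set, such actions eventually leave the loop in the `(_, [])` case with nothing ‘in action’, which appears to be why that case does not record `InconsistentDependenciesBug` when keeping going.

```haskell
import Data.Set (Set)
import qualified Data.Set as Set

-- Hypothetical stand-in for Stack's Action.
data Toy = Toy { toyId :: Char, toyDeps :: Set Char }
  deriving Show

-- After the action with id `done` succeeds, remove it from the
-- dependencies of every remaining action (cf. dropDep in runActions').
dropDone :: Char -> [Toy] -> [Toy]
dropDone done = map dropDep
  where
    dropDep a = a { toyDeps = Set.delete done (toyDeps a) }

main :: IO ()
main = do
  let remaining = [Toy 'c' (Set.fromList "ab"), Toy 'b' (Set.fromList "a")]
      after = dropDone 'a' remaining
  print [ (toyId t, Set.toList (toyDeps t)) | t <- after ]
  -- [('c',"b"),('b',"")]: 'b' is now ready, 'c' still waits for 'b'
```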
The `IO` action is masked from asynchronous exceptions (`mask $ \restore -> do ...`, using an unlifted version of `mask :: ((forall a. IO a -> IO a) -> IO b) -> IO b`). However, `restore` reinstates the prevailing masking state while the action is ‘done’ (`eres <- try $ restore $ actionDo ...`) and for the final `loop` (`restore loop`). The bookkeeping transaction between those two points therefore runs with asynchronous exceptions masked.
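The same mask/try/restore shape can be sketched outside Stack with `mask` and `try` from `Control.Exception`. The function `runRecorded` is hypothetical: it runs a job in the caller's masking state, then records the outcome in one masked transaction, whether the job succeeded or failed.

```haskell
import Control.Concurrent.STM (TVar, atomically, modifyTVar, newTVarIO, readTVarIO)
import Control.Exception (ErrorCall (..), SomeException, mask, throwIO, try)

-- Hypothetical sketch of the pattern, not Stack's code.
runRecorded :: TVar Int -> TVar [SomeException] -> IO () -> IO ()
runRecorded completed errors job = mask $ \restore -> do
  eres <- try (restore job)  -- the job runs in the caller's masking state
  atomically $ do            -- the bookkeeping runs masked
    modifyTVar completed (+ 1)
    case eres of
      Left err -> modifyTVar errors (err :)
      Right () -> pure ()

main :: IO ()
main = do
  completed <- newTVarIO 0
  errors <- newTVarIO []
  runRecorded completed errors (putStrLn "ok")
  runRecorded completed errors (throwIO (ErrorCall "boom"))
  n <- readTVarIO completed
  es <- readTVarIO errors
  print (n, length es)  -- (2,1)
```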
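If there are no exceptions (or `esKeepGoing` is `True`), each thread's `loop` repeats until the list of actions referenced by `esActions` is empty. At that point `withActions` returns the do-nothing action, and the thread finishes.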
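Putting the pieces together, here is a much-simplified, self-contained sketch of the same scheduling loop. The `Job`, `Shared`, `worker` and `runJobs` names are hypothetical stand-ins for Stack's `Action`, `ExecuteState`, `runActions'` and `runActions`. The sketch leaves out progress reporting, `esCompleted`, `ActionContext`, `ConcurrencyDisallowed` and the keep-going option (here any failure stops new jobs from starting).

```haskell
{-# LANGUAGE RecordWildCards #-}
module Main (main) where

import Control.Concurrent.Async (replicateConcurrently_)
import Control.Concurrent.STM
  ( TVar, atomically, modifyTVar', newTVarIO, readTVar, readTVarIO
  , retry, writeTVar )
import Control.Exception (SomeException, mask, try)
import Control.Monad (join)
import Data.Set (Set)
import qualified Data.Set as Set

-- Hypothetical, cut-down stand-ins for Stack's Action and ExecuteState.
data Job = Job
  { jobId   :: Int
  , jobDeps :: Set Int  -- ids of jobs that must succeed first
  , jobDo   :: IO ()
  }

data Shared = Shared
  { shJobs     :: TVar [Job]            -- not yet started
  , shInAction :: TVar (Set Int)        -- currently running
  , shErrors   :: TVar [SomeException]
  }

worker :: Shared -> IO ()
worker Shared {..} = loop
  where
    loop :: IO ()
    loop = join $ atomically $ do
      errs <- readTVar shErrors
      jobs <- readTVar shJobs
      if not (null errs) || null jobs
        then pure (pure ())             -- stop: a job failed, or none left
        else case break (Set.null . jobDeps) jobs of
          (_, []) -> do
            inAction <- readTVar shInAction
            if Set.null inAction
              then pure (pure ())       -- stuck: nothing ready or running
              else retry                -- wait for a running job to finish
          (xs, job : ys) -> do
            writeTVar shJobs (xs ++ ys) -- claim the job
            modifyTVar' shInAction (Set.insert (jobId job))
            pure $ mask $ \restore -> do  -- the work happens outside STM
              eres <- try (restore (jobDo job))
              atomically $ do
                modifyTVar' shInAction (Set.delete (jobId job))
                case eres of
                  Left err -> modifyTVar' shErrors (err :)
                  Right () -> modifyTVar' shJobs (map (dropDep (jobId job)))
              restore loop

    dropDep :: Int -> Job -> Job
    dropDep done j = j { jobDeps = Set.delete done (jobDeps j) }

-- Run the jobs with the given number of worker threads and return any
-- exceptions they threw.
runJobs :: Int -> [Job] -> IO [SomeException]
runJobs threads jobs = do
  sh <- Shared <$> newTVarIO jobs <*> newTVarIO Set.empty <*> newTVarIO []
  if threads <= 1
    then worker sh
    else replicateConcurrently_ threads (worker sh)
  readTVarIO (shErrors sh)

main :: IO ()
main = do
  order <- newTVarIO []
  let job i deps = Job i (Set.fromList deps) (atomically (modifyTVar' order (i :)))
  errs <- runJobs 4 [job 3 [1, 2], job 2 [1], job 1 [], job 4 []]
  done <- reverse <$> readTVarIO order
  print (length errs)  -- 0
  print done           -- 1 before 2 before 3; 4 may come anywhere
```

As in `runActions'`, a thread finishes as soon as it finds no jobs left to claim, even if other threads are still running jobs; those threads complete the remaining work.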