Haskell and concurrency

Stack can perform actions during building concurrently. I wanted to understand how it acheived that.

IORef

As a preliminary, Haskell provides for mutable references, in the IO monad. For example, the code below makes use of the mutable variable (of type IORef Int) created by action newIORef :: a -> IO (IORef a):

The action incrCount count is replicated 10 times. The action increments the Int referenced by count.

That code will execute in a single thread. It outputs the integers 0 to 9 sequentially to the standard output channel.

replicateConcurrently_ and TVar

As a second preliminary, the code below does something similar, but in more than one thread:

Two key aspects of the code are replicateConcurrently_ and atomically. replicateConcurrently_ :: Int -> IO a -> IO () performs the given action in the given number of threads (here, 10) and ignores the results of the actions.

atomically :: STM a -> IO a performs a series of STM actions (see further below) atomically. Atomicity is a guarantee that either all of the actions will be performed or none of them will be.

STM is a monad that supports atomic memory transactions (STM is an abbreviation of Software Transactional Memory).

newTVar :: a -> STM (TVar a) creates a transactional variable – that is, a shared memory location that supports atomic memory transactions. newTVarIO :: a -> IO (TVar a) does the same, but in the IO monad.

The code will execute in more than one thread. The atomicity guarantee means that more than one thread will not be incrementing the same Int value at the same time. It outputs the integers 0 to 9 to the standard output channel, but not sequentially.

Stack

Stack’s module Control.Concurrent.Execute has two key functions runActions (exported) and runActions' (not exported). runActions is:

Ignoring the withProgress function and threads <= 1 for now, the runActions action creates a number of transactional variables in the IO monad, as a value of type ExecuteState. It then replicateConcurrently_ threads (runActions' es). That is, the same action (runActions' es) will be performed in the given number of threads.

runActions' is where all the action is:

runActions' ExecuteState {..} is equated with loop, which in turn is equated with join $ atomically $ breakOnErrs $ withActions ....

We can see from the type of breakOnErrs that the application of atomically will have type IO (IO ()). The application of join :: Monad m => m (m a) -> m a will have the type of an action (IO ()).

If there are exceptions and esKeepGoing is False, then breakOnErrs lifts IO () (do nothing) into STM. Otherwise it equates to the withActions ... STM action.

If there are no actions (referenced by the transactional variable esActions) then that STM action lifts IO () into STM. Otherwise, it applies its function ([Action] -> STM (IO ())) to the list of actions.

That function first breaks the list of actions into a list of those that are dependent on other actions and a list where the first action (if any) is not dependent on other actions: break (Set.null . actionDeps) as.

If all the actions are dependent on others ((_, [])) but no actions are ‘in action’, then something has gone wrong. Otherwise the STM action to which atomically is applied is tried again (retry).

Otherwise, the action with no dependencies is distinguished ((xs, action:ys)).

If that action does not allow concurrency (actionConcurrency action is ConcurrencyDisallowed) and at least one action is ‘in action’, then the STM action to which atomically is applied is tried again.

The action is removed from the list of actions referenced by esActions (writeTVar esActions xs ++ ys). The action is inserted into the set of actions referenced by esInAction (modifyTVar esInAction (Set.insert $ actionId action)).

The action is then ‘done’, with a context (ActionContext) that sets the remaining actions as the xs, the ys and the other actions ‘in action’; the downstream actions as the xs and ys that depend on the action; and the concurrency of the action as its actionConcurrency. Then, atomically, the action is deleted from the set of actions referenced by esInAction; the Int referenced by esCompleted is incremented; and (if it succeeded) the action is deleted as a dependency of the actions referenced by esActions. If the action did not succeed, the exception is appended to the list of exceptions referenced by esExceptions. The action ends with loop. This overall action is lifted into STM.

The action is masked (an unlifted version of mask :: ((forall a. IO a -> IO a) -> IO b) -> IO b; mask $ \restore -> do ...) from asynchronous exceptions. However, the prevailing masking state is restored when the action is ‘done’ (eres <- try $ restore $ actionDo ...) and for the final loop (restore loop).

If there are no exceptions, loop loops until the list of actions referenced by esActions is empty.