Queueing IO actions

Posted in: haskell.

Lately, I’ve been working a bit on an automated system to count laps for a running contest called 12 Urenloop (Dutch). We’re working on this application with our computer science club, and I’m writing the counting backend in Haskell (the repo for the full application is here).

I came across a problem that turned out to have quite an easy and elegant solution – or, at least, an easier and more elegant one than I initially thought.

After cleaning it up a little, I decided to provide it here as a blogpost. I think it’s a decent example of how one can build useful abstractions in Haskell not only for pure problems, but also for very imperative-like and impure code. Additionally, I vaguely recall people on #haskell saying we need more tutorials on real-world stuff.

The problem is quite simple: a component generates a number of requests to a REST API running on another machine. We need to implement this in a fail-safe way: basically, the other machine or the network can go down for a while. In this case we should cache all requests and try them again later (but in the same order).

Since this is written in Literate Haskell, we first have a few imports you can safely skip over.

> module Queue where
> 
> import Control.Applicative ((<$>))
> import Control.Concurrent.MVar (MVar, newMVar, takeMVar, putMVar, modifyMVar_)
> import Control.Concurrent (threadDelay, forkIO)
> import Control.Exception (try, IOException)
> import Control.Monad (forever)
> import Data.Sequence (Seq, (|>))
> import qualified Data.Sequence as S
> import System.IO (hPutStrLn, stderr)

Let’s step away from the problem of making REST API calls and come up with a more general, informal description: we have some sort of “action” which runs or fails. If the action fails, we need to try it again later.

> data Retry = Retry | Done
>            deriving (Show, Eq, Ord)

The above datatype represents a nice return code for the “actions” we need. We now define an “action” as a Retryable, simply any IO returning an exit code as described above:

> type Retryable = IO Retry

We want our queue to be thread-safe. For this purpose, a simple MVar will do. Our queue will be represented by a Sequence. We could also use a simpler queue data structure as given by Okasaki, but we’ll stick with Sequence since it’s in the commonly used containers package.

And so we define our Queue: an MVar for thread-safety around our actual queue.

> newtype Queue = Queue {unQueue :: MVar (Seq Retryable)}

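People unfamiliar with the Haskell language might be confused at this point: we used data, type, and newtype – three different ways to create a type! Let’s elaborate on this a little: data declares a genuinely new type with its own constructors (Retry has two of them, Retry and Done); type only introduces a synonym, so Retryable and IO Retry are fully interchangeable; and newtype wraps a single existing type in a new, distinct one without runtime overhead, so a Queue cannot be mixed up with a bare MVar.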
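
The difference between the three keywords can be seen side by side in a small standalone sketch. It is not part of this module: Counter, newCounter, readCounter and alwaysDone are hypothetical names made up for the illustration, and Retry and Retryable are simply repeated so the snippet compiles on its own.

```haskell
import Control.Concurrent.MVar (MVar, newMVar, readMVar)

-- 'data' introduces a brand-new type with its own constructors.
data Retry = Retry | Done
    deriving (Show, Eq)

-- 'type' is only an alias: Retryable and IO Retry are interchangeable.
type Retryable = IO Retry

-- 'newtype' wraps exactly one field in a new, distinct type.
-- (Counter is a hypothetical example type, not used by the queue.)
newtype Counter = Counter { unCounter :: MVar Int }

-- Because Retryable is an alias, a plain IO Retry value needs no conversion.
alwaysDone :: Retryable
alwaysDone = return Done

-- A Counter, however, has to be wrapped and unwrapped explicitly.
newCounter :: Int -> IO Counter
newCounter n = Counter <$> newMVar n

readCounter :: Counter -> IO Int
readCounter = readMVar . unCounter
```

Passing a bare MVar Int to readCounter would be a type error, which is the kind of protection the Queue newtype gives us as well.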

A first operation we can define is the creation of a new, empty Queue.

> makeQueue :: Int -> IO Queue

When an action fails, we will wait a specified delay before we try again. This is the only parameter: the delay, specified in seconds.

> makeQueue delay = do
>     queue <- Queue <$> newMVar S.empty
>     _ <- forkIO $ forever $ threadDelay (delay * 1000000) >> pop queue
>     return queue

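The implementation is pretty straight-forward: we create a new MVar holding an empty Sequence and wrap it in Queue; we fork a thread that forever sleeps for the given delay (threadDelay takes microseconds, hence the multiplication) and then calls pop; and finally we return the queue.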

This means pop will be called automatically – this makes sense, since we want failed actions to be retried whenever possible. The user never has to call pop manually.
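
The background-thread pattern used by makeQueue can be tried out in isolation. Below is a minimal standalone sketch, assuming a hypothetical helper called every (it is not part of this module) that takes its delay in microseconds rather than seconds:

```haskell
import Control.Concurrent (ThreadId, forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forever, replicateM_)

-- Hypothetical helper: run an action repeatedly on its own thread,
-- sleeping the given number of microseconds before each run.
every :: Int -> IO () -> IO ThreadId
every micros action = forkIO $ forever $ threadDelay micros >> action

-- Usage: block until the background thread has ticked three times,
-- then stop it again.
demo :: IO ()
demo = do
    tick <- newEmptyMVar
    tid  <- every 10000 (putMVar tick ())
    replicateM_ 3 (takeMVar tick)
    killThread tid
```

makeQueue never kills its thread, so there the loop runs for as long as the program does.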

Pushing a new Retryable on the queue has an even simpler definition:

> push :: Queue -> Retryable -> IO ()
> push queue retryable = do
>     modifyMVar_ (unQueue queue) $ return . (|> retryable)
>     pop queue

We modify our thread-safe variable by appending (|> is appending to the right side of a Sequence) our new Retryable. Then we immediately try to pop from the queue. This final pop gives us the nice property that, in an everything-works-fine scenario, all actions are performed immediately after being pushed.
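
To see why appending on the right and reading from the left preserves the order of requests, here is a small pure sketch using only functions from Data.Sequence. The names example, front and rest are made up for the illustration:

```haskell
import Data.Foldable (toList)
import Data.Sequence (Seq, (|>))
import qualified Data.Sequence as S

-- Append three items on the right, in order of arrival.
example :: Seq String
example = S.empty |> "first" |> "second" |> "third"

-- The oldest element sits at index 0 ...
front :: String
front = example `S.index` 0          -- "first"

-- ... and dropping one element from the left removes exactly that one.
rest :: [String]
rest = toList (S.drop 1 example)     -- ["second", "third"]
```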

The definition of pop holds the main logic for our queue-based system, and its definition looks more complicated, mostly because we need to handle all the different cases:

> pop :: Queue -> IO ()
> pop queue@(Queue mvar) = do
>     q <- takeMVar mvar
>     if S.null q then putMVar mvar S.empty
>                 else do r <- q `S.index` 0
>                         case r of Retry -> putMVar mvar q
>                                   Done  -> do putMVar mvar (S.drop 1 q)
>                                               pop queue

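The reasoning behind it, however, follows simple rules: if the queue we find is empty, we restore an empty queue. Otherwise, we run the action we find in the front of the queue (S.index q 0), and: if it returns Retry, we put the queue back unchanged and stop, so the action stays at the front until the next attempt; if it returns Done, we put back the queue without its first element and call pop again to process the rest.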
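
One thing to keep in mind is that pop leaves the MVar empty while an action runs, so an action that throws an exception would never put the queue back. That is one reason to only push actions that handle their own exceptions. As an alternative, here is a rough standalone sketch of a variant built on modifyMVar, which restores the old value if the function it runs throws. It is not the code used in this module: step, popSafe and demo are hypothetical names, and the queue is passed as a bare MVar to keep the snippet short.

```haskell
import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar)
import Control.Monad (when)
import Data.IORef (modifyIORef, newIORef, readIORef)
import Data.Sequence (Seq, ViewL (..), viewl)
import qualified Data.Sequence as S

data Retry = Retry | Done

-- Run at most one action from the front of the queue. The Bool tells the
-- caller whether that action finished, i.e. whether it is worth continuing.
step :: Seq (IO Retry) -> IO (Seq (IO Retry), Bool)
step q = case viewl q of
    EmptyL    -> return (q, False)
    r :< rest -> do
        result <- r
        return $ case result of
            Retry -> (q, False)     -- keep the failed action at the front
            Done  -> (rest, True)   -- drop it and ask to continue

-- Hypothetical variant of pop: if an action throws, modifyMVar puts the
-- unchanged queue back before the exception propagates.
popSafe :: MVar (Seq (IO Retry)) -> IO ()
popSafe mvar = do
    continue <- modifyMVar mvar step
    when continue (popSafe mvar)

-- Usage: the second action asks for a retry, so the third is never run.
demo :: IO (Int, Int)
demo = do
    runs <- newIORef (0 :: Int)
    let act result = modifyIORef runs (+ 1) >> return result
    mvar <- newMVar (S.fromList [act Done, act Retry, act Done])
    popSafe mvar
    ran  <- readIORef runs
    left <- S.length <$> readMVar mvar
    return (ran, left)   -- (2, 2): two actions ran, two are still queued
```

Like the original pop, this releases the MVar between two actions, so a concurrent push can get in while the queue is being worked through.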

These twenty-or-so lines of source code are all that is needed to implement our thread-safe queueing system for IO actions! Now, we’re going to add a little more code to make it easier for users.

We first create a function to convert an IO action without any exit code into a Retryable, simply assuming it succeeds:

> assumeSuccesful :: IO () -> Retryable
> assumeSuccesful action = action >> return Done

We can also create a wrapper that takes a Retryable (usually one created by the function above) and converts it to another Retryable which will yield Retry when an IOException occurs:

> wrapIOException :: Retryable -> Retryable
> wrapIOException retryable = do
>     result <- try retryable
>     case result of Left e -> failed e >> return Retry
>                    Right r -> return r
>   where
>     failed :: IOException -> IO ()
>     failed e = hPutStrLn stderr $ "Queue.wrapIOException: " ++ show e

Other wrappers are possible – for example, in the application I am writing, I had an IO action which performed an HTTP request and returned Done only when the HTTP response code is a 2xx success code.
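
A wrapper along those lines could look roughly like the standalone sketch below. It is not the code from the application: fromStatus and retryOnFailure are hypothetical names, and the request is modelled as any IO action that yields a status code, leaving open which HTTP library actually performs it.

```haskell
data Retry = Retry | Done
    deriving (Show, Eq)

type Retryable = IO Retry

-- Hypothetical helper: only 2xx status codes count as success.
fromStatus :: Int -> Retry
fromStatus code
    | code >= 200 && code < 300 = Done
    | otherwise                 = Retry

-- Hypothetical wrapper: turn an action that yields a status code into a
-- Retryable. How the request itself is made is deliberately left open.
retryOnFailure :: IO Int -> Retryable
retryOnFailure request = fromStatus <$> request
```

Such a wrapper composes with wrapIOException, so that both network errors and non-2xx responses lead to a retry.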

If you want to play around with this code, let’s define a simple test function:

> test :: IO ()
> test = do
>     queue <- makeQueue 1
>     let action = readFile "/tmp/foo" >>= putStr
>     push queue $ wrapIOException $ assumeSuccesful action

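Load this file in GHCi, run test, wait for a bit, and then write something to /tmp/foo. As long as the file does not exist, readFile throws an IOException, which is logged to stderr and turned into a Retry; once the file is there, the next attempt prints its contents and the action leaves the queue. Success!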
