Explicit Parallelism via Thread Pools

March 13, 2009

I’m happy to say that this post is being finished up from my new home in Portland, Oregon! After many years of searching around, I’ve finally decided to study at Portland State. It’s probably optimistic of me to think I’ll have more time as a graduate student, but I hope this is the beginning of much more involvement for me in the Haskell community.

Thread Pools from the Control-Engine Package

Control.Engine was recently released on Hackage, providing a simple way to instantiate worker threads to split up the processing of streaming data. It was originally developed as a spin-off library from my DHT, and I’ve since generalized it to cover numerous cases.

Trivial Thread Pools

The trivial module Control.ThreadPool can cover static examples, such as this recent question asked on haskell-cafe:

I have a function that does some IO (takes a file path, read the file, parse, and return some data), and I would like to parallelize it, so that multiple files can be parsed in parallel.

‘Control.ThreadPool’ gives us an easy answer (though not as slick as the map-reduce answer the poster was asking for on the cafe). First we have our typical fluff.

{-# LANGUAGE BangPatterns #-}  -- needed for the strict binding (!x) below

import Control.ThreadPool (threadPoolIO)
import System.IO (openFile, hClose, IOMode(..))
import System.Environment (getArgs)
import Control.Concurrent.Chan
import Control.Monad (forM_)
import qualified Data.ByteString.Lazy.Char8 as L

main = do
    as <- getArgs

As you can see below, we simply say how many threads we want in our thread pool and what action (or pure computation, using ‘threadPool’) we wish to perform. After that it’s just channels – send input in and read results out!

	(input,output) <- threadPoolIO nrCPU op
	mapM_ (writeChan input) as   -- input stream
	forM_ [1..length as] (\_ -> readChan output >>= print)
  where
  nrCPU = 4
  op f = do
	h <- openFile f ReadMode
	c <- L.hGetContents h
	let !x = length . L.words $ c   -- force the count before closing the handle
	hClose h
	return (f,x)

And while this does nothing to demonstrate parallelism, it does work:

[tom@Mavlo Test]$ ghc -O2 parLines.hs --make -threaded -fforce-recomp
[1 of 1] Compiling Main             ( parLines.hs, parLines.o )
Linking parLines ...
[tom@Mavlo Test]$ find ~/dev/Pastry -name *lhs | xargs ./parLines +RTS -N4 -RTS
("/home/tom/dev/Pastry/Network/Pastry/Module/Joiner.lhs",107)
("/home/tom/dev/Pastry/Network/Pastry/Config.lhs",14)
("/home/tom/dev/Pastry/Network/Pastry/Data/LeafSet.lhs",120)
("/home/tom/dev/Pastry/Network/Pastry/Data/Router.lhs",87)
("/home/tom/dev/Pastry/Network/Pastry/Data/RouteTable.lhs",75)
("/home/tom/dev/Pastry/Network/Pastry/Data/Address.lhs",152)

Control Engine Setup

The thread pools are simple, but what if you need more flexibility or power? What if you want an up-to-date state shared among the threads, or there’s a cheap, non-parallelizable computation you need to perform before the main operation? The answer is to use Control.Engine instead of Control.ThreadPool. The engine provides managed state, numerous hook locations, and the ability to inject information at mid-engine locations.

[Diagram: the Control Engine pipeline – jobs pass through input hooks, pre-mutate hooks, the mutator threads, post-mutate hooks, and output hooks]

Injectors

The inject* calls can bypass the input hooks (injectPreMutator) or bypass everything besides the output hooks (injectPostMutator) – thus creating a ‘result’ that had no corresponding ‘job’.
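
For concreteness, here is a minimal sketch of calling the injectors. The signatures assumed here – an Engine parameterized by job and result types, and injectors taking the engine plus a value – come from the description above, not from the package docs:

reinject :: Engine String Int -> IO ()
reinject eng = do
    injectPreMutator  eng "a job that skips the input hooks"  -- enters at the pre-mutate stage
    injectPostMutator eng 42    -- a 'result' that had no corresponding 'job'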

Hooks

Hooks are capable of modifying or filtering jobs or results. All hooks are of type state -> a -> IO (Maybe a); it’s important to note that the type can not change, and that if a hook returns Nothing the job or result stops there.

Hooks can be installed at four points: input, pre-mutate, post-mutate, or output. Input and output hooks are run in series on all jobs or results respectively; these are intended for low-computation tasks that shouldn’t be done in parallel. Pre- and post-mutate hooks run on the (parallel) worker threads, before and after the main task, called the mutator.
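
As an illustration, here is a minimal sketch of an output hook. The Hk constructor argument order (function, priority, description) is inferred from its usage later in this post, and addOutputHook is assumed to mirror the addInputHook call shown there:

-- Returning Nothing filters a result out; Just passes it on (possibly modified).
dropSmall :: st -> (String, Int) -> IO (Maybe (String, Int))
dropSmall _ r@(_, n)
    | n < 10    = return Nothing
    | otherwise = return (Just r)

-- addOutputHook eng (Hk dropSmall 1 "Drop results counting fewer than 10 items")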

Mutator

The engine consists of N mutator threads; the mutator is the only operation capable of transforming a job into a different (result) type.
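
A minimal sketch of a mutator matching that shape – state in, job in, maybe a result out – where a FilePath job becomes a (FilePath, line count) result:

-- The mutator is the one stage where the job type may turn into the result type.
countLines :: st -> FilePath -> IO (Maybe (FilePath, Int))
countLines _ f = do
    n <- fmap (length . lines) (readFile f)
    return (Just (f, n))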

State Management

Control.Engine was built with the idea that jobs and state reads are frequent while alterations to the state are rare. A design decision was made to use STM to resolve all contention on state alterations, and to have a manager watch the TVar for changes and then bundle those changes into a quicker-to-read form (an MVar) for the input, output, and worker threads.

The state provided to the hooks and mutator is always consistent, but it is not guaranteed to be up-to-date. When a modification to the state occurs, a transactional variable is changed, which wakes the state manager; in turn, the state manager updates the state MVar, which each thread reads before processing its next job. In the future IORefs might be used instead of the MVar – all contention is handled by the STM, and the only writer of the MVar (future IORef) should be the state manager.
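
To make that pattern concrete, here is a minimal sketch of such a state manager – an illustration of the design just described, not the library’s actual internals:

import Control.Concurrent.MVar
import Control.Concurrent.STM

-- Writers commit (version, state) to the TVar; the manager blocks in STM until
-- the version changes, then republishes the snapshot into the MVar. The manager
-- is the MVar's only writer, so worker reads never contend with each other.
stateManager :: TVar (Int, st) -> MVar st -> IO ()
stateManager tv mv = go 0
  where
  go seen = do
    (ver, st) <- atomically $ do
        (v, s) <- readTVar tv
        if v == seen then retry else return (v, s)  -- sleep until a change
    modifyMVar_ mv (\_ -> return st)                -- publish the snapshot
    go ver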

Web Crawling

Now let’s figure out how to help users who need more flexibility, using Control.Engine instead of Control.ThreadPool.

MyCatVerbs, from #haskell, suggested a web crawler: URLs are the jobs, and the mutator (worker) adds all the links from the current page as new jobs while ignoring any URL that has already been visited. Let’s start!

The imports aren’t too surprising – tagsoup, concurrent, bloomfilter and Control-Engine are the packages I draw on.

{-# LANGUAGE BangPatterns #-}  -- for the strict bindings in 'mutator'

module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan
import Control.Monad (forever, when)
import Control.Engine			-- Control-Engine
import Control.Exception as X
import Data.BloomFilter 		-- bloomfilter
import Data.BloomFilter.Hash		-- bloomfilter
import Data.BloomFilter.Easy		-- bloomfilter
import Data.IORef
import System.Environment (getArgs)
import Text.HTML.Download		-- tagsoup
import Text.HTML.TagSoup		-- tagsoup

type URL = String

data Job = GetURL URL | ParseHTML URL String deriving (Eq, Ord, Show)

main = do
	(nrCPU:url:_) <- getArgs

The library tries to remain flexible, which makes you do a little more work, but don’t let that scare you! It needs an IO action that gets tasks and an IO action that delivers the results. Most people will probably just want a channel, but sockets or files would do just as well.

	input <- newChan
	output <- newChan

Starting the engine is a one-line affair. You provide the number of threads, the input and output actions, a mutator function, and the initial state. In return you get an ‘Engine’ with which you can modify the hooks and use the injection points.

For this web crawler my ‘state’ is just a bloom filter of all visited URLs, so I’ll keep that in the one hook where it’s needed and declare the engine-wide state as unit – (). For the chosen task the mutator needs a way to add more jobs (more URLs) so that, as pages are parsed, any new URLs can be queued for future crawling; this is handled via partial application of the mutator function.

	eng <- initEngine (read nrCPU) (readChan input) (writeChan output) (mutator (writeChan input)) ()
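
Since the engine only needs an IO action at each end, swapping channels for handles is a one-line change. This is purely a hypothetical sketch – engineFromHandles and mut are names of my own invention – with each line read being one job and each line written one result:

-- (hGetLine and hPutStrLn come from System.IO)
engineFromHandles hIn hOut mut =
    initEngine 4 (hGetLine hIn) (hPutStrLn hOut) mut ()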

As mentioned, I’ll use a bloom filter to keep the web crawler from re-visiting the same site many times. This check should happen exactly once for each URL and is fairly fast, so I’ll install it as an ‘Input Hook’, which means a single thread will process all jobs before they get parceled out to the parallel thread pool.

	let initialBF = fromListB (cheapHashes nrHashes) nrBits []
	    (nrBits, nrHashes) = suggestSizing 100000 (10 ** (-6))
	bf <- newIORef (initialBF,0)
	let bfHook = Hk (uniqueURL bf) 1 "Ensure URLs have not already been crawled"
	addInputHook eng bfHook

Finishing up main, we print all results then provide an initial URL. Notice we run forever – there’s no clean shutdown in this toy example.

	forkIO $ forever $ printResult output
	writeChan input (GetURL url)
	neverStop eng
  where
  neverStop eng = forever $ threadDelay maxBound

And from here on we’re just providing the worker routine that will run across all the threads, and we’ll define the input hook. TagSoup performs all the hard work of downloading the page and parsing the HTML. We just pull out the <a href="..."> tags to add the new URLs as jobs before returning any results. In this example I decided to avoid any sort of error checking (e.g. making sure this really is an HTML document) and simply return the number of links as the result.

mutator :: (Job -> IO ()) -> st -> Job -> IO (Maybe (URL,Int))
mutator addJob _ (GetURL url) = forkIO (do
	e <- X.try (openURL url) :: IO (Either X.SomeException String)
	case e of
		Right dat -> addJob (ParseHTML url dat)  -- fetched: queue the page for parsing
		_ -> return () )                         -- fetch failed: drop the URL silently
	>> return Nothing                            -- the GetURL job itself yields no result
mutator addJob _ (ParseHTML url dat) = do
	let !urls = getURLs dat
	    !len = length urls
	    fixed = map (\u -> if take 4 u /= "http" then url ++ '/' : u else u) urls  -- naively absolutize relative links
	mapM_ (addJob . GetURL) fixed
	return $ Just (url,len)
  where
  getURLs :: String -> [URL]
  getURLs s = 
	let tags = parseTags s
	in map snd (concatMap hrefs tags)
  hrefs :: Tag -> [(String,String)]
  hrefs (TagOpen "a" xs) = filter ( (== "href") . fst) xs
  hrefs _ = []

printResult :: (Show result) => Chan result -> IO ()
printResult c = readChan c >>= print

Filtering out non-unique URLs is just the bloom filter in action.

uniqueURL :: IORef (Bloom URL, Int) -> st -> Job -> IO (Maybe Job)
uniqueURL _ _ j@(ParseHTML _ _) = return $ Just j
uniqueURL bf _ j@(GetURL url) =  do
	(b,i) <- readIORef bf
	if elemB url b
		then putStrLn ("Reject: " ++ url) >> return Nothing
		else do writeIORef bf (insertB url b, i + 1)
			when (i `rem` 100 == 0) (print i)
			return $ Just j

Performance

P.S. No serious performance measurements have been made – I’ve only tried extremely expensive (and trivially parallel) problems, and those don’t count.

14Mar2009 EDIT: Said something about Control.Engine before showing the diagram to make reading smoother.
EDIT2: ThreadPool example shown was a version using MD5, not length – oops! Fixed now.


8 Responses to “Explicit Parallelism via Thread Pools”

  1. Don Stewart Says:

    Welcome to the Oregon! Hope to see you at Bailey’s soon, Thomas.

  2. Craig T. Nelson Says:

    This looks pretty dang cool!

    But something seems missing from the second code block – possibly a formatting problem? Or I am completely failing to grok something.

  3. tommd Says:

    Craig: Yep, code was missing but should be there now.

  4. Micah Says:

    Congrats on the move! We’ll have to get together next time I am at the top of the state. Of course you could always make a trip to Medford; I’m just about equidistant between you and Justin ;-)

    Just downloaded the 0.2 version of VisualHaskell, so I’ll be playing with that, though C# and PowerShell are getting most of my attention right now. I’ve got the time.

  5. wren ng thornton Says:

    It’s probably optimistic of me to think I’ll have more time as a graduate student, but I hope this is the beginning of much more involvement for me in the Haskell community.

    No need to be optimistic, you can make the Haskell part of your work as a grad student. Get in touch with Mark Jones and Tim Sheard if you haven’t already. (I think Sergio Antoy may also still be doing Haskell stuff.) PSU isn’t a research-heavy school but honestly it’s up there with UNSW, Edinburgh, and Glasgow for places to be working on Haskell.

  6. tommd Says:

    wren: Yes, I’ve exchanged a few e-mails with Mark (more with Andrew Tolmach) and am planning to take some of Massey’s courses, which seem designed to give him an army of students to direct toward open-source projects :-). IOW, I’m aware of PSU’s Haskell leanings and chose it partly for this reason. I’ve also been following Kenny’s recent changes to House (dubbed ‘Lighthouse’) – I’m very excited to start here.

    Micah: I’ve been following your powershell comments, if I were ever forced to use Windows again I’d probably learn it. As it stands I just wish hsh took off more.

    Edit: Corrected misleading “conversation” with Mark Jones.


  7. I’m just new to actually doing Haskell and I must say I definitely find this cool.

    I use thread pools a lot with C++, and even though I can do functional programming in C++, it doesn’t beat the expressiveness that Haskell allows. Thanks for posting the code and the overall idea.

  8. Bart Massey Says:

    Dang. You have divined my evil scheme to take over the open source world with Haskell bits. Now I must…utilize you.

    Seriously, welcome to Portland State! Drop me some email or give me a call on my cellphone, and we’ll set up a time to meet and chat about stuff. Looking forward to seeing you in my courses.

