I’m happy to say that this post is being finished up from my new home in Portland, Oregon! After many years of searching around I’ve finally decided to study at Portland State. It’s probably optimistic of me to think I’ll have more time as a graduate student, but I hope this is the beginning of much more involvement for me in the Haskell community.

Thread Pools from the Control-Engine Package

Control.Engine was recently released on hackage, providing a simple way to instantiate worker threads to split up the processing of streaming data.  It was originally developed as a spin-off library from my DHT and I’ve since generalized it to cover numerous cases.

Trivial Thread Pools

The trivial module Control.ThreadPool can cover static examples such as a recent question asked on the haskell-cafe:

I have a function that does some IO (takes a file path, read the file, parse, and return some data), and I would like to parallelize it, so that multiple files can be parsed in parallel.

‘Control.ThreadPool’ gives us an easy answer (though not as slick as the map-reduce answer the person was asking for on the cafe). First we have our typical fluff.

{-# LANGUAGE BangPatterns #-}

import Control.ThreadPool (threadPoolIO)
import System.IO (openFile, hClose, IOMode(..))
import System.Environment (getArgs)
import Control.Concurrent.Chan
import Control.Monad (forM_)
import qualified Data.ByteString.Lazy.Char8 as L

main = do
    as <- getArgs

As you can see below, we simply say how many threads we want in our thread pool and what action (or pure computation, using ‘threadPool’) we wish to perform. After that it’s just channels – send input in and read results out!

    (input,output) <- threadPoolIO nrCPU op
    mapM_ (writeChan input) as   -- input stream
    forM_ [1..length as] (\_ -> readChan output >>= print)
  where
  nrCPU = 4
  op f = do
    h <- openFile f ReadMode
    c <- L.hGetContents h
    let !x = length . L.words $ c
    hClose h
    return (f,x)

And while this does nothing to demonstrate parallelism, it does work:

[tom@Mavlo Test]$ ghc -O2 parLines.hs --make -threaded -fforce-recomp
[1 of 1] Compiling Main             ( parLines.hs, parLines.o )
Linking parLines ...
[tom@Mavlo Test]$ find ~/dev/Pastry -name *lhs | xargs ./parLines +RTS -N4 -RTS
("/home/tom/dev/Pastry/Network/Pastry/Module/Joiner.lhs",107)
("/home/tom/dev/Pastry/Network/Pastry/Config.lhs",14)
("/home/tom/dev/Pastry/Network/Pastry/Data/LeafSet.lhs",120)
("/home/tom/dev/Pastry/Network/Pastry/Data/Router.lhs",87)
("/home/tom/dev/Pastry/Network/Pastry/Data/RouteTable.lhs",75)
("/home/tom/dev/Pastry/Network/Pastry/Data/Address.lhs",152)

Control Engine Setup

The thread pools are simple, but what if you need more flexibility or power? What happens if you want an up-to-date state shared among the threads, or there’s a cheap, non-parallelizable computation you need to perform before the main operation? The answer is to use Control.Engine instead of Control.ThreadPool. The engine provides managed state, numerous hook locations, and the ability to inject information at mid-engine locations.

[Diagram: Control Engine]

Injectors

The inject* calls can bypass the input hooks (injectPreMutator) or bypass everything besides the output hooks (injectPostMutator) – thus creating a ‘result’ that had no corresponding ‘job’.

Hooks

Hooks are capable of modifying or filtering jobs or results. All hooks are of type state -> a -> IO (Maybe a); it’s important to note that the type cannot change, and if a hook returns Nothing then the job or result stops there.

Hooks can be input, pre-mutate, post-mutate, or output. Input and output hooks are run in series on all jobs or results respectively; this is intended for low-computation tasks that shouldn’t be done in parallel later. Pre- and post-mutate hooks happen on the (parallel) worker threads before and after the main task, called the mutator.
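As a quick illustration, here’s a sketch of my own (not from the library docs) of a hook that filters: it follows the state -> a -> IO (Maybe a) shape above, and it would be registered with the same Hk constructor and addInputHook call the crawler example uses later in this post.

import Data.List (isPrefixOf)

-- Sketch of a filtering input hook: returning Nothing drops the job,
-- Just passes it along (possibly modified).  'rejectLocalhost' is a
-- made-up example, not part of the library.
rejectLocalhost :: st -> String -> IO (Maybe String)
rejectLocalhost _ url
    | "http://localhost" `isPrefixOf` url = return Nothing      -- filtered out
    | otherwise                           = return (Just url)   -- unchanged

-- Registered the same way the crawler registers its bloom-filter hook:
--   addInputHook eng (Hk rejectLocalhost 1 "Reject localhost URLs")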

Mutator

The engine consists of N mutator threads; the mutator is the only operation capable of transforming jobs into a different (result) type.

State Management

Control.Engine was built with the idea that jobs and state reads are frequent while alterations to the state are rare. A design decision was made to use STM to resolve all contention on state alterations and to have a manager watch the TVar for changes, then bundle those changes into a quicker-to-read form (an MVar) for the input, output, and worker threads.

The state provided to the hooks and mutator is always consistent but not guaranteed up-to-date. When a modification to the state occurs a transactional variable is changed, which wakes the state manager; in turn, the state manager updates the state MVar, which is read by each thread before processing its next job. In the future IORefs might be used instead of the MVar – all contention is handled by the STM, and the only writer of the MVar (future IORef) should be the state manager.
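To make that flow concrete, here is a small, library-independent sketch of the pattern (my own illustration, not the Control.Engine source): writers only ever touch the TVar, a single manager thread blocks until the value changes, and it then republishes the new value into an MVar that workers read cheaply before each job.

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Monad (void)

-- Sketch of the state-manager pattern: all writes go through the TVar
-- (STM resolves contention); the manager wakes on change and publishes
-- a snapshot in the MVar for the readers.
startStateManager :: Eq st => st -> IO (TVar st, MVar st)
startStateManager st0 = do
    writeSide <- newTVarIO st0          -- modifications land here
    readSide  <- newMVar   st0          -- consistent (possibly stale) snapshot
    void . forkIO $ manage writeSide readSide st0
    return (writeSide, readSide)
  where
    manage tv mv lastSeen = do
        -- Block (retry) until the TVar differs from what we last published.
        new <- atomically $ do
            cur <- readTVar tv
            if cur == lastSeen then retry else return cur
        modifyMVar_ mv (const (return new))   -- publish for the worker threads
        manage tv mv new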

Web Crawling

Now let’s figure out how to help users who need more flexibility, using Control.Engine instead of Control.ThreadPool.

MyCatVerbs, from #haskell, suggested a web crawler in which the jobs are URLs and the mutator (worker) can add all the links of the current page as new jobs while ignoring any URL that has already been visited.  Let’s start!

The imports aren’t too surprising – tagsoup, concurrent, bloomfilter and Control-Engine are the packages I draw on.

{-# LANGUAGE BangPatterns #-}
module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan
import Control.Monad (forever, when)
import Control.Engine			-- Control-Engine
import Control.Exception as X
import Data.BloomFilter 		-- bloomfilter
import Data.BloomFilter.Hash		-- bloomfilter
import Data.BloomFilter.Easy		-- bloomfilter
import Data.IORef
import System.Environment (getArgs)
import Text.HTML.Download		-- tagsoup
import Text.HTML.TagSoup		-- tagsoup

type URL = String

data Job = GetURL URL | ParseHTML URL String deriving (Eq, Ord, Show)

main = do
	(nrCPU:url:_) <- getArgs

The library tries to remain flexible, which makes you do a little more work, but don’t let that scare you! It needs an IO action to get tasks and an IO action that delivers the results. Most people will probably just want a channel, but sockets or files would do just as well.

	input <- newChan
	output <- newChan

Starting the engine is a one-line affair. You provide the number of threads, input, output, a mutator function and an initial state. In return you are provided with an ‘Engine’ with which you can modify the hooks and use the injection points.

For this web crawler my ‘state’ is just a bloom filter of all visited URLs, so I’ll keep that in the one hook where it’s needed and declare the engine-wide state as a null – (). For the chosen task the mutator needs a way to add more jobs (more URLs) so that, as pages are parsed, any new URLs can be queued for future crawling; this is handled via partial application of the mutator function.

	eng <- initEngine (read nrCPU) (readChan input) (writeChan output) (mutator (writeChan input)) ()

As mentioned, I’ll use a bloom filter to keep the web crawler from re-visiting the same URL many times. This should happen exactly once for each URL and is fairly fast, so I’ll insert it as an ‘Input Hook’, which means a single thread will process all jobs before they get passed out to the parallel thread pool.

	let initialBF = fromListB (cheapHashes nrHashes) nrBits []
            (nrBits, nrHashes) = suggestSizing 100000 (10 ** (-6))
	bf <- newIORef (initialBF,0)
	let bfHook = Hk (uniqueURL bf) 1 "Ensure URLs have not already been crawled"
	addInputHook eng bfHook

Finishing up main, we print all results then provide an initial URL. Notice we run forever – there’s no clean shutdown in this toy example.

	forkIO $ forever $ printResult output
	writeChan input (GetURL url)
	neverStop eng
  where
  neverStop eng = forever $ threadDelay maxBound

And from here on we’re just providing the worker routine that will run across all the threads, and we’ll define the input hook. TagSoup performs all the hard work of downloading the page and parsing HTML. We just pull out the <a href="..."> tags to add the new URLs as jobs before returning any results. In this example I decided to avoid any sort of error checking (ex: making sure this is an HTML document) and simply return the number of words as a result.

mutator :: (Job -> IO ()) -> st -> Job -> IO (Maybe (URL,Int))
mutator addJob _ (GetURL url) = forkIO (do
	e <- X.try (openURL url) :: IO (Either X.SomeException String)
	case e of
		Right dat -> addJob (ParseHTML url dat)
		_ -> return () )
	>> return Nothing
mutator addJob _ (ParseHTML url dat) = do
	let !urls = getURLs dat
	    !len = length urls
	    fixed = map (\u -> if take 4 u /= "http" then url ++ '/' : u else u) urls
	mapM_ (addJob . GetURL) fixed
	return $ Just (url,len)
  where
  getURLs :: String -> [URL]
  getURLs s = 
	let tags = parseTags s
	in map snd (concatMap hrefs tags)
  hrefs :: Tag -> [(String,String)]
  hrefs (TagOpen "a" xs) = filter ( (== "href") . fst) xs
  hrefs _ = []

printResult :: (Show result) => Chan result -> IO ()
printResult c = readChan c >>= print

Filtering out non-unique URLs is just the bloom filter in action.

uniqueURL :: IORef (Bloom URL, Int) -> st -> Job -> IO (Maybe Job)
uniqueURL _ _ j@(ParseHTML _ _) = return $ Just j
uniqueURL bf _ j@(GetURL url) =  do
	(b,i) <- readIORef bf
	if elemB url b
		then putStrLn ("Reject: " ++ url) >> return Nothing
		else do writeIORef bf (insertB url b, i + 1)
			when (i `rem` 100 == 0) (print i)
			return $ Just j

Performance

P.S. No serious performance measurements have been made beyond extremely expensive (and trivially parallel) problems, so those don’t count.

14Mar2009 EDIT: Said something about Control.Engine before showing the diagram to make reading smoother.
EDIT2: ThreadPool example shown was a version using MD5, not length – oops! Fixed now.

hsXenCtrl and pureMD5

August 7, 2008

On vacation I found some time to upload the new hsXenCtrl library (0.0.7) and pureMD5 (0.2.4).

The new hsXenCtrl includes the System.Xen module, which is a WriterT ErrorT transformer stack and a brief attempt at ‘Haskellifying’ the xen control library.  I find it much more useful for simple tasks like pausing, unpausing, creating and destroying domains.  The API is still subject to change without notice as plenty of functions are still very ‘C’-like (ex: the scheduler / sedf functions).

pureMD5 received a much smaller change – some users noticed the -fvia-C flag caused compilation headaches on OS X.  After removing the offending flag, some benchmarks revealed no measurable difference in speed, so this is an overdue change. OS X users rejoice!

Hello planet! As my first post to appear on planet.haskell.org, I decided to do a quick recap of the libraries I maintain and to muse about future libraries.  My past posts include why static buffers make baby Haskell Curry cry and fun academic papers.

The Past

* pureMD5: An implementation of MD5 in Haskell using lazy ByteStrings.  It performs within an order of magnitude of the typical ‘md5sum’ binary, but has known inefficiencies and could be improved.

* ipc: A trivial to use inter-process communication library.  This could use some work, seeing as structures that serialize to over 4k get truncated currently.  I’ll probably only come back to this if I end up with a need for it.

* control-event: An event system for scheduling and canceling events, optimized for use with absolute clock times.

The Present

* hsXenCtrl: This library is intended to open the doors for Haskell apps to interact with, and perhaps manage, Xen.  Currently it’s just straightforward ‘C’ bindings to an old version of <xenctrl.h>, but the intent is to build a higher-level library with useful plumbing.

* NumLazyByteString: Not sure if I’ll bother finishing this one, but it adds ByteString to the Num, Enum, and Bits type classes.  I just thought it would be funny to have lazy adding, allowing: L.readFile "/dev/urandom" >>= \a -> print (a + a `mod` 256)

The Future

I tend to be a bit of a bouncing ball in terms of what interests me.  Near- and mid-term tasks will probably be a couple of these:

* A .cabal parser that can create a dependency graph for Haskell packages (not modules).  But should I use an outside package like graphviz or go pure / self contained Haskell?

* An implementation of something distributed, like a P2P or ad-hoc networking protocol.  Would this be Pastry, then Awerbuch’s work, or OLSR2?  These would be large tasks with their own ups and downs.

* Finally learn happs and make some sort of web Xen management system using hsXenCtrl.

* Learn Erlang – just because it looks cool too.

* Forget programming (and blogging) – read more TaPL!

Planet Haskell

June 9, 2008

Yes, the owner of this blog is requesting to get added to Planet Haskell – is this authentication enough? ;-)

Recommended Reading

May 25, 2008

I’ve been wanting to advance my education through a PhD program for a while now. As such, I’ve been reading a reasonable number of papers, mostly in the field of programming languages (strong bias toward SPJ’s work), but also in ad-hoc networks (strong bias toward Baruch Awerbuch’s papers). I can’t say I’m too selective about what I like, but here are some of my likes anyway. Enjoy, and feel free to post your own papers or any discussion of the ideas presented in these.

Within The World of Languages

Simon Peyton-Jones, “Call-pattern Specialisation for Haskell Programs” *

Simon Marlow et al “Faster Laziness Using Dynamic Pointer Tagging”

Simon Peyton-Jones et al, “Playing by the Rules: Rewriting as a practical optimisation technique in GHC” *

Tom Schrijvers et al “Type Checking with Open Type Functions” *

Duncan Coutts et al “Stream Fusion: From Lists to Streams to Nothing at All” *

Neil Mitchell and Colin Runciman “A Supercompiler for Core Haskell” * (Looks great, but I want to try it on my own programs to see if it will benefit me as much as I hope)

Peng Li et al “Lightweight Concurrency Primitives for GHC” * (A simpler to understand RTS would be great, but I fear for the performance)

Robert Ennals et al “Task Partitioning for Multi-Core Network Processors” *

Peng Li and Steve Zdancewic “Encoding Information Flow in Haskell” (perhaps not sound, but certainly useful)

Tim Harris and Simon Peyton Jones “Transactional Memory with Data Invariants” * (some functions aren’t available in the standard GHC/STM load, but the paper is fun anyway)

Dana Xu et al “Static Contract Checking for Haskell” * (I don’t know about you, but I almost can’t wait to see the work embodied in a GHC release!)

Every name you know “Roadmap for Enhanced Languages and Methods to Aid Verification”

Ad hoc / Distributed Systems / Protocols

Baruch Awerbuch et al “Towards Scalable and Robust Overlay Networks” (See the entire line of papers, including “A Denial-of-Service Resistant DHT” and “Towards a Scalable and Robust DHT”)

Rudolf Ahlswede et al “Network Information Flow”

Sachin Katti et al “Network Coding Made Practical” * (Now why isn’t this an option when I click network manager -> ad hoc network in Fedora 9?)

Joshua Guttman “Authentication Tests and the Structure of Bundles” *

Baruch Awerbuch et al “Provably Competitive Adaptive Routing” *

Baruch Awerbuch et al “Medium Time Metric” (This one is just begging for someone to write a paper “The opportunity cost metric”, don’t you think?)

* Easy read (even if it isn’t your field) / very enjoyable

I posted this as a page by accident – so here it is as a blog entry and I’ll delete the page some day.

My previous post discussed how inet_ntoa uses a static buffer which can cause a race condition. Unlike in ‘C’, this is particularly likely to cause a race in Haskell programs due to the quick, easy, and cheap threads using forkIO that (potentially) share a single OS thread. Two bright spots were that inet_ntoa was marked as IO and that the result is usually unimportant.

Another FFI binding, nano-md5, has a similar race condition but is much more serious (it is not marked as IO and the result is a digest).

An even-handed note: iirc, nano-md5 remains on hackage mostly as an FFI example – not that this is advertised in the nano-md5 description. “Real” users are told to look at hsOpenSSL and hopenssl – a cursory glance at the code suggests they don’t have this bug. Also, the other bindings don’t require O(n) space – so they are certainly worth switching to.

The nano-md5 line:

digest <- c_md5 ptr (fromIntegral n) nullPtr

is the culprit. It uses ‘nullPtr’ and according to the OpenSSL manual “If md is NULL, the digest is placed in a static array”.
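For comparison, the race-free calling pattern is to hand MD5() a buffer the caller owns. Here’s a rough sketch of how I’d write it; the foreign import declaration and the md5 wrapper below are my own guesses at a binding (nano-md5’s actual code differs), and the only load-bearing facts are that the md argument must be non-NULL and that MD5 digests are 16 bytes.

{-# LANGUAGE ForeignFunctionInterface #-}
import Foreign
import Foreign.C.Types
import qualified Data.ByteString as B
import qualified Data.ByteString.Unsafe as B

-- Assumed binding to OpenSSL's MD5(); check against the real header.
foreign import ccall unsafe "openssl/md5.h MD5"
    c_md5 :: Ptr CChar -> CSize -> Ptr Word8 -> IO (Ptr Word8)

-- Race-free: the digest lands in a buffer we allocate, never in
-- OpenSSL's static array.
md5 :: B.ByteString -> IO B.ByteString
md5 bs =
    B.unsafeUseAsCStringLen bs $ \(ptr, n) ->
    allocaBytes 16 $ \digest -> do           -- MD5 digests are 16 bytes
        _ <- c_md5 ptr (fromIntegral n) digest
        B.packCStringLen (castPtr digest, 16)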

Test code that confirms the bug can be found here – it runs three hash operations in parallel and eventually one result will have the correct first bits with ending bits from one of the other digests. The developer has already fixed the issue for versions 0.1.2+. I’ll wrap this post up with a request for library developers to please avoid static buffers – they have no place in this forkIO-happy playland I call Haskell.

Racing inet_ntoa

April 24, 2008

Just because I am feeling lazy wrt any real task, I decided to post about the silliness that is inet_ntoa. Yes, this is ancient/known stuff to rehash, but you can hit the browser back button at any time.

As most of you probably know, the function inet_ntoa converts an IPv4 address to ASCII, storing the result in a static buffer. It is this last part that periodically causes people grief when they forget it. This mutable-memory issue is revealed easily enough in goofed-up ‘C’ statements such as:

  struct in_addr a,b;
  inet_aton("1.2.3.4", &a);
  inet_aton("99.43.214.7", &b);
  printf("addr a: %s\taddr b: %s\n", inet_ntoa(a), inet_ntoa(b));

which prints “addr a: 1.2.3.4 addr b: 1.2.3.4”. Sometimes more complex systems have a race condition (ex: exception handlers calling inet_ntoa), but it isn’t a larger issue in multi-threaded C programs thanks to thread-local storage…

unless you cram many logical threads into a single OS thread like in Haskell. Zao in #haskell asked why inet_ntoa was of type IO (meaning, it isn’t a pure / deterministic function) and I correctly guessed it was a wrapper for the ‘C’ call.

Not to rip at any of the libraries folk, who made a faithful foreign function interface for the sockets/networking functions, but – this was a bad idea. Foremost, the use of IO means this can’t be called from any deterministic function even though the desired operation of converting an address to a string IS deterministic. Secondly, some Haskell programmers (myself included) use Haskell’s threads liberally (perhaps another, positive, blog post on that). So if someone is being brain-dead then they are going to have a bug – likely non-fatal and obvious, given how string representations of addresses are used.
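To underline that last point, a deterministic conversion is a few lines of pure Haskell. This is a sketch of my own (showIPv4 isn’t part of any library here), with the usual byte-order caveat: HostAddress keeps the address in network byte order, so on the little-endian reading the test below relies on (where HostAddress 1 renders as "1.0.0.0") the first octet is the low byte of the Word32.

import Data.Bits (shiftR, (.&.))
import Data.List (intercalate)
import Data.Word (Word32)

-- Pure dotted-quad rendering: low byte first, matching the expectations
-- of the race demo below on a little-endian machine.
showIPv4 :: Word32 -> String
showIPv4 w = intercalate "." [ show ((w `shiftR` s) .&. 0xFF) | s <- [0, 8, 16, 24] ]

-- showIPv4 1 == "1.0.0.0", and it is safe to call from any thread, pure code included.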

And if you desire to see the race, I have some code… hope it runs… yep:

import Network.Socket (inet_ntoa)
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (when, forever)

main = do
    let zero = (0,"0.0.0.0")
        one  = (1,"1.0.0.0")
        two  = (2,"2.0.0.0")
        assert = confirm "assert"
        race = \x -> forever (confirm "FAILURE: " x)
    assert zero
    assert one
    assert two
    forkIO $ race zero
    forkIO $ race one
    forkIO $ race two
    threadDelay maxBound

test n s = forever $ confirm "" (n,s)

confirm prefix (n,str) = do
    s <- inet_ntoa n
    when (s /= str) (error (prefix ++ s ++ " does not equal " ++ str))

Edit:
Yes, I know this non-deterministic behavior is being screamed by that ‘IO’ type.
Yes, I know I should write a Haskellized network library.
