
commit fe5b3964db5f8b7f88fc308b38c16e1ccd7fa849 Author: David Kaloper <david@numm.org> Date: Sat Sep 21 19:18:52 2013 +0200 introduce conduits And get rid of TorDNSEL.Util.hGetLine. --- src/TorDNSEL/ExitTest/Request.hs | 94 ++++++++++--------- src/TorDNSEL/ExitTest/Server/Internals.hs | 2 +- src/TorDNSEL/TorControl/Internals.hs | 78 ++++++++-------- src/TorDNSEL/Util.hsc | 148 ++++++++---------------------- tordnsel.cabal | 2 +- 5 files changed, 130 insertions(+), 194 deletions(-) diff --git a/src/TorDNSEL/ExitTest/Request.hs b/src/TorDNSEL/ExitTest/Request.hs index 87a2fbd..82e198c 100644 --- a/src/TorDNSEL/ExitTest/Request.hs +++ b/src/TorDNSEL/ExitTest/Request.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE OverloadedStrings #-} + ----------------------------------------------------------------------------- -- | -- Module : TorDNSEL.ExitTest.Request @@ -24,14 +26,20 @@ module TorDNSEL.ExitTest.Request ( , cookieLen ) where -import Control.Arrow ((***)) -import Control.Monad (guard) +import Control.Arrow ((***), second) +import Control.Applicative +import Control.Monad import Control.Monad.Trans (lift, liftIO) +import Data.Monoid import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy as BL import Data.Char (isSpace, toLower) import qualified Data.Map as M import System.IO (Handle) +import Data.Conduit +import qualified Data.Conduit.Binary as CB + import TorDNSEL.Util -------------------------------------------------------------------------------- @@ -40,55 +48,50 @@ import TorDNSEL.Util -- | Create an HTTP request that POSTs a cookie to one of our listening ports. createRequest :: B.ByteString -> Port -> Cookie -> B.ByteString createRequest host port cookie = - B.intercalate (B.pack "\r\n") - -- POST should force caching proxies to forward the request. - [ B.pack "POST / HTTP/1.0" - -- Host doesn't exist in HTTP 1.0. We'll use it anyway to help the request - -- traverse transparent proxies. - , B.pack "Host: " `B.append` hostValue - , B.pack "Content-Type: application/octet-stream" - , B.pack "Content-Length: " `B.append` B.pack (show cookieLen) - , B.pack "Connection: close" - , B.pack "\r\n" `B.append` unCookie cookie ] + B.intercalate "\r\n" + -- POST should force caching proxies to forward the request. + [ "POST / HTTP/1.0" + -- Host doesn't exist in HTTP 1.0. We'll use it anyway to help the request + -- traverse transparent proxies. + , "Host: " <> hostValue + , "Content-Type: application/octet-stream" + , "Content-Length: " <> bshow cookieLen + , "Connection: close" + , "\r\n" <> unCookie cookie ] + where hostValue | port == 80 = host - | otherwise = B.concat [host, B.pack ":", B.pack $ show port] + | otherwise = B.concat [host, ":", bshow port] -- | Given an HTTP client, return the cookie contained in the body of the HTTP -- request if it's well-formatted, otherwise return 'Nothing'. -getRequest :: Handle -> MaybeT IO Cookie -getRequest client = do - (reqLine,headers) <- liftIO $ getHeader - guard $ reqLine `elem` [B.pack "POST / HTTP/1.0", B.pack "POST / HTTP/1.1"] - Just contentType <- return $ B.pack "content-type" `M.lookup` headers - guard $ contentType == B.pack "application/octet-stream" - Just contentLen <- return $ readInt =<< B.pack "content-length" `M.lookup` headers - guard $ contentLen == cookieLen - - fmap Cookie . lift $ B.hGet client cookieLen +getRequest :: Handle -> IO (Maybe Cookie) +getRequest client = + CB.sourceHandle client $= CB.isolate maxReqLen $$ do + mh <- getHeaders + case checkHeaders mh of + Nothing -> return Nothing + Just _ -> Just . Cookie <$> takeC cookieLen + where - maxHeaderLen = 2048 - crlf = B.pack "\r\n" - crlfLen = 2 - - getHeader = do - reqLine <- hGetLine client crlf maxHeaderLen - headers <- getHeaders (maxHeaderLen - B.length reqLine - crlfLen) - return (reqLine, M.fromList headers) - - getHeaders remain - | remain <= 0 = return [] - | otherwise = do - header <- hGetLine client crlf remain - if B.null header - then return [] - else do - headers <- getHeaders (remain - B.length header - crlfLen) - return (readHeader header : headers) - - readHeader = - (B.map toLower *** B.dropWhile isSpace . B.drop 1) . B.break (== ':') + maxReqLen = 2048 + cookieLen + line = frameC "\r\n" + + getHeaders = + (,) <$> line + <*> (decodeHeaders <$> muntil B.null line) + where + decodeHeaders = M.fromList . + map ((B.map toLower *** B.dropWhile isSpace . B.tail) + . B.break (== ':')) + + checkHeaders (reqLine, headers) = do + contentType <- "content-type" `M.lookup` headers + contentLen <- readInt =<< "content-length" `M.lookup` headers + guard $ reqLine `elem` ["POST / HTTP/1.0", "POST / HTTP/1.1"] + guard $ contentType == "application/octet-stream" + guard $ contentLen == cookieLen -------------------------------------------------------------------------------- -- Cookies @@ -97,7 +100,7 @@ getRequest client = do -- associate it with the exit node we're testing through and use it look up that -- exit node when we receive it on a listening port. newtype Cookie = Cookie { unCookie :: B.ByteString } - deriving (Eq, Ord) + deriving (Eq, Ord, Show) -- | Create a new cookie from pseudo-random data. newCookie :: (Int -> IO B.ByteString) -> IO Cookie @@ -106,3 +109,4 @@ newCookie getRandBytes = Cookie `fmap` getRandBytes cookieLen -- | The cookie length in bytes. cookieLen :: Int cookieLen = 32 + diff --git a/src/TorDNSEL/ExitTest/Server/Internals.hs b/src/TorDNSEL/ExitTest/Server/Internals.hs index 8f9a872..13e2136 100644 --- a/src/TorDNSEL/ExitTest/Server/Internals.hs +++ b/src/TorDNSEL/ExitTest/Server/Internals.hs @@ -181,7 +181,7 @@ handleMessage conf s (NewClient sock addr) = do tid <- forkLinkIO . (`E.finally` signalQSemN (handlerSem s) 1) . E.bracket (socketToHandle sock ReadWriteMode) hClose $ \client -> do r <- timeout readTimeout . E.try $ do - r <- runMaybeT $ getRequest client + r <- getRequest client case r of Just cookie -> do now <- getCurrentTime diff --git a/src/TorDNSEL/TorControl/Internals.hs b/src/TorDNSEL/TorControl/Internals.hs index 254d6b1..7e0b8f1 100644 --- a/src/TorDNSEL/TorControl/Internals.hs +++ b/src/TorDNSEL/TorControl/Internals.hs @@ -95,7 +95,6 @@ module TorDNSEL.TorControl.Internals ( -- * Backend connection manager , IOMessage(..) , startIOManager - , ReplyType(..) , startSocketReader -- * Data types @@ -130,6 +129,7 @@ module TorDNSEL.TorControl.Internals ( , parseReplyCode , throwIfNotPositive , isPositive + ) where import Control.Arrow (first, second) @@ -156,6 +156,10 @@ import Data.Typeable (Typeable) import System.IO (Handle, hClose, hSetBuffering, BufferMode(..), hFlush) import System.IO.Error (isEOFError) +import Data.Conduit +import qualified Data.Conduit.Binary as CB +import qualified Data.Conduit.List as CL + import TorDNSEL.Control.Concurrent.Link import TorDNSEL.Control.Concurrent.Future import TorDNSEL.Control.Concurrent.Util @@ -213,7 +217,6 @@ openConnection handle mbPasswd = do let conn' = Conn tellIOManager ioManagerTid protInfo confSettings authenticate mbPasswd conn' useFeature [VerboseNames] conn' - putStrLn "*X MRMLJ" return conn' ) `onException'` closeConnection' conn confSettings @@ -823,45 +826,48 @@ startIOManager handle = do eventCode = B.takeWhile (/= ' ') . repText --- | Reply types in a single sequence of replies. -data ReplyType - = MidReply !Reply -- ^ A reply preceding other replies. - | LastReply !Reply -- ^ The last reply. - deriving Show - -- | Start a thread that reads replies from @handle@ and passes them to -- @sendRepliesToIOManager@, linking it to the calling thread. startSocketReader :: Handle -> ([Reply] -> IO ()) -> IO ThreadId startSocketReader handle sendRepliesToIOManager = - forkLinkIO . forever $ readReplies >>= sendRepliesToIOManager + forkLinkIO $ CB.sourceHandle handle $= + repliesC $$ + CL.mapM_ sendRepliesToIOManager + +-- | Conduit taking lines to 'Reply' blocks. +replyC :: Conduit B.ByteString IO [Reply] +replyC = + line0 [] + where + + line0 acc = await >>= return () `maybe` \line -> do + let (code, (typ, text)) = B.splitAt 1 `second` B.splitAt 3 line + code' <- either (monadThrow . ProtocolError) return $ + parseReplyCode code + case () of + _ | typ == B.pack "-" -> line0 (Reply code' text [] : acc) + | typ == B.pack "+" -> line0 . (: acc) . Reply code' text =<< rest [] + | typ == B.pack " " -> do + yield $ reverse (Reply code' text [] : acc) + line0 [] + | otherwise -> monadThrow $ ProtocolError $ + cat "Malformed reply line type " (esc 1 typ) '.' + + rest acc = + await >>= \mline -> case mline of + Nothing -> return $ reverse acc + Just line | B.null line -> rest acc + | line == B.pack "." -> return $ reverse (line:acc) + | otherwise -> rest (line:acc) + +-- | Conduit taking raw 'ByteString' to 'Reply' blocks. +repliesC :: Conduit B.ByteString IO [Reply] +repliesC = + CB.lines =$= CL.map strip =$= replyC where - readReplies = do - line <- parseReplyLine =<< hGetLine handle crlf maxLineLength - case line of - MidReply reply -> fmap (reply :) readReplies - LastReply reply -> return [reply] - - parseReplyLine line = - either (E.throwIO . ProtocolError) (parseReplyLine' typ text) - (parseReplyCode code) - where (code,(typ,text)) = B.splitAt 1 `second` B.splitAt 3 line - - parseReplyLine' typ text code - | typ == B.pack "-" = return . MidReply $ Reply code text [] - | typ == B.pack "+" = (MidReply . Reply code text) `fmap` readData - | typ == B.pack " " = return . LastReply $ Reply code text [] - | otherwise = E.throwIO . ProtocolError $ - cat "Malformed reply line type " (esc 1 typ) '.' - - readData = do - line <- hGetLine handle (B.pack "\n") maxLineLength - case (if B.last line == '\r' then B.init else id) line of - line' | line == (B.pack ".\r") -> return [] - | any B.null [line, line'] -> readData - | otherwise -> fmap (line' :) readData - - crlf = B.pack "\r\n" - maxLineLength = 2^20 + strip bs = case unsnoc bs of + Just (bs', '\r') -> bs' + _ -> bs -------------------------------------------------------------------------------- -- Data types diff --git a/src/TorDNSEL/Util.hsc b/src/TorDNSEL/Util.hsc index 5cea0bb..6bbffc3 100644 --- a/src/TorDNSEL/Util.hsc +++ b/src/TorDNSEL/Util.hsc @@ -30,6 +30,9 @@ module TorDNSEL.Util ( , replaceError , handleError + -- * Show functions + , bshow + -- * Strict functions , adjust' , alter' @@ -49,6 +52,7 @@ module TorDNSEL.Util ( , inet_htoa , encodeBase16 , split + , unsnoc , syncExceptions , bracket' , finally' @@ -59,10 +63,13 @@ module TorDNSEL.Util ( , inBoundsOf , htonl , ntohl - , hGetLine , splitByDelimiter , showUTCTime + -- * Conduit utilities + , takeC + , frameC + -- * Network functions , bindUDPSocket , bindListeningTCPSocket @@ -116,9 +123,11 @@ import Data.Char import Data.Dynamic (Dynamic) import Data.List (foldl', intersperse) import Data.Maybe (mapMaybe) +import Data.Monoid import qualified Data.ByteString.Char8 as B -import qualified Data.ByteString.Internal as B -import qualified Data.ByteString.Unsafe as B +import qualified Data.ByteString.Lazy as BL +import qualified Data.ByteString.Internal as B (c2w) +import qualified Data.ByteString as B (hGetSome) import Data.ByteString (ByteString) import qualified Data.Map as M import Data.Ratio (numerator, denominator, (%)) @@ -140,6 +149,9 @@ import System.Posix.Types (FileMode) import Text.Printf (printf) import Data.Binary (Binary(..)) +import qualified Data.Conduit as C +import qualified Data.Conduit.Binary as CB + #include <netinet/in.h> -------------------------------------------------------------------------------- @@ -240,6 +252,12 @@ handleError :: MonadError e m => (e -> m a) -> m a -> m a handleError = flip catchError -------------------------------------------------------------------------------- +-- Show functions + +bshow :: (Show a) => a -> B.ByteString +bshow = B.pack . show + +-------------------------------------------------------------------------------- -- Strict functions -- | Same as 'M.adjust', but the adjusting function is applied strictly. @@ -322,6 +340,10 @@ encodeBase16 = B.pack . concat . B.foldr ((:) . toBase16 . B.c2w) [] split :: Int -> ByteString -> [ByteString] split x = takeWhile (not . B.null) . map (B.take x) . iterate (B.drop x) +-- | Deconstruct a 'ByteString' at the tail. +unsnoc :: ByteString -> Maybe (ByteString, Char) +unsnoc bs | B.null bs = Nothing + | otherwise = Just (B.init bs, B.last bs) -- | Try an action, catching -- roughly -- "synchronous" exceptions. -- @@ -401,114 +423,18 @@ instance Error e => MonadError e Maybe where foreign import ccall unsafe "htonl" htonl :: Word32 -> Word32 foreign import ccall unsafe "ntohl" ntohl :: Word32 -> Word32 --- | Read a line terminated by an arbitrary sequence of bytes from a handle. The --- end-of-line sequence is stripped before returning the line. @maxLen@ --- specifies the maximum line length to read, not including the end-of-line --- sequence. If the line length exceeds @maxLen@, return the first @maxLen@ --- bytes. If EOF is encountered, return the bytes preceding it. The handle --- should be in 'LineBuffering' mode. -hGetLine :: Handle -> ByteString -> Int -> IO ByteString -hGetLine = error "hGetLine" -- XXX STUB --- hGetLine h eol maxLen | B.null eol = B.hGet h maxLen --- hGetLine h eol@(B.PS _ _ eolLen) maxLen --- = wantReadableHandle "TorDNSEL.Util.hGetLine" h $ \handle_ -> do --- case haBufferMode handle_ of --- NoBuffering -> error "no buffering" --- _other -> hGetLineBuffered handle_ --- --- where --- hGetLineBuffered handle_ = do --- let ref = haBuffer handle_ --- buf <- readIORef ref --- hGetLineBufferedLoop handle_ ref buf 0 0 [] --- --- hGetLineBufferedLoop handle_ ref --- buf@Buffer{ bufRPtr=r, bufWPtr=w, bufBuf=raw } !len !eolIx xss = do --- (new_eolIx,off) <- findEOL eolIx r w raw --- let new_len = len + off - r --- --- if maxLen > 0 && new_len - new_eolIx > maxLen --- -- If the line length exceeds maxLen, return a partial line. --- then do --- let maxOff = off - (new_len - maxLen) --- writeIORef ref buf{ bufRPtr = maxOff } --- mkBigPS . (:xss) =<< mkPS raw r maxOff --- else if new_eolIx == eolLen --- -- We have a complete line; strip the EOL sequence and return it. --- then do --- if w == off --- then writeIORef ref buf{ bufRPtr=0, bufWPtr=0 } --- else writeIORef ref buf{ bufRPtr = off } --- if eolLen <= off - r --- then mkBigPS . (:xss) =<< mkPS raw r (off - eolLen) --- else fmap stripEOL . mkBigPS . (:xss) =<< mkPS raw r off --- else do --- xs <- mkPS raw r off --- maybe_buf <- maybeFillReadBuffer (haFD handle_) True --- (haIsStream handle_) buf{ bufWPtr=0, bufRPtr=0 } --- case maybe_buf of --- -- Nothing indicates we caught an EOF, and we may have a --- -- partial line to return. --- Nothing -> do --- writeIORef ref buf{ bufRPtr=0, bufWPtr=0 } --- if new_len > 0 --- then mkBigPS (xs:xss) --- else ioe_EOF --- Just new_buf -> --- hGetLineBufferedLoop handle_ ref new_buf new_len new_eolIx --- (xs:xss) --- --- maybeFillReadBuffer fd is_line is_stream buf --- = catch (Just `fmap` fillReadBuffer fd is_line is_stream buf) --- (\e -> if isEOFError e then return Nothing else ioError e) --- --- findEOL eolIx --- | eolLen == 1 = findEOLChar (B.w2c $ B.unsafeHead eol) --- | otherwise = findEOLSeq eolIx --- --- findEOLChar eolChar r w raw --- | r == w = return (0, r) --- | otherwise = do --- (!c,!r') <- readCharFromBuffer raw r --- if c == eolChar --- then return (1, r') --- else findEOLChar eolChar r' w raw --- --- -- find the end-of-line sequence, if there is one --- findEOLSeq !eolIx r w raw --- | eolIx == eolLen || r == w = return (eolIx, r) --- | otherwise = do --- (!c,!r') <- readCharFromBuffer raw r --- findEOLSeq (next c eolIx + 1) r' w raw --- --- -- get the next index into the EOL sequence we should match against --- next !c !i = if i >= 0 && c /= eolIndex i then next c (table ! i) else i --- --- eolIndex = B.w2c . B.unsafeIndex eol --- --- -- build a match table for the Knuth-Morris-Pratt algorithm --- table = runSTUArray (do --- arr <- newArray_ (0, if eolLen == 1 then 1 else eolLen - 1) --- zipWithM_ (writeArray arr) [0,1] [-1,0] --- loop arr 2 0) --- where --- loop arr !t !p --- | t >= eolLen = return arr --- | eolIndex (t - 1) == eolIndex p --- = let p' = p + 1 in writeArray arr t p' >> loop arr (t + 1) p' --- | p > 0 = readArray arr p >>= loop arr t --- | otherwise = writeArray arr t 0 >> loop arr (t + 1) p --- --- stripEOL (B.PS p s l) = E.assert (new_len >= 0) . B.copy $ B.PS p s new_len --- where new_len = l - eolLen --- --- mkPS buf start end = B.create len $ \p -> do --- B.memcpy_ptr_baoff p buf (fromIntegral start) (fromIntegral len) --- return () --- where len = end - start --- --- mkBigPS [ps] = return ps --- mkBigPS pss = return $! B.concat (reverse pss) +takeC :: Monad m => Int -> C.ConduitM ByteString o m ByteString +takeC = fmap (mconcat . BL.toChunks) . CB.take + +-- | Take a prefix up to delimiter. +-- FIXME This is worst-case quadratic. +frameC :: Monad m => ByteString -> C.ConduitM ByteString o m ByteString +frameC delim = loop $ B.pack "" where + loop acc = C.await >>= + return acc `maybe` \bs -> + case B.breakSubstring delim $ acc <> bs of + (h, t) | B.null t -> loop h + | otherwise -> h <$ C.leftover (B.drop (B.length delim) t) -- | Split @bs@ into pieces delimited by @delimiter@, consuming the delimiter. -- The result for overlapping delimiters is undefined. diff --git a/tordnsel.cabal b/tordnsel.cabal index 3173943..50e7f40 100644 --- a/tordnsel.cabal +++ b/tordnsel.cabal @@ -15,7 +15,7 @@ Maintainer: tup.tuple@googlemail.com, lunar@debian.org, andrew@torproject.o Build-Type: Simple Build-Depends: base>=2.0, network>=2.0, mtl>=1.0, unix>=1.0, stm>=2.0, time>=1.0, HUnit>=1.1, binary>=0.4, bytestring>=0.9, array>=0.1, directory>=1.0, - containers>=0.1, deepseq >= 1.3 + containers>=0.1, conduit >= 1.0.0 && < 1.1.0, deepseq >= 1.3 Tested-With: GHC==6.6, GHC==6.8, GHC==6.10, GHC==6.12 Data-Files: config/tordnsel.conf.sample, contrib/cacti-input.pl, contrib/tordnsel-init.d-script.sample, doc/tordnsel.8
participants (1)
-
arlo@torproject.org