[tor-commits] [tordnsel/master] introduce conduits

arlo at torproject.org arlo at torproject.org
Sat Apr 16 06:08:43 UTC 2016


commit fe5b3964db5f8b7f88fc308b38c16e1ccd7fa849
Author: David Kaloper <david at numm.org>
Date:   Sat Sep 21 19:18:52 2013 +0200

    introduce conduits
    
    And get rid of TorDNSEL.Util.hGetLine.
---
 src/TorDNSEL/ExitTest/Request.hs          |  94 ++++++++++---------
 src/TorDNSEL/ExitTest/Server/Internals.hs |   2 +-
 src/TorDNSEL/TorControl/Internals.hs      |  78 ++++++++--------
 src/TorDNSEL/Util.hsc                     | 148 ++++++++----------------------
 tordnsel.cabal                            |   2 +-
 5 files changed, 130 insertions(+), 194 deletions(-)

diff --git a/src/TorDNSEL/ExitTest/Request.hs b/src/TorDNSEL/ExitTest/Request.hs
index 87a2fbd..82e198c 100644
--- a/src/TorDNSEL/ExitTest/Request.hs
+++ b/src/TorDNSEL/ExitTest/Request.hs
@@ -1,3 +1,5 @@
+{-# LANGUAGE OverloadedStrings  #-}
+
 -----------------------------------------------------------------------------
 -- |
 -- Module      : TorDNSEL.ExitTest.Request
@@ -24,14 +26,20 @@ module TorDNSEL.ExitTest.Request (
   , cookieLen
   ) where
 
-import Control.Arrow ((***))
-import Control.Monad (guard)
+import Control.Arrow ((***), second)
+import Control.Applicative
+import Control.Monad
 import Control.Monad.Trans (lift, liftIO)
+import Data.Monoid
 import qualified Data.ByteString.Char8 as B
+import qualified Data.ByteString.Lazy as BL
 import Data.Char (isSpace, toLower)
 import qualified Data.Map as M
 import System.IO (Handle)
 
+import           Data.Conduit
+import qualified Data.Conduit.Binary as CB
+
 import TorDNSEL.Util
 
 --------------------------------------------------------------------------------
@@ -40,55 +48,50 @@ import TorDNSEL.Util
 -- | Create an HTTP request that POSTs a cookie to one of our listening ports.
 createRequest :: B.ByteString -> Port -> Cookie -> B.ByteString
 createRequest host port cookie =
-  B.intercalate (B.pack "\r\n")
-  -- POST should force caching proxies to forward the request.
-  [ B.pack "POST / HTTP/1.0"
-  -- Host doesn't exist in HTTP 1.0. We'll use it anyway to help the request
-  -- traverse transparent proxies.
-  , B.pack "Host: " `B.append` hostValue
-  , B.pack "Content-Type: application/octet-stream"
-  , B.pack "Content-Length: " `B.append` B.pack (show cookieLen)
-  , B.pack "Connection: close"
-  , B.pack "\r\n" `B.append` unCookie cookie ]
+    B.intercalate "\r\n"
+    -- POST should force caching proxies to forward the request.
+    [ "POST / HTTP/1.0"
+    -- Host doesn't exist in HTTP 1.0. We'll use it anyway to help the request
+    -- traverse transparent proxies.
+    , "Host: " <> hostValue
+    , "Content-Type: application/octet-stream"
+    , "Content-Length: " <> bshow cookieLen
+    , "Connection: close"
+    , "\r\n" <> unCookie cookie ]
+
   where
     hostValue
       | port == 80 = host
-      | otherwise  = B.concat [host, B.pack ":", B.pack $ show port]
+      | otherwise  = B.concat [host, ":", bshow port]
 
 -- | Given an HTTP client, return the cookie contained in the body of the HTTP
 -- request if it's well-formatted, otherwise return 'Nothing'.
-getRequest :: Handle -> MaybeT IO Cookie
-getRequest client = do
-  (reqLine,headers) <- liftIO $ getHeader
-  guard $ reqLine `elem` [B.pack "POST / HTTP/1.0", B.pack "POST / HTTP/1.1"]
-  Just contentType <- return $ B.pack "content-type" `M.lookup` headers
-  guard $ contentType == B.pack "application/octet-stream"
-  Just contentLen <- return $ readInt =<< B.pack "content-length" `M.lookup` headers
-  guard $ contentLen == cookieLen
-
-  fmap Cookie . lift $ B.hGet client cookieLen
+getRequest :: Handle -> IO (Maybe Cookie)
+getRequest client =
+    CB.sourceHandle client $= CB.isolate maxReqLen $$ do
+      mh <- getHeaders
+      case checkHeaders mh of
+           Nothing -> return Nothing
+           Just _  -> Just . Cookie <$> takeC cookieLen
+
   where
-    maxHeaderLen = 2048
-    crlf = B.pack "\r\n"
-    crlfLen = 2
-
-    getHeader = do
-      reqLine <- hGetLine client crlf maxHeaderLen
-      headers <- getHeaders (maxHeaderLen - B.length reqLine - crlfLen)
-      return (reqLine, M.fromList headers)
-
-    getHeaders remain
-      | remain <= 0 = return []
-      | otherwise = do
-          header <- hGetLine client crlf remain
-          if B.null header
-            then return []
-            else do
-              headers <- getHeaders (remain - B.length header - crlfLen)
-              return (readHeader header : headers)
-
-    readHeader =
-      (B.map toLower *** B.dropWhile isSpace . B.drop 1) . B.break (== ':')
+    maxReqLen = 2048 + cookieLen
+    line      = frameC "\r\n"
+
+    getHeaders =
+        (,) <$> line
+            <*> (decodeHeaders <$> muntil B.null line)
+      where
+        decodeHeaders = M.fromList .
+          map ((B.map toLower *** B.dropWhile isSpace . B.tail)
+                . B.break (== ':'))
+
+    checkHeaders (reqLine, headers) = do
+      contentType <- "content-type" `M.lookup` headers
+      contentLen  <- readInt =<< "content-length" `M.lookup` headers
+      guard $ reqLine `elem` ["POST / HTTP/1.0", "POST / HTTP/1.1"]
+      guard $ contentType == "application/octet-stream"
+      guard $ contentLen == cookieLen
 
 --------------------------------------------------------------------------------
 -- Cookies
@@ -97,7 +100,7 @@ getRequest client = do
 -- associate it with the exit node we're testing through and use it look up that
 -- exit node when we receive it on a listening port.
 newtype Cookie = Cookie { unCookie :: B.ByteString }
-  deriving (Eq, Ord)
+  deriving (Eq, Ord, Show)
 
 -- | Create a new cookie from pseudo-random data.
 newCookie :: (Int -> IO B.ByteString) -> IO Cookie
@@ -106,3 +109,4 @@ newCookie getRandBytes = Cookie `fmap` getRandBytes cookieLen
 -- | The cookie length in bytes.
 cookieLen :: Int
 cookieLen = 32
+
diff --git a/src/TorDNSEL/ExitTest/Server/Internals.hs b/src/TorDNSEL/ExitTest/Server/Internals.hs
index 8f9a872..13e2136 100644
--- a/src/TorDNSEL/ExitTest/Server/Internals.hs
+++ b/src/TorDNSEL/ExitTest/Server/Internals.hs
@@ -181,7 +181,7 @@ handleMessage conf s (NewClient sock addr) = do
   tid <- forkLinkIO . (`E.finally` signalQSemN (handlerSem s) 1) .
     E.bracket (socketToHandle sock ReadWriteMode) hClose $ \client -> do
       r <- timeout readTimeout . E.try $ do
-        r <- runMaybeT $ getRequest client
+        r <- getRequest client
         case r of
           Just cookie -> do
             now <- getCurrentTime
diff --git a/src/TorDNSEL/TorControl/Internals.hs b/src/TorDNSEL/TorControl/Internals.hs
index 254d6b1..7e0b8f1 100644
--- a/src/TorDNSEL/TorControl/Internals.hs
+++ b/src/TorDNSEL/TorControl/Internals.hs
@@ -95,7 +95,6 @@ module TorDNSEL.TorControl.Internals (
   -- * Backend connection manager
   , IOMessage(..)
   , startIOManager
-  , ReplyType(..)
   , startSocketReader
 
   -- * Data types
@@ -130,6 +129,7 @@ module TorDNSEL.TorControl.Internals (
   , parseReplyCode
   , throwIfNotPositive
   , isPositive
+
   ) where
 
 import Control.Arrow (first, second)
@@ -156,6 +156,10 @@ import Data.Typeable (Typeable)
 import System.IO (Handle, hClose, hSetBuffering, BufferMode(..), hFlush)
 import System.IO.Error (isEOFError)
 
+import           Data.Conduit
+import qualified Data.Conduit.Binary as CB
+import qualified Data.Conduit.List as CL
+
 import TorDNSEL.Control.Concurrent.Link
 import TorDNSEL.Control.Concurrent.Future
 import TorDNSEL.Control.Concurrent.Util
@@ -213,7 +217,6 @@ openConnection handle mbPasswd = do
        let conn' = Conn tellIOManager ioManagerTid protInfo confSettings
        authenticate mbPasswd conn'
        useFeature [VerboseNames] conn'
-       putStrLn "*X MRMLJ"
        return conn'
     ) `onException'` closeConnection' conn confSettings
 
@@ -823,45 +826,48 @@ startIOManager handle = do
 
     eventCode = B.takeWhile (/= ' ') . repText
 
--- | Reply types in a single sequence of replies.
-data ReplyType
-  = MidReply  !Reply -- ^ A reply preceding other replies.
-  | LastReply !Reply -- ^ The last reply.
-  deriving Show
-
 -- | Start a thread that reads replies from @handle@ and passes them to
 -- @sendRepliesToIOManager@, linking it to the calling thread.
 startSocketReader :: Handle -> ([Reply] -> IO ()) -> IO ThreadId
 startSocketReader handle sendRepliesToIOManager =
-  forkLinkIO . forever $ readReplies >>= sendRepliesToIOManager
+  forkLinkIO $ CB.sourceHandle handle $=
+               repliesC               $$
+               CL.mapM_ sendRepliesToIOManager
+
+-- | Conduit taking lines to 'Reply' blocks.
+replyC :: Conduit B.ByteString IO [Reply]
+replyC =
+    line0 []
+  where
+
+    line0 acc = await >>= return () `maybe` \line -> do
+      let (code, (typ, text)) = B.splitAt 1 `second` B.splitAt 3 line
+      code' <- either (monadThrow . ProtocolError) return $
+                      parseReplyCode code
+      case () of
+        _ | typ == B.pack "-" -> line0 (Reply code' text [] : acc)
+          | typ == B.pack "+" -> line0 . (: acc) . Reply code' text =<< rest []
+          | typ == B.pack " " -> do
+              yield $ reverse (Reply code' text [] : acc)
+              line0 []
+          | otherwise -> monadThrow $ ProtocolError $
+                            cat "Malformed reply line type " (esc 1 typ) '.'
+
+    rest acc =
+      await >>= \mline -> case mline of
+          Nothing                        -> return $ reverse acc
+          Just line | B.null line        -> rest acc
+                    | line == B.pack "." -> return $ reverse (line:acc)
+                    | otherwise          -> rest (line:acc)
+
+-- | Conduit taking raw 'ByteString' to 'Reply' blocks.
+repliesC :: Conduit B.ByteString IO [Reply]
+repliesC =
+    CB.lines =$= CL.map strip =$= replyC
   where
-    readReplies = do
-      line <- parseReplyLine =<< hGetLine handle crlf maxLineLength
-      case line of
-        MidReply reply  -> fmap (reply :) readReplies
-        LastReply reply -> return [reply]
-
-    parseReplyLine line =
-      either (E.throwIO . ProtocolError) (parseReplyLine' typ text)
-             (parseReplyCode code)
-      where (code,(typ,text)) = B.splitAt 1 `second` B.splitAt 3 line
-
-    parseReplyLine' typ text code
-      | typ == B.pack "-" = return . MidReply $ Reply code text []
-      | typ == B.pack "+" = (MidReply . Reply code text) `fmap` readData
-      | typ == B.pack " " = return . LastReply $ Reply code text []
-      | otherwise = E.throwIO . ProtocolError $
-                      cat "Malformed reply line type " (esc 1 typ) '.'
-
-    readData = do
-      line <- hGetLine handle (B.pack "\n") maxLineLength
-      case (if B.last line == '\r' then B.init else id) line of
-        line' | line == (B.pack ".\r")   -> return []
-              | any B.null [line, line'] -> readData
-              | otherwise                -> fmap (line' :) readData
-
-    crlf = B.pack "\r\n"
-    maxLineLength = 2^20
+    strip bs = case unsnoc bs of
+        Just (bs', '\r') -> bs'
+        _                -> bs
 
 --------------------------------------------------------------------------------
 -- Data types
diff --git a/src/TorDNSEL/Util.hsc b/src/TorDNSEL/Util.hsc
index 5cea0bb..6bbffc3 100644
--- a/src/TorDNSEL/Util.hsc
+++ b/src/TorDNSEL/Util.hsc
@@ -30,6 +30,9 @@ module TorDNSEL.Util (
   , replaceError
   , handleError
 
+  -- * Show functions
+  , bshow
+
   -- * Strict functions
   , adjust'
   , alter'
@@ -49,6 +52,7 @@ module TorDNSEL.Util (
   , inet_htoa
   , encodeBase16
   , split
+  , unsnoc
   , syncExceptions
   , bracket'
   , finally'
@@ -59,10 +63,13 @@ module TorDNSEL.Util (
   , inBoundsOf
   , htonl
   , ntohl
-  , hGetLine
   , splitByDelimiter
   , showUTCTime
 
+  -- * Conduit utilities
+  , takeC
+  , frameC
+
   -- * Network functions
   , bindUDPSocket
   , bindListeningTCPSocket
@@ -116,9 +123,11 @@ import Data.Char
 import Data.Dynamic (Dynamic)
 import Data.List (foldl', intersperse)
 import Data.Maybe (mapMaybe)
+import Data.Monoid
 import qualified Data.ByteString.Char8 as B
-import qualified Data.ByteString.Internal as B
-import qualified Data.ByteString.Unsafe as B
+import qualified Data.ByteString.Lazy as BL
+import qualified Data.ByteString.Internal as B (c2w)
+import qualified Data.ByteString as B (hGetSome)
 import Data.ByteString (ByteString)
 import qualified Data.Map as M
 import Data.Ratio (numerator, denominator, (%))
@@ -140,6 +149,9 @@ import System.Posix.Types (FileMode)
 import Text.Printf (printf)
 import Data.Binary (Binary(..))
 
+import qualified Data.Conduit as C
+import qualified Data.Conduit.Binary as CB
+
 #include <netinet/in.h>
 
 --------------------------------------------------------------------------------
@@ -240,6 +252,12 @@ handleError :: MonadError e m => (e -> m a) -> m a -> m a
 handleError = flip catchError
 
 --------------------------------------------------------------------------------
+-- Show functions
+
+bshow :: (Show a) => a -> B.ByteString
+bshow = B.pack . show
+
+--------------------------------------------------------------------------------
 -- Strict functions
 
 -- | Same as 'M.adjust', but the adjusting function is applied strictly.
@@ -322,6 +340,10 @@ encodeBase16 = B.pack . concat . B.foldr ((:) . toBase16 . B.c2w) []
 split :: Int -> ByteString -> [ByteString]
 split x = takeWhile (not . B.null) . map (B.take x) . iterate (B.drop x)
 
+-- | Deconstruct a 'ByteString' at the tail.
+unsnoc :: ByteString -> Maybe (ByteString, Char)
+unsnoc bs | B.null bs = Nothing
+          | otherwise = Just (B.init bs, B.last bs)
 
 -- | Try an action, catching -- roughly -- "synchronous" exceptions.
 --
@@ -401,114 +423,18 @@ instance Error e => MonadError e Maybe where
 foreign import ccall unsafe "htonl" htonl :: Word32 -> Word32
 foreign import ccall unsafe "ntohl" ntohl :: Word32 -> Word32
 
--- | Read a line terminated by an arbitrary sequence of bytes from a handle. The
--- end-of-line sequence is stripped before returning the line. @maxLen@
--- specifies the maximum line length to read, not including the end-of-line
--- sequence. If the line length exceeds @maxLen@, return the first @maxLen@
--- bytes. If EOF is encountered, return the bytes preceding it. The handle
--- should be in 'LineBuffering' mode.
-hGetLine :: Handle -> ByteString -> Int -> IO ByteString
-hGetLine = error "hGetLine" -- XXX STUB
--- hGetLine h eol maxLen | B.null eol = B.hGet h maxLen
--- hGetLine h eol@(B.PS _ _ eolLen) maxLen
---   = wantReadableHandle "TorDNSEL.Util.hGetLine" h $ \handle_ -> do
---       case haBufferMode handle_ of
---         NoBuffering -> error "no buffering"
---         _other      -> hGetLineBuffered handle_
--- 
---   where
---     hGetLineBuffered handle_ = do
---       let ref = haBuffer handle_
---       buf <- readIORef ref
---       hGetLineBufferedLoop handle_ ref buf 0 0 []
--- 
---     hGetLineBufferedLoop handle_ ref
---       buf@Buffer{ bufRPtr=r, bufWPtr=w, bufBuf=raw } !len !eolIx xss = do
---         (new_eolIx,off) <- findEOL eolIx r w raw
---         let new_len = len + off - r
--- 
---         if maxLen > 0 && new_len - new_eolIx > maxLen
---           -- If the line length exceeds maxLen, return a partial line.
---           then do
---             let maxOff = off - (new_len - maxLen)
---             writeIORef ref buf{ bufRPtr = maxOff }
---             mkBigPS . (:xss) =<< mkPS raw r maxOff
---           else if new_eolIx == eolLen
---             -- We have a complete line; strip the EOL sequence and return it.
---             then do
---               if w == off
---                 then writeIORef ref buf{ bufRPtr=0, bufWPtr=0 }
---                 else writeIORef ref buf{ bufRPtr = off }
---               if eolLen <= off - r
---                 then mkBigPS . (:xss) =<< mkPS raw r (off - eolLen)
---                 else fmap stripEOL . mkBigPS . (:xss) =<< mkPS raw r off
---             else do
---               xs <- mkPS raw r off
---               maybe_buf <- maybeFillReadBuffer (haFD handle_) True
---                              (haIsStream handle_) buf{ bufWPtr=0, bufRPtr=0 }
---               case maybe_buf of
---                 -- Nothing indicates we caught an EOF, and we may have a
---                 -- partial line to return.
---                 Nothing -> do
---                   writeIORef ref buf{ bufRPtr=0, bufWPtr=0 }
---                   if new_len > 0
---                     then mkBigPS (xs:xss)
---                     else ioe_EOF
---                 Just new_buf ->
---                   hGetLineBufferedLoop handle_ ref new_buf new_len new_eolIx
---                                        (xs:xss)
--- 
---     maybeFillReadBuffer fd is_line is_stream buf
---       = catch (Just `fmap` fillReadBuffer fd is_line is_stream buf)
---               (\e -> if isEOFError e then return Nothing else ioError e)
--- 
---     findEOL eolIx
---       | eolLen == 1 = findEOLChar (B.w2c $ B.unsafeHead eol)
---       | otherwise   = findEOLSeq eolIx
--- 
---     findEOLChar eolChar r w raw
---       | r == w = return (0, r)
---       | otherwise = do
---           (!c,!r') <- readCharFromBuffer raw r
---           if c == eolChar
---             then return (1, r')
---             else findEOLChar eolChar r' w raw
--- 
---     -- find the end-of-line sequence, if there is one
---     findEOLSeq !eolIx r w raw
---       | eolIx == eolLen || r == w = return (eolIx, r)
---       | otherwise = do
---           (!c,!r') <- readCharFromBuffer raw r
---           findEOLSeq (next c eolIx + 1) r' w raw
--- 
---     -- get the next index into the EOL sequence we should match against
---     next !c !i = if i >= 0 && c /= eolIndex i then next c (table ! i) else i
--- 
---     eolIndex = B.w2c . B.unsafeIndex eol
--- 
---     -- build a match table for the Knuth-Morris-Pratt algorithm
---     table = runSTUArray (do
---       arr <- newArray_ (0, if eolLen == 1 then 1 else eolLen - 1)
---       zipWithM_ (writeArray arr) [0,1] [-1,0]
---       loop arr 2 0)
---       where
---         loop arr !t !p
---           | t >= eolLen = return arr
---           | eolIndex (t - 1) == eolIndex p
---           = let p' = p + 1 in writeArray arr t p' >> loop arr (t + 1) p'
---           | p > 0 = readArray arr p >>= loop arr t
---           | otherwise = writeArray arr t 0 >> loop arr (t + 1) p
--- 
---     stripEOL (B.PS p s l) = E.assert (new_len >= 0) . B.copy $ B.PS p s new_len
---       where new_len = l - eolLen
--- 
---     mkPS buf start end = B.create len $ \p -> do
---       B.memcpy_ptr_baoff p buf (fromIntegral start) (fromIntegral len)
---       return ()
---       where len = end - start
--- 
---     mkBigPS [ps] = return ps
---     mkBigPS pss  = return $! B.concat (reverse pss)
+takeC :: Monad m => Int -> C.ConduitM ByteString o m ByteString
+takeC = fmap (mconcat . BL.toChunks) . CB.take
+
+-- | Take a prefix up to delimiter.
+-- FIXME This is worst-case quadratic.
+frameC :: Monad m => ByteString -> C.ConduitM ByteString o m ByteString
+frameC delim = loop $ B.pack "" where
+  loop acc = C.await >>=
+    return acc `maybe` \bs ->
+      case B.breakSubstring delim $ acc <> bs of
+            (h, t) | B.null t  -> loop h
+                   | otherwise -> h <$ C.leftover (B.drop (B.length delim) t)
 
 -- | Split @bs@ into pieces delimited by @delimiter@, consuming the delimiter.
 -- The result for overlapping delimiters is undefined.
diff --git a/tordnsel.cabal b/tordnsel.cabal
index 3173943..50e7f40 100644
--- a/tordnsel.cabal
+++ b/tordnsel.cabal
@@ -15,7 +15,7 @@ Maintainer:      tup.tuple at googlemail.com, lunar at debian.org, andrew at torproject.o
 Build-Type:      Simple
 Build-Depends:   base>=2.0, network>=2.0, mtl>=1.0, unix>=1.0, stm>=2.0,
                  time>=1.0, HUnit>=1.1, binary>=0.4, bytestring>=0.9, array>=0.1, directory>=1.0,
-                 containers>=0.1, deepseq >= 1.3
+                 containers>=0.1, conduit >= 1.0.0 && < 1.1.0, deepseq >= 1.3
 Tested-With:     GHC==6.6, GHC==6.8, GHC==6.10, GHC==6.12
 Data-Files:      config/tordnsel.conf.sample, contrib/cacti-input.pl,
                  contrib/tordnsel-init.d-script.sample, doc/tordnsel.8





More information about the tor-commits mailing list