From 8362fd4ad48c100523532ca80c37d8eaa5d241f4 Mon Sep 17 00:00:00 2001 From: Ilya Portnov Date: Sun, 30 May 2010 13:35:11 +0600 Subject: [PATCH] Support for persistent connections. --- Makefile | 5 ++- Network/YAML.hs | 2 + Network/YAML/Base.hs | 2 + Network/YAML/Caller.hs | 70 ++++++++++++++++++++++++++++++++++-------- Network/YAML/Dispatcher.hs | 3 ++ Network/YAML/Server.hs | 56 ++++++++++++++++++++++++++++------ Network/YAML/WrapMethods.hs | 4 +- Test.hs | 6 ++-- TestCall.hs | 50 +++++++++++++++++++----------- 9 files changed, 149 insertions(+), 49 deletions(-) diff --git a/Makefile b/Makefile index b675c60..3aceabe 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,11 @@ +GHC=ghc -i. --make all: Test TestCall Test: *.hs Network/YAML/*.hs - ghc -i. --make Test.hs + $(GHC) Test.hs TestCall: *.hs Network/YAML/*.hs - ghc -i. --make TestCall.hs + $(GHC) TestCall.hs clean: find . -name \*.hi -delete diff --git a/Network/YAML.hs b/Network/YAML.hs index faf5ad9..a1fcb8d 100644 --- a/Network/YAML.hs +++ b/Network/YAML.hs @@ -7,9 +7,11 @@ module Network.YAML module Network.YAML.Dispatcher, module Network.YAML.Balancer, module Network.YAML.WrapMethods, + HostAndPort, forkA ) where +import Network.YAML.Base (HostAndPort) import Network.YAML.Caller import Network.YAML.Instances import Network.YAML.Derive diff --git a/Network/YAML/Base.hs b/Network/YAML/Base.hs index c429dad..dd762ae 100644 --- a/Network/YAML/Base.hs +++ b/Network/YAML/Base.hs @@ -10,6 +10,8 @@ import Data.Object.Yaml import qualified Data.ByteString.Char8 as BS import Text.Libyaml hiding (encode, decode) +type HostAndPort = (BS.ByteString, Int) + class (ConvertSuccess YamlObject a, ConvertSuccess a YamlObject, Default a) => IsYamlObject a where getAttr :: BS.ByteString -> YamlObject -> Maybe YamlObject diff --git a/Network/YAML/Caller.hs b/Network/YAML/Caller.hs index 4e44cc4..f035072 100644 --- a/Network/YAML/Caller.hs +++ b/Network/YAML/Caller.hs @@ -1,4 +1,4 @@ -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE OverloadedStrings, FlexibleInstances, TypeSynonymInstances #-} module Network.YAML.Caller where @@ -15,6 +15,16 @@ import Network.YAML.Base import Network.YAML.Instances import Network.YAML.Server +class Connection c where + newConnection :: (BS.ByteString, Int) -> IO c + closeConnection :: c -> IO () +-- | Call remote method + call :: (IsYamlObject a, IsYamlObject b) + => c + -> BS.ByteString -- ^ Name of method + -> a -- ^ Argument for method + -> IO b + -- | Send any YAML text and return an answer sendYAML :: (BS.ByteString, Int) -- ^ (Hostname, port) -> BS.ByteString -- ^ YAML text @@ -28,19 +38,51 @@ sendYAML (host,port) yaml = withSocketsDo $ do let text = BS.unlines lns return text --- | Call remote method -call :: (IsYamlObject a, IsYamlObject b) - => (BS.ByteString, Int) -- ^ (Host name, port number) - -> BS.ByteString -- ^ Name of method - -> a -- ^ Argument for method - -> IO b -call (host,port) name args = do - let c = mkCall name (cs args) - s = serialize c - text <- sendYAML (host,port) s - case unserialize text of - Nothing -> fail "No answer" - Just x -> return x +-- | Send any YAML text and return an answer +hSendYAML :: Handle + -> BS.ByteString -- ^ YAML text + -> IO BS.ByteString -- ^ Answer +hSendYAML h yaml = withSocketsDo $ do + hSetBuffering h NoBuffering + BS.hPutStrLn h yaml + lns <- readHandle h [] + let text = BS.unlines lns + return text + +instance Connection HostAndPort where + -- | Call remote method +-- call :: (IsYamlObject a, IsYamlObject b) +-- => (BS.ByteString, Int) -- ^ (Host name, port number) +-- -> BS.ByteString -- ^ Name of method +-- -> a -- ^ Argument for method +-- -> IO b + call (host,port) name args = do + let c = mkCall name (cs args) + s = serialize c + text <- sendYAML (host,port) s + case unserialize text of + Nothing -> fail "No answer" + Just x -> return x + + newConnection pair = return pair + closeConnection _ = return () + +newtype PersistentConnection = PC Handle + +instance Connection PersistentConnection where + newConnection (host, port) = do + h <- connectTo (BS.unpack host) (PortNumber $ fromIntegral port) + return (PC h) + + closeConnection (PC h) = hClose h + + call (PC h) name args = do + let c = mkCall name (cs args) + s = serialize c + text <- hSendYAML h s + case unserialize text of + Nothing -> fail "No answer" + Just x -> return x -- | Similar, but select server on each call callDynamic :: (IsYamlObject a, IsYamlObject b) diff --git a/Network/YAML/Dispatcher.hs b/Network/YAML/Dispatcher.hs index 4ed9dff..4af6763 100644 --- a/Network/YAML/Dispatcher.hs +++ b/Network/YAML/Dispatcher.hs @@ -29,3 +29,6 @@ dispatch rules = \obj -> -- | Listens given port and dispatches requests dispatcher :: Int -> Rules -> IO () dispatcher port rules = server port (dispatch rules) + +persistentDispatcher :: Int -> Rules -> IO () +persistentDispatcher port rules = persistentServer port (dispatch rules) diff --git a/Network/YAML/Server.hs b/Network/YAML/Server.hs index 7978ba1..3366276 100644 --- a/Network/YAML/Server.hs +++ b/Network/YAML/Server.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE OverloadedStrings #-} module Network.YAML.Server where @@ -30,16 +31,20 @@ readHandle :: Handle -> [BS.ByteString] -- ^ Already read lines -> IO [BS.ByteString] readHandle h acc = do - line <- BS.hGetLine h - let line' = if BS.null line - then line - else if (BS.last line)=='\r' - then BS.init line - else line --- print $ "read line:"++line' - if BS.null line' - then return acc - else readHandle h (acc ++ [line']) + eof <- hIsEOF h + if eof + then return acc + else do + line <- BS.hGetLine h + let line' = if BS.null line + then line + else if (BS.last line)=='\r' + then BS.init line + else line + -- print $ "read line:"++line' + if BS.null line' + then return acc + else readHandle h (acc ++ [line']) -- | Start server and wait for connections server :: @@ -67,3 +72,34 @@ server port callOut = do BS.hPutStrLn h $ serialize res hClose h) +persistentServer :: + Int + -> (YamlObject -> IO YamlObject) + -> IO () +persistentServer port callOut = do +-- installHandler sigPIPE Ignore Nothing + sock <- listenOn (PortNumber $ fromIntegral port) + (forever $ loop sock) `finally` sClose sock + where + loop :: Socket -> IO ThreadId + loop sock = + do (h,_nm,_port) <- accept sock + forkIO (worker h) + + worker :: Handle -> IO () + worker h = do + hSetBuffering h NoBuffering + lns <- readHandle h [] + let text = BS.unlines lns + if BS.null text + then hClose h + else + case unserialize text of + Nothing -> hClose h + Just ob -> do + res <- callOut ob + BS.hPutStrLn h $ serialize res + if getScalarAttr "connection" ob == Just ("close" :: BS.ByteString) + then hClose h + else worker h + diff --git a/Network/YAML/WrapMethods.hs b/Network/YAML/WrapMethods.hs index 0c816be..7adb28c 100644 --- a/Network/YAML/WrapMethods.hs +++ b/Network/YAML/WrapMethods.hs @@ -28,7 +28,7 @@ remote name = do (VarI _ tp _ _) <- reify name let AppT (AppT ArrowT a) ioB = tp sequence [ - sigD cName [t| (BS.ByteString, Int) -> $(return a) -> $(return ioB) |], + sigD cName [t| (Connection c) => c -> $(return a) -> $(return ioB) |], funD cName [c]] remote' :: Name -> Q [Dec] @@ -39,7 +39,7 @@ remote' name = do (VarI _ tp _ _) <- reify name let AppT (AppT ArrowT _) (AppT (AppT ArrowT a) ioB) = tp sequence [ - sigD cName [t| (BS.ByteString, Int) -> $(return a) -> $(return ioB) |], + sigD cName [t| (Connection c) => c -> $(return a) -> $(return ioB) |], funD cName [c]] rulePair :: Name -> ExpQ diff --git a/Test.hs b/Test.hs index 3fd9326..ebb4cc2 100644 --- a/Test.hs +++ b/Test.hs @@ -20,7 +20,7 @@ $(declareRulesWithArg 'st ['double, 'mySum, 'counter, 'ls]) main = do putStrLn "Listening..." -- Start 3 listeners on 3 ports - forkA [dispatcher 5000 dispatchingRules, - dispatcher 5001 dispatchingRules, - dispatcher 5002 dispatchingRules] + forkA [persistentDispatcher 5000 dispatchingRules, + persistentDispatcher 5001 dispatchingRules, + persistentDispatcher 5002 dispatchingRules] return () diff --git a/TestCall.hs b/TestCall.hs index be745e3..fbb366f 100644 --- a/TestCall.hs +++ b/TestCall.hs @@ -1,7 +1,10 @@ -{-# LANGUAGE OverloadedStrings, TemplateHaskell #-} +{-# LANGUAGE OverloadedStrings, TemplateHaskell, ScopedTypeVariables #-} -- | Test client module Main where +import Control.Monad +import System.Environment (getArgs) +import qualified Data.ByteString.Char8 as BS import Data.Object.Yaml import Data.Convertible.Base @@ -22,29 +25,40 @@ $(remote' 'Methods.ls) -- Now `ls' is defined here as -- ls :: (ByteString,Int) -> String -> IO [String] -rules = [("test", ("127.0.0.1", 5000), 1), - ("test", ("127.0.0.1", 5001), 1), - ("test", ("127.0.0.1", 5002), 1)] -getService = selectRandom rules +rules host = [("test", (host', 5000), 1), + ("test", (host', 5001), 1), + ("test", (host', 5002), 1)] + where + host' = BS.pack host + +getService host = selectRandom (rules host) p = Point 2.0 3.0 ps = [Point 3.0 5.0, Point 1.0 2.1, Point 0.1 0.2] main = do - test <- getService "test" + [host] <- getArgs + test <- getService host "test" + + (conn :: PersistentConnection) <- newConnection test +-- (conn :: HostAndPort) <- newConnection test -- call remote functions - r <- double test p - print r - s <- mySum test [3.5, 5.5, 1.0] - print s - lst <- ls test "/tmp" - print lst - - -- call remote functions for many arguments, for each argument on different server maybe - rs <- callP getService "test" "double" ps - print (rs :: [Point]) - cs <- callP getService "test" "counter" $ zip ([3,4,5,6] :: [Int]) ([1..] :: [Int]) - print (cs :: [Int]) + replicateM 100 $ do + r <- double conn p + print r + +-- s <- mySum conn [3.5, 5.5, 1.0] +-- print s +-- lst <- ls conn "/tmp" +-- print lst +-- +-- -- call remote functions for many arguments, for each argument on different server maybe +-- rs <- callP (getService host) "test" "double" ps +-- print (rs :: [Point]) +-- cs <- callP (getService host) "test" "counter" $ zip ([3,4,5,6] :: [Int]) ([1..] :: [Int]) +-- print (cs :: [Int]) + + closeConnection conn -- 1.7.2.3