Support for persistent connections.

Ilya Portnov [2010-05-30 07:35:11]
Support for persistent connections.
Filename
Makefile
Network/YAML.hs
Network/YAML/Base.hs
Network/YAML/Caller.hs
Network/YAML/Dispatcher.hs
Network/YAML/Server.hs
Network/YAML/WrapMethods.hs
Test.hs
TestCall.hs
diff --git a/Makefile b/Makefile
index b675c60..3aceabe 100644
--- a/Makefile
+++ b/Makefile
@@ -1,10 +1,11 @@
+GHC=ghc -i. --make
 all: Test TestCall

 Test: *.hs Network/YAML/*.hs
-	ghc -i. --make Test.hs
+	$(GHC) Test.hs

 TestCall: *.hs Network/YAML/*.hs
-	ghc -i. --make TestCall.hs
+	$(GHC) TestCall.hs

 clean:
 	find . -name \*.hi -delete
diff --git a/Network/YAML.hs b/Network/YAML.hs
index faf5ad9..a1fcb8d 100644
--- a/Network/YAML.hs
+++ b/Network/YAML.hs
@@ -7,9 +7,11 @@ module Network.YAML
    module Network.YAML.Dispatcher,
    module Network.YAML.Balancer,
    module Network.YAML.WrapMethods,
+   HostAndPort,
    forkA
   ) where

+import Network.YAML.Base (HostAndPort)
 import Network.YAML.Caller
 import Network.YAML.Instances
 import Network.YAML.Derive
diff --git a/Network/YAML/Base.hs b/Network/YAML/Base.hs
index c429dad..dd762ae 100644
--- a/Network/YAML/Base.hs
+++ b/Network/YAML/Base.hs
@@ -10,6 +10,8 @@ import Data.Object.Yaml
 import qualified Data.ByteString.Char8 as BS
 import Text.Libyaml hiding (encode, decode)

+type HostAndPort = (BS.ByteString, Int)
+
 class (ConvertSuccess YamlObject a, ConvertSuccess a YamlObject, Default a) => IsYamlObject a where

 getAttr :: BS.ByteString -> YamlObject -> Maybe YamlObject
diff --git a/Network/YAML/Caller.hs b/Network/YAML/Caller.hs
index 4e44cc4..f035072 100644
--- a/Network/YAML/Caller.hs
+++ b/Network/YAML/Caller.hs
@@ -1,4 +1,4 @@
-{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE OverloadedStrings, FlexibleInstances, TypeSynonymInstances #-}

 module Network.YAML.Caller where

@@ -15,6 +15,16 @@ import Network.YAML.Base
 import Network.YAML.Instances
 import Network.YAML.Server

+class Connection c where
+  newConnection :: (BS.ByteString, Int) -> IO c
+  closeConnection :: c -> IO ()
+-- | Call remote method
+  call :: (IsYamlObject a, IsYamlObject b)
+       => c
+       -> BS.ByteString                   -- ^ Name of method
+       -> a                               -- ^ Argument for method
+       -> IO b
+
 -- | Send any YAML text and return an answer
 sendYAML :: (BS.ByteString, Int)      -- ^ (Hostname, port)
          -> BS.ByteString             -- ^ YAML text
@@ -28,19 +38,51 @@ sendYAML (host,port) yaml =  withSocketsDo $ do
   let text = BS.unlines lns
   return text

--- | Call remote method
-call :: (IsYamlObject a, IsYamlObject b)
-     => (BS.ByteString, Int)            -- ^ (Host name, port number)
-     -> BS.ByteString                   -- ^ Name of method
-     -> a                               -- ^ Argument for method
-     -> IO b
-call (host,port) name args = do
-  let c = mkCall name (cs args)
-      s = serialize c
-  text <- sendYAML (host,port) s
-  case unserialize text of
-    Nothing -> fail "No answer"
-    Just x -> return x
+-- | Send any YAML text and return an answer
+hSendYAML :: Handle
+         -> BS.ByteString             -- ^ YAML text
+         -> IO BS.ByteString          -- ^ Answer
+hSendYAML h yaml =  withSocketsDo $ do
+  hSetBuffering h NoBuffering
+  BS.hPutStrLn h yaml
+  lns <- readHandle h []
+  let text = BS.unlines lns
+  return text
+
+instance Connection HostAndPort where
+  -- | Call remote method
+--   call :: (IsYamlObject a, IsYamlObject b)
+--        => (BS.ByteString, Int)            -- ^ (Host name, port number)
+--        -> BS.ByteString                   -- ^ Name of method
+--        -> a                               -- ^ Argument for method
+--        -> IO b
+  call (host,port) name args = do
+    let c = mkCall name (cs args)
+        s = serialize c
+    text <- sendYAML (host,port) s
+    case unserialize text of
+      Nothing -> fail "No answer"
+      Just x -> return x
+
+  newConnection pair = return pair
+  closeConnection _ = return ()
+
+newtype PersistentConnection = PC Handle
+
+instance Connection PersistentConnection where
+  newConnection (host, port) = do
+    h <- connectTo (BS.unpack host) (PortNumber $ fromIntegral port)
+    return (PC h)
+
+  closeConnection (PC h) = hClose h
+
+  call (PC h) name args = do
+    let c = mkCall name (cs args)
+        s = serialize c
+    text <- hSendYAML h s
+    case unserialize text of
+      Nothing -> fail "No answer"
+      Just x -> return x

 -- | Similar, but select server on each call
 callDynamic :: (IsYamlObject a, IsYamlObject b)
diff --git a/Network/YAML/Dispatcher.hs b/Network/YAML/Dispatcher.hs
index 4ed9dff..4af6763 100644
--- a/Network/YAML/Dispatcher.hs
+++ b/Network/YAML/Dispatcher.hs
@@ -29,3 +29,6 @@ dispatch rules = \obj ->
 -- | Listens given port and dispatches requests
 dispatcher :: Int -> Rules -> IO ()
 dispatcher port rules = server port (dispatch rules)
+
+persistentDispatcher :: Int -> Rules -> IO ()
+persistentDispatcher port rules = persistentServer port (dispatch rules)
diff --git a/Network/YAML/Server.hs b/Network/YAML/Server.hs
index 7978ba1..3366276 100644
--- a/Network/YAML/Server.hs
+++ b/Network/YAML/Server.hs
@@ -1,3 +1,4 @@
+{-# LANGUAGE OverloadedStrings #-}

 module Network.YAML.Server where

@@ -30,16 +31,20 @@ readHandle :: Handle
            -> [BS.ByteString]       -- ^ Already read lines
            -> IO [BS.ByteString]
 readHandle h acc = do
-    line <- BS.hGetLine h
-    let line' = if BS.null line
-                  then line
-                  else if (BS.last line)=='\r'
-                          then BS.init line
-                          else line
---           print $ "read line:"++line'
-    if BS.null line'
-      then return acc
-      else readHandle h (acc ++ [line'])
+  eof <- hIsEOF h
+  if eof
+    then return acc
+    else do
+      line <- BS.hGetLine h
+      let line' = if BS.null line
+                    then line
+                    else if (BS.last line)=='\r'
+                            then BS.init line
+                            else line
+  --           print $ "read line:"++line'
+      if BS.null line'
+        then return acc
+        else readHandle h (acc ++ [line'])

 -- | Start server and wait for connections
 server ::
@@ -67,3 +72,34 @@ server port callOut = do
                     BS.hPutStrLn h $ serialize res
                     hClose h)

+persistentServer ::
+      Int
+   -> (YamlObject -> IO YamlObject)
+   -> IO ()
+persistentServer port callOut = do
+--        installHandler sigPIPE Ignore Nothing
+      sock  <- listenOn (PortNumber $ fromIntegral port)
+      (forever $ loop sock) `finally` sClose sock
+  where
+    loop :: Socket -> IO ThreadId
+    loop sock =
+         do (h,_nm,_port) <- accept sock
+            forkIO (worker h)
+
+    worker :: Handle -> IO ()
+    worker h = do
+      hSetBuffering h NoBuffering
+      lns <- readHandle h []
+      let text = BS.unlines lns
+      if BS.null text
+        then hClose h
+        else
+          case unserialize text of
+            Nothing -> hClose h
+            Just ob -> do
+              res <- callOut ob
+              BS.hPutStrLn h $ serialize res
+              if getScalarAttr "connection" ob == Just ("close" :: BS.ByteString)
+                then hClose h
+                else worker h
+
diff --git a/Network/YAML/WrapMethods.hs b/Network/YAML/WrapMethods.hs
index 0c816be..7adb28c 100644
--- a/Network/YAML/WrapMethods.hs
+++ b/Network/YAML/WrapMethods.hs
@@ -28,7 +28,7 @@ remote name = do
   (VarI _ tp _ _) <- reify name
   let AppT (AppT ArrowT a) ioB = tp
   sequence [
-    sigD cName [t| (BS.ByteString, Int) -> $(return a) -> $(return ioB) |],
+    sigD cName [t| (Connection c) => c -> $(return a) -> $(return ioB) |],
     funD cName [c]]

 remote' :: Name -> Q [Dec]
@@ -39,7 +39,7 @@ remote' name = do
   (VarI _ tp _ _) <- reify name
   let AppT (AppT ArrowT _) (AppT (AppT ArrowT a) ioB) = tp
   sequence [
-    sigD cName [t| (BS.ByteString, Int) -> $(return a) -> $(return ioB) |],
+    sigD cName [t| (Connection c) => c -> $(return a) -> $(return ioB) |],
     funD cName [c]]

 rulePair :: Name -> ExpQ
diff --git a/Test.hs b/Test.hs
index 3fd9326..ebb4cc2 100644
--- a/Test.hs
+++ b/Test.hs
@@ -20,7 +20,7 @@ $(declareRulesWithArg 'st ['double, 'mySum, 'counter, 'ls])
 main = do
   putStrLn "Listening..."
   -- Start 3 listeners on 3 ports
-  forkA [dispatcher 5000 dispatchingRules,
-         dispatcher 5001 dispatchingRules,
-         dispatcher 5002 dispatchingRules]
+  forkA [persistentDispatcher 5000 dispatchingRules,
+         persistentDispatcher 5001 dispatchingRules,
+         persistentDispatcher 5002 dispatchingRules]
   return ()
diff --git a/TestCall.hs b/TestCall.hs
index be745e3..fbb366f 100644
--- a/TestCall.hs
+++ b/TestCall.hs
@@ -1,7 +1,10 @@
-{-# LANGUAGE OverloadedStrings, TemplateHaskell #-}
+{-# LANGUAGE OverloadedStrings, TemplateHaskell, ScopedTypeVariables #-}
 -- | Test client
 module Main where

+import Control.Monad
+import System.Environment (getArgs)
+import qualified Data.ByteString.Char8 as BS
 import Data.Object.Yaml
 import Data.Convertible.Base

@@ -22,29 +25,40 @@ $(remote' 'Methods.ls)
 -- Now `ls' is defined here as
 -- ls :: (ByteString,Int) -> String -> IO [String]

-rules = [("test", ("127.0.0.1", 5000), 1),
-         ("test", ("127.0.0.1", 5001), 1),
-         ("test", ("127.0.0.1", 5002), 1)]

-getService = selectRandom rules
+rules host = [("test", (host', 5000), 1),
+              ("test", (host', 5001), 1),
+              ("test", (host', 5002), 1)]
+  where
+    host' = BS.pack host
+
+getService host = selectRandom (rules host)

 p = Point 2.0 3.0

 ps = [Point 3.0 5.0, Point 1.0 2.1, Point 0.1 0.2]

 main = do
-  test <- getService "test"
+  [host] <- getArgs
+  test <- getService host "test"
+
+  (conn :: PersistentConnection) <- newConnection test
+--   (conn :: HostAndPort) <- newConnection test

   -- call remote functions
-  r <- double test p
-  print r
-  s <- mySum test [3.5, 5.5, 1.0]
-  print s
-  lst <- ls test "/tmp"
-  print lst
-
-  -- call remote functions for many arguments, for each argument on different server maybe
-  rs <- callP getService "test" "double" ps
-  print (rs :: [Point])
-  cs <- callP getService "test" "counter" $ zip ([3,4,5,6] :: [Int]) ([1..] :: [Int])
-  print (cs :: [Int])
+  replicateM 100 $ do
+    r <- double conn p
+    print r
+
+--   s <- mySum conn [3.5, 5.5, 1.0]
+--   print s
+--   lst <- ls conn "/tmp"
+--   print lst
+--
+--   -- call remote functions for many arguments, for each argument on different server maybe
+--   rs <- callP (getService host) "test" "double" ps
+--   print (rs :: [Point])
+--   cs <- callP (getService host) "test" "counter" $ zip ([3,4,5,6] :: [Int]) ([1..] :: [Int])
+--   print (cs :: [Int])
+
+  closeConnection conn
ViewGit