F #, System.IO.IOException: все экземпляры канала заняты - PullRequest
1 голос
/ 08 сентября 2010

У меня есть приложение F #, которое связывается с Java-приложением через именованный канал. Где F # действует как сервер, а java - как клиент. Приложение работает по большей части за исключением того, что F # иногда запускает ошибку «System.IO.IOException: все экземпляры канала заняты» Ниже приведена полная трассировка стека исключений и фрагментов кода для F # и Java. Любая помощь приветствуется в решении этой проблемы

Спасибо, Sudaly

трассировка полного стека:

Unhandled Exception: System.IO.IOException: All pipe instances are busy.
   at Microsoft.FSharp.Control.CancellationTokenOps.Start@1143-1.Invoke(Exception e)
   at <StartupCode$FSharp-Core>.$Control.loop@413-38(Trampoline this, FSharpFunc`2 action)
   at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   at <StartupCode$FSharp-Core>.$Control.-ctor@473-1.Invoke(Object state)
   at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
   at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

F # Код:

[<DataContract>] 
type Quote = { 
    [<field: DataMember(Name="securityIdentifier") >] 
    RicCode:string
    [<field: DataMember(Name="madeOn") >] 
    MadeOn:DateTime
    [<field: DataMember(Name="closePrice") >] 
    Price:int 
    }

let globalPriceCache = new Dictionary<string, Quote>()

let ParseQuoteString (quoteString:string) = 
    let data = Encoding.Unicode.GetBytes(quoteString)
    let stream = new MemoryStream() 
    stream.Write(data, 0, data.Length); 
    stream.Position <- 0L 
    let ser = Json.DataContractJsonSerializer(typeof<Quote array>) 
    let results:Quote array = ser.ReadObject(stream) :?> Quote array
    results

let RefreshCache quoteList =
    globalPriceCache.Clear() 
    quoteList 
    |> Array.iter(fun result->globalPriceCache.Add(result.RicCode, result)) 

let EstablishConnection() =
    let pipeServer = new NamedPipeServerStream("testpipe", PipeDirection.InOut, 4)
    pipeServer.WaitForConnection()
    try
        Some(new StreamReader(pipeServer))
    with e -> 
        None

let rec MarketPriceCache() =
    match EstablishConnection() with
    |Some(sr) ->
        // Read request from the stream.
        let m_cache = 
            sr.ReadLine()  
            |>  ParseQuoteString  
            |>  RefreshCache

        MarketPriceCache()
    | _ -> () 


[<EntryPoint>]
let main args=
    try
        async { 
            MarketPriceCache() 
        } |> Async.Start

        while true do
            if globalPriceCache.Count > 0 then
    //Business logic
                System.Threading.Thread.Sleep(1000 * 50)
            else

                ignore(logInfo(sprintf "%s" "Price Cache is empty"))
                System.Threading.Thread.Sleep(1000 * 65)

    with e ->
        ignore(logError e.Message)
        ignore(logError e.StackTrace)    
    0

Java-код:

public void WatchForPrice()
 {
  while (true)
  {
   try 
   {
    Map<String, SecurityQuoteCacheEntry> priceMap = getPriceCache().getCacheMap();
    List<LocalSecurityQuote> localSecurityQuotes = new ArrayList<LocalSecurityQuote>();
    if(priceMap != null)
    {

     Set<String> keySet = priceMap.keySet();
     System.out.println("Key Size: " + keySet.size());
     for(String key : keySet)
     {
      SecurityQuote quote =  priceMap.get(key).getQuote();
      if(quote != null)
      {
       LocalSecurityQuote localSecurityQuote = new LocalSecurityQuote();
       localSecurityQuote.setClosePrice(quote.getClosePrice());
       localSecurityQuote.setMadeOn(quote.getMadeOn());     
       localSecurityQuote.setSecurityIdentifier(key);
       localSecurityQuotes.add(localSecurityQuote);
      }

     }

     JSONSerializer serializer = new JSONSerializer();
     String jsonString = serializer.serialize(localSecurityQuotes);

     // Connect to the pipe
     RandomAccessFile pipe = new RandomAccessFile("\\\\.\\pipe\\testpipe", "rw");
     if (pipe != null )
     {
      // write to pipe
      pipe.write(jsonString.getBytes());
      pipe.close();

     }
     else
      System.out.println("Pipe not found");
     doPeriodicWait();
    }
    else 
     System.out.println("No Price data found");
   }
   catch (Exception e) 
   {
    e.printStackTrace();
    System.out.println(e.getMessage());
    doPeriodicWait();
   }
  }
 }

Ответы [ 2 ]

4 голосов
/ 08 сентября 2010

Это догадка, но, возможно, проблема в том, что вы не закрываете программу чтения потокового канала?

let rec MarketPriceCache() =
match EstablishConnection() with
|Some(sr) ->
    // Read request from the stream.        
    try        
        sr.ReadLine()  
        |>  ParseQuoteString  
        |>  RefreshCache   
    finally
        sr.Close()
    MarketPriceCache()
| _ -> () 

(переменная m_cache не нужна - вы ее нигде не используете)

3 голосов
/ 08 сентября 2010

Вы должны утилизировать NamedPipeServerStream каждый раз, когда создаете его. Самый простой способ сделать это в вашем коде - избавиться от StreamReader внутри MarketPriceCache, поместив вокруг него оператор use:

let rec MarketPriceCache() =
    match EstablishConnection() with
    | Some(sr) ->
        // Read request from the stream.
        use reader = sr in
        (
            let m_cache = 
                reader.ReadLine()  
                |>  ParseQuoteString  
                |>  RefreshCache
        )
        MarketPriceCache()
    | _ -> ()

Синтаксис с using ... in используется для предотвращения завершения области читателя после рекурсивного вызова MarketPriceCache.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...