Как быстро обрабатывать и выполнять действия с данными websocket в c #? - PullRequest
0 голосов
/ 26 февраля 2019

Я подключаюсь к стороннему серверу провайдера данных, используя websocket.Для подключения через веб-сокет мой код:

this.websocket = new WebSocket("wss://socket.polygon.io/stocks", sslProtocols: SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls);

Поэтому, когда соединение устанавливается, мы получаем от 70 000 до 1 000 000 записей каждую минуту.Таким образом, после этого мы разделили эти ответы и сохранили их в отдельных файлах.Например, если мы получим данные для AAPL, мы сохраним эти данные в файле AAPL.То же, что и для FB, MSFT, IBM, QQQ и т. Д.В общей сложности у нас есть 10 000 файлов, которые мы должны обрабатывать за один раз, и в соответствии с ним хранить живые записи.

public static string tempFile = @"D:\TempFileForLiveMarket\tempFileStoreLiveSymbols.txt";
public static System.IO.StreamWriter w;
private void websocket_MessageReceived(object sender, MessageReceivedEventArgs e)
{
  using (w = System.IO.File.AppendText(tempFile))
  {
     Log(e.Message, w);
  }
  using (System.IO.StreamReader r = System.IO.File.OpenText(tempFile))
  {
     DumpLog(r);
  }
}

public static void Log(string responseMessage, System.IO.TextWriter w)
{
     w.WriteLine(responseMessage);
}

public static void DumpLog(System.IO.StreamReader r)
{
  string line;
  while ((line = r.ReadLine()) != null)
  {
     WriteRecord(line);
  }
}

public static void WriteRecord(string data)
{
   List<LiveData> ld = JsonConvert.DeserializeObject<List<LiveData>>(data);
   var filterData = ld.Where(x => symbolList.Contains(x.sym));
   List<string> fileLines = new List<string>();
   foreach (var item in filterData)
   {
      var fileName = @"D:\SymbolsData\"+item.sym+ "_day_Aggregate.txt";
      fileLines = File.ReadAllLines(fileName).AsParallel().Skip(1).ToList();
      if (fileLines.Count > 1)
      {
         var lastLine = fileLines.Last();
         if (!lastLine.Contains(item.sym))
         {
               fileLines.RemoveAt(fileLines.Count - 1);
         }
      }
      fileLines.Add(item.sym + "," + item.s + "," + item.p + "-----");
      System.IO.File.WriteAllLines(fileName, fileLines);
   }
}

Итак, когда установлено соединение с веб-сокетом и выполняются действия с рыночными рыночными данными с нашими 10000 файлами, оно становитсяболее медленное, а также соединение через веб-сокет закрывается через несколько минут и передается сообщение, как показано ниже:

Websocket Error
Received an unexpected EOF or 0 bytes from the transport stream.
Connection Closed...

Я выполняю весь этот процесс, потому что на следующем этапе мне нужно провести технический анализ реальной цены каждого символа.Так как я могу справиться с этой ситуацией?Как я могу сделать процесс быстрее, чем эта скорость обработки?и как я могу остановиться для соединения закрыто?

После редактирования

Я заменяю потоковую запись и временный файл на String Builder, как показано ниже,

public static StringBuilder sb = new StringBuilder();
public static System.IO.StringWriter sw;
private void websocket_MessageReceived(object sender, MessageReceivedEventArgs e)
{
      sw = new System.IO.StringWriter(sb);
      sw.WriteLine(e.Message);
      Reader();
}

public static void Reader()
{
     System.IO.StringReader _sr = new System.IO.StringReader(sb.ToString());
     while (_sr.Peek() > -1)
     {
            WriteRecord(sb.ToString());
     }
     sb.Remove(0, sb.Length);
}

public static void WriteRecord(string data)
{
     List<LiveData> ld = JsonConvert.DeserializeObject<List<LiveData>>(data);
     foreach (var item in filterData)
     {
           var fileName = @"D:\SymbolsData\"+item.sym+ "_day_Aggregate.txt";
           fileLines = File.ReadAllLines(fileName).AsParallel().Skip(1).ToList();
           fileLines.RemoveAt(fileLines.Count - 1);
           fileLines.Add(item.sym + "," + item.s + "," + item.p)
           System.IO.File.WriteAllLines(fileName, fileLines);
      }
}

1 Ответ

0 голосов
/ 07 марта 2019

Похоже, вы добавляете каждое сообщение к tempFile, но затем вы обрабатываете все tempFile.Это означает, что вы постоянно повторно обрабатываете старые данные плюс новую запись, так что да: это будет постепенно занимать все больше и больше и больше времени до тех пор, пока другой конец не получитНадоело ждать и отрезает тебя.Мой совет: не делайте этого.

Есть также много вещей, которые вы могли бы сделать более эффективно при фактической обработке каждой записи, но это не имеет значения по сравнению с накладными расходами постоянноповторная обработка всего .

...