C# ClientWebSocket не получает все пакеты? - PullRequest
2 голосов
/ 07 марта 2020

Я создал сервис windows, который подписывается на около 10 000 биржевых тикеров в режиме реального времени, используя ClientWebSocket. Если я подписываюсь на 1000 тикеров, я получаю все точки данных, как и должен (получая несколько сотен сообщений в секунду), как только я получаю до 2000 тикеров, мне кажется, что я не получаю данные, которыми я должен быть, 10 000 (получая тысячи сообщений в секунду) это еще хуже. Я запустил сравнительные отчеты, и, похоже, я теряю до 60% пакетов. Я поговорил с Polygon (поставщиком данных в реальном времени) об этой проблеме, и они утверждают, что их Socket - пожарный шланг, и все, что должно go, исчезнет, ​​и что ни один из их других клиентов не будет жаловаться. Таким образом, единственная логическая вещь здесь должна была бы предположить, что это мой код или какое-то ограничение. Может быть, это часть Task метода Receive? Возможно, у окон есть максимальное ограничение задачи, и я его превышаю.

Я также проверил это на выделенном высокопроизводительном сервере с 10-гигабитным соединением, поэтому это не похоже на ограничение соединения или оборудования.

Я также прошел мимо кеша BlockingCollection, и проблема все еще сохраняется.

Надеюсь, у кого-то есть понимание, спасибо!

Вот мой код:

        public static ConcurrentDictionary<string, TradeObj> TradeFeed = new ConcurrentDictionary<string, TradeObj>();
        public static ConcurrentDictionary<string, QuoteObj> QuoteFeed = new ConcurrentDictionary<string, QuoteObj>();
        public static ConcurrentDictionary<string, AggObj> AggFeed = new ConcurrentDictionary<string, AggObj>();
 public static BlockingCollection<byte[]> packets = new BlockingCollection<byte[]>();

        private static void Start(string[] args)
        {
            try
            {
                Polygon.StartSub();

                int HowManyConsumers = 2;

                  for (int i = 0; i < HowManyConsumers; i++)
                  {
                      Task.Factory.StartNew(Polygon.ConsumePackets);
                  }

            } catch(Exception e)
            {
                Console.WriteLine(e.Message);
            }

            Console.ReadKey();
        }

        public static async Task StartSub()
        {
            do
            {
                using (var socket = new ClientWebSocket())
                    try
                    {
                       // socket.Options.KeepAliveInterval = TimeSpan.Zero;
                        var Connection = "wss://socket.polygon.io/stocks";

                        await socket.ConnectAsync(new Uri(Connection), CancellationToken.None);

                        Console.WriteLine("Websocket opened to Polygon.");
                        await Send(socket, "{\"action\":\"auth\",\"params\":\""+ConfigurationManager.AppSettings["PolygonAPIToken"]+"\"}");
                        List<List<string>> batches = new List<List<string>>();

                        for (int i = 0; i < FeedCache.Tickers.Count(); i += 500)
                        {
                            var tempList = new List<string>();
                            tempList.AddRange(FeedCache.Tickers.Skip(i).Take(500));
                            batches.Add(tempList);
                        }

                        int bNum = 0;
                        string[] quoteStrings = new string[batches.Count()];

                        foreach (var tList in batches)
                        {
                            var tQuery = "";

                            tQuery = tQuery + "T." + string.Join(",T.", tList.ToArray());
                            tQuery = tQuery + ",A." + string.Join(",A.", tList.ToArray());
                            tQuery = tQuery + ",Q." + string.Join(",Q.", tList.ToArray());
                            quoteStrings[bNum] = tQuery;
                            bNum++;
                        }

                        for (int i = 0; i < quoteStrings.Count(); i++)
                        {
                            string SubscribeString = "{\"action\":\"subscribe\",\"params\":\"" + quoteStrings[i] + "\"}";
                            await Send(socket, SubscribeString);
                        }


                        await Receive(socket);

                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"ERROR - {ex.Message}");
                        Console.WriteLine(ex.ToString());
                    }
            } while (true);
        }

        static async Task Send(ClientWebSocket socket, string data)
        {
            var segment = new ArraySegment<byte>(Encoding.UTF8.GetBytes(data));
            await socket.SendAsync(segment, WebSocketMessageType.Text, true, CancellationToken.None);
        }


        static async Task Receive(ClientWebSocket socket)
        {

            do {
                WebSocketReceiveResult result;
            var buffer = new ArraySegment<byte>(new byte[2000]);
                using (var ms = new MemoryStream())
                {
                    do
                    {
                        result = await socket.ReceiveAsync(buffer, CancellationToken.None);
                        ms.Write(buffer.Array, buffer.Offset, result.Count);
                    } while (!result.EndOfMessage);


                    if (result.MessageType == WebSocketMessageType.Close)
                    {
                        await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closed in server by the client", CancellationToken.None);
                        Console.WriteLine("Socket disconnecting, trying to reconnect.");
                        await StartSub();
                    }
                    else
                    {
                     packets.Add(ms.ToArray());
                    }
                }
            } while (true);
        }

        public static async void ConsumePackets()
        {

            foreach (var buffer in packets.GetConsumingEnumerable())
            {

                using (var ms = new MemoryStream(buffer))
                {
                    ms.Seek(0, SeekOrigin.Begin);
                    using (var reader = new StreamReader(ms, Encoding.UTF8))
                    {

                        var data = await reader.ReadToEndAsync();
                        try
                        {
                            var j = JArray.Parse(data);


                        if (j != null)
                        {
                            string id = (string)j[0]["ev"];
                            switch (id)
                            {
                                case "T":
                                    AddOrUpdateTrade((string)j[0]["sym"], j);
                                    break;
                                case "Q":
                                    AddOrUpdateQuote((string)j[0]["sym"], j);
                                    break;
                                case "A":
                                    AddOrUpdateAgg((string)j[0]["sym"], j);
                                    break;
                            }
                        }
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(e.ToString());
                        }
                    }

                }

            }
        }

        public static void AddOrUpdateTrade(string ticker, JArray data)
        {

            TradeFeed.AddOrUpdate(ticker, new TradeObj {
                LastPrice = (double)data[0]["p"],
                TradeCount = 1
            }, (key, existingVal) =>
            {
                return new TradeObj {
                    LastPrice = (double)data[0]["p"],
                    TradeCount = existingVal.TradeCount + 1,
                    PriceDirection = (double)data[0]["p"] < existingVal.LastPrice ? "D" : "U"
                };
            });

        }

        public static void AddOrUpdateAgg(string ticker, JArray data)
        {

            AggFeed.AddOrUpdate(ticker, new AggObj
            {
                TickVolume = (long)data[0]["v"],
                VolumeShare = (long)data[0]["av"],
                OpenPrice = (double)data[0]["op"],
                TickAverage = (double)data[0]["a"],
                VWAP = (double)data[0]["vw"],
                TickClosePrice = (double)data[0]["c"],
                TickHighPrice = (double)data[0]["h"],
                TickLowPrice = (double)data[0]["l"],
                TickOpenPrice = (double)data[0]["o"]
            }, (key, existingVal) =>
            {
                return new AggObj
                {
                    TickVolume = (long)data[0]["v"],
                    VolumeShare = (long)data[0]["av"],
                    OpenPrice = (double)data[0]["op"],
                    TickAverage = (double)data[0]["a"],
                    VWAP = (double)data[0]["vw"],
                    TickClosePrice = (double)data[0]["c"],
                    TickHighPrice = (double)data[0]["h"],
                    TickLowPrice = (double)data[0]["l"],
                    TickOpenPrice = (double)data[0]["o"]
                };
            });

        }

        public static void AddOrUpdateQuote(string ticker, JArray data)
        {

            QuoteFeed.AddOrUpdate(ticker, new QuoteObj
            {
                BidPrice = (double)data[0]["bp"],
                BidSize = (double)data[0]["bs"],
                AskPrice = (double)data[0]["ap"],
                AskSize = (double)data[0]["as"]
            }, (key, existingVal) =>
            {
                return new QuoteObj
                {
                    BidPrice = (double)data[0]["bp"],
                    BidSize = (double)data[0]["bs"],
                    AskPrice = (double)data[0]["ap"],
                    AskSize = (double)data[0]["as"]
                };
            });

        }

...