Я создал сервис windows, который подписывается на около 10 000 биржевых тикеров в режиме реального времени, используя ClientWebSocket. Если я подписываюсь на 1000 тикеров, я получаю все точки данных, как и должен (получая несколько сотен сообщений в секунду), как только я получаю до 2000 тикеров, мне кажется, что я не получаю данные, которыми я должен быть, 10 000 (получая тысячи сообщений в секунду) это еще хуже. Я запустил сравнительные отчеты, и, похоже, я теряю до 60% пакетов. Я поговорил с Polygon (поставщиком данных в реальном времени) об этой проблеме, и они утверждают, что их Socket - пожарный шланг, и все, что должно go, исчезнет, и что ни один из их других клиентов не будет жаловаться. Таким образом, единственная логическая вещь здесь должна была бы предположить, что это мой код или какое-то ограничение. Может быть, это часть Task метода Receive? Возможно, у окон есть максимальное ограничение задачи, и я его превышаю.
Я также проверил это на выделенном высокопроизводительном сервере с 10-гигабитным соединением, поэтому это не похоже на ограничение соединения или оборудования.
Я также прошел мимо кеша BlockingCollection, и проблема все еще сохраняется.
Надеюсь, у кого-то есть понимание, спасибо!
Вот мой код:
public static ConcurrentDictionary<string, TradeObj> TradeFeed = new ConcurrentDictionary<string, TradeObj>();
public static ConcurrentDictionary<string, QuoteObj> QuoteFeed = new ConcurrentDictionary<string, QuoteObj>();
public static ConcurrentDictionary<string, AggObj> AggFeed = new ConcurrentDictionary<string, AggObj>();
public static BlockingCollection<byte[]> packets = new BlockingCollection<byte[]>();
private static void Start(string[] args)
{
try
{
Polygon.StartSub();
int HowManyConsumers = 2;
for (int i = 0; i < HowManyConsumers; i++)
{
Task.Factory.StartNew(Polygon.ConsumePackets);
}
} catch(Exception e)
{
Console.WriteLine(e.Message);
}
Console.ReadKey();
}
public static async Task StartSub()
{
do
{
using (var socket = new ClientWebSocket())
try
{
// socket.Options.KeepAliveInterval = TimeSpan.Zero;
var Connection = "wss://socket.polygon.io/stocks";
await socket.ConnectAsync(new Uri(Connection), CancellationToken.None);
Console.WriteLine("Websocket opened to Polygon.");
await Send(socket, "{\"action\":\"auth\",\"params\":\""+ConfigurationManager.AppSettings["PolygonAPIToken"]+"\"}");
List<List<string>> batches = new List<List<string>>();
for (int i = 0; i < FeedCache.Tickers.Count(); i += 500)
{
var tempList = new List<string>();
tempList.AddRange(FeedCache.Tickers.Skip(i).Take(500));
batches.Add(tempList);
}
int bNum = 0;
string[] quoteStrings = new string[batches.Count()];
foreach (var tList in batches)
{
var tQuery = "";
tQuery = tQuery + "T." + string.Join(",T.", tList.ToArray());
tQuery = tQuery + ",A." + string.Join(",A.", tList.ToArray());
tQuery = tQuery + ",Q." + string.Join(",Q.", tList.ToArray());
quoteStrings[bNum] = tQuery;
bNum++;
}
for (int i = 0; i < quoteStrings.Count(); i++)
{
string SubscribeString = "{\"action\":\"subscribe\",\"params\":\"" + quoteStrings[i] + "\"}";
await Send(socket, SubscribeString);
}
await Receive(socket);
}
catch (Exception ex)
{
Console.WriteLine($"ERROR - {ex.Message}");
Console.WriteLine(ex.ToString());
}
} while (true);
}
static async Task Send(ClientWebSocket socket, string data)
{
var segment = new ArraySegment<byte>(Encoding.UTF8.GetBytes(data));
await socket.SendAsync(segment, WebSocketMessageType.Text, true, CancellationToken.None);
}
static async Task Receive(ClientWebSocket socket)
{
do {
WebSocketReceiveResult result;
var buffer = new ArraySegment<byte>(new byte[2000]);
using (var ms = new MemoryStream())
{
do
{
result = await socket.ReceiveAsync(buffer, CancellationToken.None);
ms.Write(buffer.Array, buffer.Offset, result.Count);
} while (!result.EndOfMessage);
if (result.MessageType == WebSocketMessageType.Close)
{
await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closed in server by the client", CancellationToken.None);
Console.WriteLine("Socket disconnecting, trying to reconnect.");
await StartSub();
}
else
{
packets.Add(ms.ToArray());
}
}
} while (true);
}
public static async void ConsumePackets()
{
foreach (var buffer in packets.GetConsumingEnumerable())
{
using (var ms = new MemoryStream(buffer))
{
ms.Seek(0, SeekOrigin.Begin);
using (var reader = new StreamReader(ms, Encoding.UTF8))
{
var data = await reader.ReadToEndAsync();
try
{
var j = JArray.Parse(data);
if (j != null)
{
string id = (string)j[0]["ev"];
switch (id)
{
case "T":
AddOrUpdateTrade((string)j[0]["sym"], j);
break;
case "Q":
AddOrUpdateQuote((string)j[0]["sym"], j);
break;
case "A":
AddOrUpdateAgg((string)j[0]["sym"], j);
break;
}
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
}
}
}
public static void AddOrUpdateTrade(string ticker, JArray data)
{
TradeFeed.AddOrUpdate(ticker, new TradeObj {
LastPrice = (double)data[0]["p"],
TradeCount = 1
}, (key, existingVal) =>
{
return new TradeObj {
LastPrice = (double)data[0]["p"],
TradeCount = existingVal.TradeCount + 1,
PriceDirection = (double)data[0]["p"] < existingVal.LastPrice ? "D" : "U"
};
});
}
public static void AddOrUpdateAgg(string ticker, JArray data)
{
AggFeed.AddOrUpdate(ticker, new AggObj
{
TickVolume = (long)data[0]["v"],
VolumeShare = (long)data[0]["av"],
OpenPrice = (double)data[0]["op"],
TickAverage = (double)data[0]["a"],
VWAP = (double)data[0]["vw"],
TickClosePrice = (double)data[0]["c"],
TickHighPrice = (double)data[0]["h"],
TickLowPrice = (double)data[0]["l"],
TickOpenPrice = (double)data[0]["o"]
}, (key, existingVal) =>
{
return new AggObj
{
TickVolume = (long)data[0]["v"],
VolumeShare = (long)data[0]["av"],
OpenPrice = (double)data[0]["op"],
TickAverage = (double)data[0]["a"],
VWAP = (double)data[0]["vw"],
TickClosePrice = (double)data[0]["c"],
TickHighPrice = (double)data[0]["h"],
TickLowPrice = (double)data[0]["l"],
TickOpenPrice = (double)data[0]["o"]
};
});
}
public static void AddOrUpdateQuote(string ticker, JArray data)
{
QuoteFeed.AddOrUpdate(ticker, new QuoteObj
{
BidPrice = (double)data[0]["bp"],
BidSize = (double)data[0]["bs"],
AskPrice = (double)data[0]["ap"],
AskSize = (double)data[0]["as"]
}, (key, existingVal) =>
{
return new QuoteObj
{
BidPrice = (double)data[0]["bp"],
BidSize = (double)data[0]["bs"],
AskPrice = (double)data[0]["ap"],
AskSize = (double)data[0]["as"]
};
});
}