TL; DR Это не тривиально
Похоже, кто-то уже отправил полный код для Utf8JsonStreamReader
структуры, котораячитает буферы из потока и передает их в Utf8JsonRreader, что позволяет легко десериализовать с помощью JsonSerializer.Deserialize<T>(ref newJsonReader, options);
. Код тоже не тривиален. Связанный вопрос здесь , а ответ здесь .
Этого недостаточно, хотя HttpClient.GetAsync
вернется только после получения всего ответа, по существу буферизуя всев памяти.
Чтобы избежать этого, HttpClient.GetAsync (string, HttpCompletionOption) следует использовать с HttpCompletionOption.ResponseHeadersRead
.
Цикл десериализации должен также проверять токен отмены, и либовыйти или бросить, если это сигнализируется. В противном случае цикл будет продолжаться до тех пор, пока весь поток не будет получен и обработан.
Этот код основан на примере соответствующего ответа и использует HttpCompletionOption.ResponseHeadersRead
и проверяет токен отмены. Он может анализировать строки JSON, которые содержат правильный массив элементов, например:
[{"prop1":123},{"prop1":234}]
Первый вызов jsonStreamReader.Read()
перемещается в начало массива, а второй - в начало первого объекта. Сам цикл завершается, когда обнаруживается конец массива (]
).
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
//Don't cache the entire response
using var httpResponse = await httpClient.GetAsync(url,
HttpCompletionOption.ResponseHeadersRead,
cancellationToken);
using var stream = await httpResponse.Content.ReadAsStreamAsync();
using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);
jsonStreamReader.Read(); // move to array start
jsonStreamReader.Read(); // move to start of the object
while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
{
//Gracefully return if cancellation is requested.
//Could be cancellationToken.ThrowIfCancellationRequested()
if(cancellationToken.IsCancellationRequested)
{
return;
}
// deserialize object
var obj = jsonStreamReader.Deserialize<T>();
yield return obj;
// JsonSerializer.Deserialize ends on last token of the object parsed,
// move to the first token of next object
jsonStreamReader.Read();
}
}
JSON-фрагменты, потоковая передача JSON AKA aka ... *
В сценариях потоковой передачи событий или журналирования достаточно часто добавлять отдельные объекты JSON в файл, по одному элементу в строке, например:
{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}
Это не действительный документ JSON , а отдельныйфрагменты действительны. Это имеет несколько преимуществ для больших данных / сценариев с высокой степенью одновременности. Добавление нового события требует только добавления новой строки в файл, а не анализа и перекомпоновки всего файла. Обработка , особенно параллельная обработка проще по двум причинам:
- Отдельные элементы могут быть получены по одному за раз, просто читая одну строку из потока.
- Входной файл может быть легко разбит на части и разбит по границам строк, передавая каждую часть отдельному рабочему процессу, например, в кластере Hadoop, или просто различным потокам в приложении: вычисление точек разделения, например, путем делениядлина по количеству работников, затем ищите первую новую строку. Передайте все до этой точки отдельному рабочему.
Использование StreamReader
Способ выделения-y для этого будет использовать TextReader,читать по одной строке за раз и анализировать ее с помощью JsonSerializer.Deserialize :
using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken
while((line=await reader.ReadLineAsync()) != null)
{
var item=JsonSerializer.Deserialize<T>(line);
yield return item;
if(cancellationToken.IsCancellationRequested)
{
return;
}
}
Это намного проще, чем код, десериализующий правильный массив. Есть две проблемы:
ReadLineAsync
не принимает токен отмены - Каждая итерация выделяет новую строку, одну из вещей, которую мы хотели избегать с помощью System.Text.Json
Этого может быть достаточно, хотя , поскольку попытка создать буферы ReadOnlySpan<Byte>
, необходимые для JsonSerializer.Deserialize, не тривиальна.
Pipelines и SequenceReader
Чтобы избежать размещения, нам нужно получить ReadOnlySpan<byte>
из потока. Для этого необходимо использовать каналы System.IO.Pipeline и структуру SequenceReader . Стив Гордона * Введение в SequenceReader объясняет, как этот класс может использоваться для чтения данных из потока с использованием разделителей.
К сожалению, SequenceReader
является структурой ref, что означает, что ее нельзя использовать в асинхронных или локальных методах. Вот почему Стив Гордон в своей статье создает метод
private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)
для чтения элементов из формы ReadOnlySequence и возврата конечной позиции, чтобы PipeReader мог возобновить работу с нее. К сожалению мы хотим вернуть IEnumerable или IAsyncEnumerable, а методы итератора также не любят параметры in
или out
.
Мы могли бы собрать десериализованные элементы в списке или очереди и вернуть их как единый результат, но это все равно выделило бы списки, буферы или узлы и пришлось бы ждать десериализации всех элементов в буфере, прежде чем возвращать:
private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)
Нам нужно что-то , которое действует как перечислимое, не требуя метода итератора, работает с асинхронностью и не буферизирует все как есть.
Добавление каналов для создания IAsyncEnumerable
ChannelReader.ReadAllAsync возвращает IAsyncEnumerable. Мы можем вернуть ChannelReader из методов, которые не могли работать как итераторы и по-прежнему генерировать поток элементов без кэширования.
Адаптируя код Стива Гордона для использования каналов, мы получаем методы ReadItems (ChannelWriter ...) и ReadLastItem
. Первый, читает по одному элементу за раз, до новой строки, используя ReadOnlySpan<byte> itemBytes
. Это может быть использовано JsonSerializer.Deserialize
. Если ReadItems
не может найти разделитель, он возвращает свою позицию, чтобы PipelineReader мог извлечь следующий фрагмент из потока.
Когда мы достигаем последнего фрагмента и другого разделителя нет, ReadLastItem` читает оставшиеся байты и десериализует их.
Код почти идентичен коду Стива Гордона. Вместо записи в консоль, мы пишем в ChannelWriter.
private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;
private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence,
bool isCompleted, CancellationToken token)
{
var reader = new SequenceReader<byte>(sequence);
while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
{
if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
{
var item=JsonSerializer.Deserialize<T>(itemBytes);
writer.TryWrite(item);
}
else if (isCompleted) // read last item which has no final delimiter
{
var item = ReadLastItem<T>(sequence.Slice(reader.Position));
writer.TryWrite(item);
reader.Advance(sequence.Length); // advance reader to the end
}
else // no more items in this sequence
{
break;
}
}
return reader.Position;
}
private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
var length = (int)sequence.Length;
if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
{
Span<byte> byteBuffer = stackalloc byte[length];
sequence.CopyTo(byteBuffer);
var item=JsonSerializer.Deserialize<T>(byteBuffer);
return item;
}
else // otherwise we'll rent an array to use as the buffer
{
var byteBuffer = ArrayPool<byte>.Shared.Rent(length);
try
{
sequence.CopyTo(byteBuffer);
var item=JsonSerializer.Deserialize<T>(byteBuffer);
return item;
}
finally
{
ArrayPool<byte>.Shared.Return(byteBuffer);
}
}
}
Метод DeserializeToChannel<T>
создает программу чтения конвейера поверх потока, создает канал и запускает рабочую задачу, которая анализирует чанки и выталкивает их в канал:
ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
var pipeReader = PipeReader.Create(stream);
var channel=Channel.CreateUnbounded<T>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
while (!token.IsCancellationRequested)
{
var result = await pipeReader.ReadAsync(token); // read from the pipe
var buffer = result.Buffer;
var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer
if (result.IsCompleted)
break; // exit if we've read everything from the pipe
pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
}
pipeReader.Complete();
},token)
.ContinueWith(t=>{
pipeReader.Complete();
writer.TryComplete(t.Exception);
});
return channel.Reader;
}
ChannelReader.ReceiveAllAsync()
можно использовать для потребления всех предметов через IAsyncEnumerable<T>
:
var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
//Do something with it
}