Асинхронная десериализация списка с использованием System.Text.Json - PullRequest
7 голосов
/ 26 октября 2019

Допустим, я запрашиваю большой файл JSON, который содержит список многих объектов. Я не хочу, чтобы они все время оставались в памяти, но я бы предпочел прочитать и обработать их один за другим. Поэтому мне нужно превратить асинхронный поток System.IO.Stream в IAsyncEnumerable<T>. Как мне использовать новый System.Text.Json API для этого?

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
    {
        using (var stream = await httpResponse.Content.ReadAsStreamAsync())
        {
            // Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
        }
    }
}

Ответы [ 4 ]

2 голосов
/ 29 октября 2019

Да, действительно потоковый JSON (de) сериализатор был бы хорошим улучшением производительности во многих местах.

К сожалению, System.Text.Json не делает этого. Я не уверен, будет ли это в будущем - я надеюсь на это! По-настоящему потоковая десериализация JSON оказывается довольно сложной задачей.

Вы можете проверить, поддерживает ли ее чрезвычайно быстрый Utf8Json , возможно.

Однако может быть пользовательскийрешение для вашей конкретной ситуации, поскольку ваши требования, похоже, ограничивают сложность.

Идея состоит в том, чтобы вручную считывать по одному элементу из массива за раз. Мы используем тот факт, что каждый элемент списка сам по себе является допустимым объектом JSON.

Вы можете вручную пропустить [ (для первого элемента) или , (за каждый следующий пункт). Тогда я думаю, что вам лучше всего использовать .NET Core Utf8JsonReader, чтобы определить, где заканчивается текущий объект, и передать отсканированные байты в JsonDeserializer.

Таким образом, вы только слегка буферизуете за одинобъект за раз.

И так как мы говорим о производительности, вы можете получить ввод от PipeReader, пока вы на нем. : -)

0 голосов
/ 30 октября 2019

TL; DR Это не тривиально


Похоже, кто-то уже отправил полный код для Utf8JsonStreamReader структуры, котораячитает буферы из потока и передает их в Utf8JsonRreader, что позволяет легко десериализовать с помощью JsonSerializer.Deserialize<T>(ref newJsonReader, options);. Код тоже не тривиален. Связанный вопрос здесь , а ответ здесь .

Этого недостаточно, хотя HttpClient.GetAsync вернется только после получения всего ответа, по существу буферизуя всев памяти.

Чтобы избежать этого, HttpClient.GetAsync (string, HttpCompletionOption) следует использовать с HttpCompletionOption.ResponseHeadersRead.

Цикл десериализации должен также проверять токен отмены, и либовыйти или бросить, если это сигнализируется. В противном случае цикл будет продолжаться до тех пор, пока весь поток не будет получен и обработан.

Этот код основан на примере соответствующего ответа и использует HttpCompletionOption.ResponseHeadersRead и проверяет токен отмены. Он может анализировать строки JSON, которые содержат правильный массив элементов, например:

[{"prop1":123},{"prop1":234}]

Первый вызов jsonStreamReader.Read() перемещается в начало массива, а второй - в начало первого объекта. Сам цикл завершается, когда обнаруживается конец массива (]).

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    //Don't cache the entire response
    using var httpResponse = await httpClient.GetAsync(url,                               
                                                       HttpCompletionOption.ResponseHeadersRead,  
                                                       cancellationToken);
    using var stream = await httpResponse.Content.ReadAsStreamAsync();
    using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);

    jsonStreamReader.Read(); // move to array start
    jsonStreamReader.Read(); // move to start of the object

    while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
    {
        //Gracefully return if cancellation is requested.
        //Could be cancellationToken.ThrowIfCancellationRequested()
        if(cancellationToken.IsCancellationRequested)
        {
            return;
        }

        // deserialize object
        var obj = jsonStreamReader.Deserialize<T>();
        yield return obj;

        // JsonSerializer.Deserialize ends on last token of the object parsed,
        // move to the first token of next object
        jsonStreamReader.Read();
    }
}

JSON-фрагменты, потоковая передача JSON AKA aka ... *

В сценариях потоковой передачи событий или журналирования достаточно часто добавлять отдельные объекты JSON в файл, по одному элементу в строке, например:

{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}

Это не действительный документ JSON , а отдельныйфрагменты действительны. Это имеет несколько преимуществ для больших данных / сценариев с высокой степенью одновременности. Добавление нового события требует только добавления новой строки в файл, а не анализа и перекомпоновки всего файла. Обработка , особенно параллельная обработка проще по двум причинам:

  • Отдельные элементы могут быть получены по одному за раз, просто читая одну строку из потока.
  • Входной файл может быть легко разбит на части и разбит по границам строк, передавая каждую часть отдельному рабочему процессу, например, в кластере Hadoop, или просто различным потокам в приложении: вычисление точек разделения, например, путем делениядлина по количеству работников, затем ищите первую новую строку. Передайте все до этой точки отдельному рабочему.

Использование StreamReader

Способ выделения-y для этого будет использовать TextReader,читать по одной строке за раз и анализировать ее с помощью JsonSerializer.Deserialize :

using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken 
while((line=await reader.ReadLineAsync()) != null)
{
    var item=JsonSerializer.Deserialize<T>(line);
    yield return item;

    if(cancellationToken.IsCancellationRequested)
    {
        return;
    }
}

Это намного проще, чем код, десериализующий правильный массив. Есть две проблемы:

  • ReadLineAsync не принимает токен отмены
  • Каждая итерация выделяет новую строку, одну из вещей, которую мы хотели избегать с помощью System.Text.Json

Этого может быть достаточно, хотя , поскольку попытка создать буферы ReadOnlySpan<Byte>, необходимые для JsonSerializer.Deserialize, не тривиальна.

Pipelines и SequenceReader

Чтобы избежать размещения, нам нужно получить ReadOnlySpan<byte> из потока. Для этого необходимо использовать каналы System.IO.Pipeline и структуру SequenceReader . Стив Гордона * Введение в SequenceReader объясняет, как этот класс может использоваться для чтения данных из потока с использованием разделителей.

К сожалению, SequenceReader является структурой ref, что означает, что ее нельзя использовать в асинхронных или локальных методах. Вот почему Стив Гордон в своей статье создает метод

private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

для чтения элементов из формы ReadOnlySequence и возврата конечной позиции, чтобы PipeReader мог возобновить работу с нее. К сожалению мы хотим вернуть IEnumerable или IAsyncEnumerable, а методы итератора также не любят параметры in или out.

Мы могли бы собрать десериализованные элементы в списке или очереди и вернуть их как единый результат, но это все равно выделило бы списки, буферы или узлы и пришлось бы ждать десериализации всех элементов в буфере, прежде чем возвращать:

private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

Нам нужно что-то , которое действует как перечислимое, не требуя метода итератора, работает с асинхронностью и не буферизирует все как есть.

Добавление каналов для создания IAsyncEnumerable

ChannelReader.ReadAllAsync возвращает IAsyncEnumerable. Мы можем вернуть ChannelReader из методов, которые не могли работать как итераторы и по-прежнему генерировать поток элементов без кэширования.

Адаптируя код Стива Гордона для использования каналов, мы получаем методы ReadItems (ChannelWriter ...) и ReadLastItem. Первый, читает по одному элементу за раз, до новой строки, используя ReadOnlySpan<byte> itemBytes. Это может быть использовано JsonSerializer.Deserialize. Если ReadItems не может найти разделитель, он возвращает свою позицию, чтобы PipelineReader мог извлечь следующий фрагмент из потока.

Когда мы достигаем последнего фрагмента и другого разделителя нет, ReadLastItem` читает оставшиеся байты и десериализует их.

Код почти идентичен коду Стива Гордона. Вместо записи в консоль, мы пишем в ChannelWriter.

private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;

private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence, 
                          bool isCompleted, CancellationToken token)
{
    var reader = new SequenceReader<byte>(sequence);

    while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
    {
        if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
        {
            var item=JsonSerializer.Deserialize<T>(itemBytes);
            writer.TryWrite(item);            
        }
        else if (isCompleted) // read last item which has no final delimiter
        {
            var item = ReadLastItem<T>(sequence.Slice(reader.Position));
            writer.TryWrite(item);
            reader.Advance(sequence.Length); // advance reader to the end
        }
        else // no more items in this sequence
        {
            break;
        }
    }

    return reader.Position;
}

private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
    var length = (int)sequence.Length;

    if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
    {
        Span<byte> byteBuffer = stackalloc byte[length];
        sequence.CopyTo(byteBuffer);
        var item=JsonSerializer.Deserialize<T>(byteBuffer);
        return item;        
    }
    else // otherwise we'll rent an array to use as the buffer
    {
        var byteBuffer = ArrayPool<byte>.Shared.Rent(length);

        try
        {
            sequence.CopyTo(byteBuffer);
            var item=JsonSerializer.Deserialize<T>(byteBuffer);
            return item;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(byteBuffer);
        }

    }    
}

Метод DeserializeToChannel<T> создает программу чтения конвейера поверх потока, создает канал и запускает рабочую задачу, которая анализирует чанки и выталкивает их в канал:

ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
    var pipeReader = PipeReader.Create(stream);    
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        while (!token.IsCancellationRequested)
        {
            var result = await pipeReader.ReadAsync(token); // read from the pipe

            var buffer = result.Buffer;

            var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer

            if (result.IsCompleted) 
                break; // exit if we've read everything from the pipe

            pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
        }

        pipeReader.Complete(); 
    },token)
    .ContinueWith(t=>{
        pipeReader.Complete();
        writer.TryComplete(t.Exception);
    });

    return channel.Reader;
}

ChannelReader.ReceiveAllAsync() можно использовать для потребления всех предметов через IAsyncEnumerable<T>:

var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
    //Do something with it 
}    
0 голосов
/ 29 октября 2019

Такое чувство, что вам нужно реализовать собственный потоковый ридер. Вы должны читать байты один за другим и останавливаться, как только определение объекта завершено. Это действительно довольно низкоуровневый. Таким образом, вы НЕ БУДЕТЕ загружать весь файл в ОЗУ, а просто возьмете на себя роль, с которой имеете дело. Кажется ли это ответом?

0 голосов
/ 29 октября 2019

Может быть, вы могли бы использовать Newtonsoft.Json сериализатор? https://www.newtonsoft.com/json/help/html/Performance.htm

Особенно см. Раздел:

Оптимизировать использование памяти

Редактировать

Вы можете попробовать десериализовать значения из JsonTextReader, например

using (var textReader = new StreamReader(stream))
using (var reader = new JsonTextReader(textReader))
{
    while (await reader.ReadAsync(cancellationToken))
    {
        yield return reader.Value;
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...