Как связать два API C #, которые ожидают, что вы предоставите поток? - PullRequest
0 голосов
/ 10 октября 2018

Я работаю с двумя потоковыми API C #, один из которых является источником данных , а другой является приемником данных .

На самом деле ни один APIвыставляет объект потока;оба ожидают, что вы передадите в них поток, и они обрабатывают запись / чтение из потока.

Есть ли способ связать эти API вместе, чтобы вывод источника направлялся в приемник без необходимости буферизоватьвесь источник в MemoryStream?Это очень чувствительное к оперативной памяти приложение.

Вот пример, в котором используется подход MemoryStream, которого я стараюсь избегать, поскольку он буферизирует весь поток в ОЗУ перед записью его в S3:

using (var buffer = new MemoryStream())
using (var transferUtil = new TransferUtility(s3client))
{
    // This destructor finishes the file and transferUtil closes 
    // the stream, so we need this weird using nesting to keep everyone happy.
    using (var parquetWriter = new ParquetWriter(schema, buffer)) 
        using (var rowGroupWriter = parquetWriter.CreateRowGroup())
        {
            rowGroupWriter.WriteColumn(...);
            ...
        }
    transferUtil.Upload(buffer, _bucketName, _key.Replace(".gz", "") + ".parquet");
}

1 Ответ

0 голосов
/ 13 октября 2018

Вы ищете поток, который может быть передан как источнику данных, так и приемнику, и который может «передавать» данные между двумя асинхронно.Существует ряд возможных решений, и я мог бы рассмотреть шаблон производителя-потребителя вокруг BlockingCollection.

В последнее время добавление типов System.IO.Pipelines, Span и Memory действительно сосредоточилось на высокопроизводительном вводе-выводе.и я думаю, что это было бы хорошо здесь.Класс Pipe со связанными с ним Reader и Writer может автоматически обрабатывать управление потоком, обратное давление и ввод-вывод между собой, используя все новые типы, связанные с Span и Memory.

Я загрузил Gist в PipeStream, который даст вам пользовательский поток с внутренней реализацией Pipe, который вы можете передать обоим классам API.Все, что записывается в метод WriteAsync (или Write), будет доступно для метода ReadAsync (или Read) без дополнительных выделений в байтах [] или MemoryStream

В вашем случае вы просто подставили бы MemoryStream для этогоновый класс и он должен работать из коробки.У меня не работает полный тест S3, но чтение непосредственно из потока Parquet и вывод его в окно консоли показывает, что он работает асинхронно.

// Create some very badly 'mocked' data
var idColumn = new DataColumn(
    new DataField<int>("id"),
    Enumerable.Range(0, 10000).Select(i => i).ToArray());

var cityColumn = new DataColumn(
    new DataField<string>("city"),
    Enumerable.Range(0, 10000).Select(i => i % 2 == 0 ? "London" : "Grimsby").ToArray());

var schema = new Schema(idColumn.Field, cityColumn.Field);

using (var pipeStream = new PipeStream())
{
    var buffer = new byte[4096];
    int read = 0;

    var readTask = Task.Run(async () =>
    {
        //transferUtil.Upload(readStream, "bucketName", "key"); // Execute this in a Task / Thread 
        while ((read = await pipeStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            var incoming = Encoding.ASCII.GetString(buffer, 0, read);
            Console.WriteLine(incoming);
            // await Task.Delay(5000); uncomment this to simulate very slow consumer
        }
    });

    using (var parquetWriter = new ParquetWriter(schema, pipeStream)) // This destructor finishes the file and transferUtil closes the stream, so we need this weird using nesting to keep everyone happy.
    using (var rowGroupWriter = parquetWriter.CreateRowGroup())
    {
        rowGroupWriter.WriteColumn(idColumn);  // Step through both these statements to see data read before the parquetWriter completes
        rowGroupWriter.WriteColumn(cityColumn);
    }       
}

Реализация еще не полностью завершена, но я думаю, что это показываетхороший подход.В консоли 'readTask' вы можете откомментировать Task.Delay для имитации медленного чтения (TransferUtil), и вы должны увидеть, что канал автоматически душит задачу записи.

Вам необходимо использовать C # 7.2 или новее(VS 2017 -> Свойства проекта -> Сборка -> Дополнительно -> Языковая версия) для одного из методов расширения Span, но он должен быть совместим с любым .Net Framework.Вам может понадобиться Nuget Package

Поток доступен для чтения и записи (очевидно!), Но не для поиска, который должен работать для вас в этом сценарии, но не будет работать при чтении из Parquet SDK, которыйтребует поиска потоков.

Надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...