Вы ищете поток, который может быть передан как источнику данных, так и приемнику, и который может «передавать» данные между двумя асинхронно.Существует ряд возможных решений, и я мог бы рассмотреть шаблон производителя-потребителя вокруг BlockingCollection.
В последнее время добавление типов System.IO.Pipelines, Span и Memory действительно сосредоточилось на высокопроизводительном вводе-выводе.и я думаю, что это было бы хорошо здесь.Класс Pipe со связанными с ним Reader и Writer может автоматически обрабатывать управление потоком, обратное давление и ввод-вывод между собой, используя все новые типы, связанные с Span и Memory.
Я загрузил Gist в PipeStream, который даст вам пользовательский поток с внутренней реализацией Pipe, который вы можете передать обоим классам API.Все, что записывается в метод WriteAsync (или Write), будет доступно для метода ReadAsync (или Read) без дополнительных выделений в байтах [] или MemoryStream
В вашем случае вы просто подставили бы MemoryStream для этогоновый класс и он должен работать из коробки.У меня не работает полный тест S3, но чтение непосредственно из потока Parquet и вывод его в окно консоли показывает, что он работает асинхронно.
// Create some very badly 'mocked' data
var idColumn = new DataColumn(
new DataField<int>("id"),
Enumerable.Range(0, 10000).Select(i => i).ToArray());
var cityColumn = new DataColumn(
new DataField<string>("city"),
Enumerable.Range(0, 10000).Select(i => i % 2 == 0 ? "London" : "Grimsby").ToArray());
var schema = new Schema(idColumn.Field, cityColumn.Field);
using (var pipeStream = new PipeStream())
{
var buffer = new byte[4096];
int read = 0;
var readTask = Task.Run(async () =>
{
//transferUtil.Upload(readStream, "bucketName", "key"); // Execute this in a Task / Thread
while ((read = await pipeStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
var incoming = Encoding.ASCII.GetString(buffer, 0, read);
Console.WriteLine(incoming);
// await Task.Delay(5000); uncomment this to simulate very slow consumer
}
});
using (var parquetWriter = new ParquetWriter(schema, pipeStream)) // This destructor finishes the file and transferUtil closes the stream, so we need this weird using nesting to keep everyone happy.
using (var rowGroupWriter = parquetWriter.CreateRowGroup())
{
rowGroupWriter.WriteColumn(idColumn); // Step through both these statements to see data read before the parquetWriter completes
rowGroupWriter.WriteColumn(cityColumn);
}
}
Реализация еще не полностью завершена, но я думаю, что это показываетхороший подход.В консоли 'readTask' вы можете откомментировать Task.Delay для имитации медленного чтения (TransferUtil), и вы должны увидеть, что канал автоматически душит задачу записи.
Вам необходимо использовать C # 7.2 или новее(VS 2017 -> Свойства проекта -> Сборка -> Дополнительно -> Языковая версия) для одного из методов расширения Span, но он должен быть совместим с любым .Net Framework.Вам может понадобиться Nuget Package
Поток доступен для чтения и записи (очевидно!), Но не для поиска, который должен работать для вас в этом сценарии, но не будет работать при чтении из Parquet SDK, которыйтребует поиска потоков.
Надеюсь, это поможет