У меня есть очень простой поток для обработки файла - просто источник FileIO.FromFile
, сквозной поток и приемник Last
:
var source = FileIO.FromFile(new FileInfo(fileName));
var flow = Flow.FromFunction<ByteString, ByteString>(x =>
{
Log.Info(x.Count.ToString());
return x;
});
var sink = Sink.Last<ByteString>();
var runnable = source.Via(flow).ToMaterialized(sink, Keep.Right);
var result = runnable.Run(Context.Materializer()).Result;
Поток работает так, как ожидалось:регистратор высвобождает размеры строки байтов, пока источник файла не будет полностью исчерпан.
Теперь я изменил приемник на использование Первый вместо Последний
var source = FileIO.FromFile(new FileInfo(fileName));
var flow = Flow.FromFunction<ByteString, ByteString>(x =>
{
Log.Info(x.Count.ToString());
return x;
});
var sink = Sink.First<ByteString>();
var runnable = source.Via(flow).ToMaterialized(sink, Keep.Right);
var result = runnable.Run(Context.Materializer()).Result;
Документация по «первым» состояниям «отменяет после получения одного элемента», что, как я предполагал, означало, что приемник сигнализирует об отмене вверх по потоку, что приведет к закрытию источника.Но когда этот поток запущен, происходят две вещи.
1) Я получаю следующее сообщение журнала отладки
[DEBUG][17/05/2018 13:55:16][Thread 0004][akka://Demo/user/DATReader/StreamSupervisor-0/Flow-0-1-fileSource] Unhandled message from akka://Demo/user/DATReader/StreamSupervisor-0/Flow-0-0-unknown-operation : Akka.Streams.Actors.Cancel
и
2) Файл заблокирован, поэтомулюбая дальнейшая попытка прочитать его не удалась с исключением из-за отсутствия доступа.
Я также пытался использовать Take(1)
в источнике, но тот же эффект виден.
Мой вопрос: как мнечитать только первые n байтов из файла и корректно завершать поток, чтобы снять блокировку (полученную с помощью FileIO.FromFile)?