IO файла Akka.Net - обрабатывать только первые n байтов файла - PullRequest
0 голосов
/ 17 мая 2018

У меня есть очень простой поток для обработки файла - просто источник FileIO.FromFile, сквозной поток и приемник Last:

    var source = FileIO.FromFile(new FileInfo(fileName));

    var flow = Flow.FromFunction<ByteString, ByteString>(x =>
    {
       Log.Info(x.Count.ToString());
       return x;
    });

    var sink = Sink.Last<ByteString>();

    var runnable = source.Via(flow).ToMaterialized(sink, Keep.Right);
    var result = runnable.Run(Context.Materializer()).Result;

Поток работает так, как ожидалось:регистратор высвобождает размеры строки байтов, пока источник файла не будет полностью исчерпан.

Теперь я изменил приемник на использование Первый вместо Последний

    var source = FileIO.FromFile(new FileInfo(fileName));

    var flow = Flow.FromFunction<ByteString, ByteString>(x =>
    {
       Log.Info(x.Count.ToString());
       return x;
    });

    var sink = Sink.First<ByteString>();

    var runnable = source.Via(flow).ToMaterialized(sink, Keep.Right);
    var result = runnable.Run(Context.Materializer()).Result;

Документация по «первым» состояниям «отменяет после получения одного элемента», что, как я предполагал, означало, что приемник сигнализирует об отмене вверх по потоку, что приведет к закрытию источника.Но когда этот поток запущен, происходят две вещи.

1) Я получаю следующее сообщение журнала отладки

[DEBUG][17/05/2018 13:55:16][Thread 0004][akka://Demo/user/DATReader/StreamSupervisor-0/Flow-0-1-fileSource] Unhandled message from akka://Demo/user/DATReader/StreamSupervisor-0/Flow-0-0-unknown-operation : Akka.Streams.Actors.Cancel

и

2) Файл заблокирован, поэтомулюбая дальнейшая попытка прочитать его не удалась с исключением из-за отсутствия доступа.

Я также пытался использовать Take(1) в источнике, но тот же эффект виден.

Мой вопрос: как мнечитать только первые n байтов из файла и корректно завершать поток, чтобы снять блокировку (полученную с помощью FileIO.FromFile)?

1 Ответ

0 голосов
/ 19 октября 2018

В моем случае такой ошибки нет. Попробуйте использовать sharedKillSwitch для контроля завершения. Документация здесь о Динамическая обработка потока

            var sharedKillSwitch = KillSwitches.Shared("my-kill-switch");
            var source = FileIO.FromFile(new FileInfo(fileName), chunkSize: 1024)
                    .Via(sharedKillSwitch.Flow<ByteString>());

            var sink = Sink.First<ByteString>();

            var runnable = source.Select(x => {                 
                Log.Info(x.Count.ToString());
                sharedKillSwitch.Shutdown();
                return x;
            })
            .ToMaterialized(sink, Keep.Right);

            var result = runnable.Run(materializer).Result;
...