У меня есть простой поток, объявленный следующим образом:
Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(30), Files.list(rootDir).collect(Collectors.toList()))
.mapConcat(files -> files)
.log("scanning logs")
.via(logsFlow.create())
.via(kafkaFlow.create())
// .via(archiveFlow.create())
.runWith(Sink.ignore(), materializer)
.whenComplete((a, b) -> {
log.info("done");
});
С закомментированным archiveFlow все работает, как и ожидалось.Но когда я добавляю дополнительный поток, независимо от того, является ли он потоком архива или каким-то простым потоком, подобным этому:
.via(Flow.of(Path.class).map(path -> {
log.info("foo");
return path;
}))
поток завершается после первой отметки.Это почему?
2019-03-20 21:35:09.292 DEBUG 50089 --- [lt-dispatcher-2] a.kafka.internal.DefaultProducerStage : Stage completed
2019-03-20 21:35:09.294 DEBUG 50089 --- [lt-dispatcher-4] akka.stream.Materializer : [scanning logs] Downstream finished.
2019-03-20 21:35:09.296 INFO 50089 --- [onPool-worker-3] com.example.MyStream : done