С Akka Streams как узнать, когда источник завершен? - PullRequest
0 голосов
/ 02 февраля 2019

У меня есть Alpakka Elasticsearch Sink, который я храню между запросами.Когда я получаю запрос, я создаю Source из HTTP-запроса и превращаю его в Source Elasticsearch WriteMessage s, а затем запускаю его с mySource.runWith(theElasticseachSink).

  1. Как получить уведомление о завершении источника?Ничего полезного, похоже, не материализовано.
  2. Будет ли завершение источника передаваться в приемник, а это означает, что мне нужно каждый раз создавать новый?
  3. Если да к вышеприведенному, разве их развязать?каким-то образом с помощью Flow.fromSourceAndSink help?

Моя цель - узнать, когда загрузка HTTP завершена (включая via s, через которую она проходит), и иметь возможность повторно использовать приемник.

Ответы [ 2 ]

0 голосов
/ 03 февраля 2019

Оказывается, библиотека Elasticsearch Alpakka также поддерживает формы потоков, поэтому я могу заставить свой источник пройти через это и запустить его через любой приемник, который материализует будущее.Sink.foreach отлично работает здесь для тестирования, например, как в https://github.com/danellis/akka-es-test.

Flow fromFunction { product: Product =>
    WriteMessage.createUpsertMessage(product.id, product.attributes)
} via ElasticsearchFlow.create[Map[String, String]](index, "_doc")

для определения es.flow, а затем

val graph = response.entity.withSizeLimit(MaxFeedSize).dataBytes
    .via(scanner)
    .via(CsvToMap.toMap(Utf8))
    .map(attrs => Product(attrs("id").decodeString(Utf8), attrs.mapValues(_.decodeString(Utf8))))
    .via(es.flow)

val futureDone = graph.runWith(Sink.foreach(println))

futureDone onComplete {
    case Success(_) => println("Done")
    case Failure(e) => println(e)
}
0 голосов
/ 02 февраля 2019

вы можете обойти отдельные части потока по своему усмотрению, вы можете даже обойти весь граф executetabe (это неизменяемые).Вызов run () материализует поток, но не меняет ваш граф или его части.

1) Поскольку вы хотите знать, когда HttpDownload прошел поток, почему бы не использовать полные графы Future [Done]?Предполагая, что ваш вызовasticsearch является асинхронным, это должно быть равным, так как ваш приемник просто выполняет вызов и не ждет.Вы также можете использовать Source.queue (https://doc.akka.io/docs/akka/2.5/stream/operators/Source/queue.html)) и просто добавить свои сообщения в очередь, которая затем повторно использует определенный граф, чтобы вы могли добавлять новые сообщения, когда требуется обработка. Этот также материализует SourceQueueWithComplete, позволяя вам остановитьпоток. Кроме того, повторно используйте приемник везде, где это необходимо, без необходимости ожидания другого потока, использующего его.

2) Как описано выше: нет, вам не нужно создавать экземпляр приемника несколько раз.

С наилучшими пожеланиями, Анди

...