Есть ли способ выполнить какой-либо конкретный поток после каждого другого потока без необходимости его явно вставлять - PullRequest
0 голосов
/ 05 августа 2020

У меня есть несколько потоков (для обработки сообщения, полученного из очереди) для выполнения, и после каждого потока мне нужно проверять, есть ли какая-либо ошибка в предыдущем потоке, если да, то я отфильтровываю сообщение в процессе, в противном случае перехожу к следующему flow.

В настоящее время я должен включать этот поток обработчика ошибок явно после каждого другого потока. Есть ли способ сделать это с помощью некоторых функций, при которых этот поток ошибок можно настроить для запуска после каждого другого потока. Или любой другой лучший способ сделать это?

Пример:

поток 1 -> Проверить сообщение, если ошибка, пометить сообщение как ошибку

поток ошибок -> проверить сообщение помечена как ошибка, если да, фильтруйте, в противном случае продолжайте.

поток 2 -> сохранить сообщение в базе данных, отметить в случае ошибки.

поток ошибок -> проверить, помечено ли сообщение как ошибка, если да, фильтр, иначе продолжить

поток 3 -> и т. д.

Или есть способ обернуть (поток 1 + поток ошибок), (поток 2 -> поток ошибок)?

Ответы [ 2 ]

0 голосов
/ 05 августа 2020

Вы также можете использовать Merge

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val merge = builder.add(Merge[Int](3))

  val flow1 = ...
  val flow2 = ...
  val flow3 = ...

  flow1 ~> merge
  flow2 ~> merge
  flow3 ~> merge

  ClosedShape
})

Не уверен, соответствует ли это вашим потребностям, просто показываю альтернативу.

0 голосов
/ 05 августа 2020

Я не уверен, что это именно то, что вы просили, но у меня есть какое-то решение. Что можно сделать, так это создать все потоки, например, мы можем посмотреть:

val flows = Seq (
  Flow.fromFunction[Int, Int](x => { println(s"flow1: Received $x"); x * 2 }),
  Flow.fromFunction[Int, Int](x => { println(s"flow2: Received $x"); x + 1}),
  Flow.fromFunction[Int, Int](x => { println(s"flow3: Received $x"); x * x})
)

Затем нам нужно добавить к каждому из существующих потоков обработку ошибок. Итак, давайте определим его и добавим к каждому из элементов:

val errorHandling = Flow[Int].filter(_ % 2 == 0)
val errorsHandledFlows = flows.map(flow => flow.via(errorHandling))

Теперь нам нужна вспомогательная функция, которая соединит все наши новые потоки:

def connectFlows(errorsHandledFlows: Seq[Flow[Int, Int, _]]): Flow[Int, Int, _] = {
  errorsHandledFlows match {
    case Seq() => Flow[Int] // This line is actually redundant, But I don't want to have an unexhausted pattern matching
    case Seq(singleFlow) => singleFlow
    case head :: tail => head.via(connectFlows(tail))
  }
}

И теперь нам нужно выполнить все вместе, например:

Source(1 to 4).via(connectFlows(errorsHandledFlows)).to(Sink.foreach(println)).run()

Будет выдан результат:

flow1: Received 1
flow2: Received 2
flow1: Received 2
flow2: Received 4
flow1: Received 3
flow2: Received 6
flow1: Received 4
flow2: Received 8

Как вы можете заметить, мы фильтруем нечетные числа. Следовательно, первый поток получает все числа от 1 до 4. Второй поток получил 2,4,6,8 (первый поток умножил значения на 2), а последний не получил никакого потока, потому что второй поток делает все значений нечетное.

...