Можно ли вызывать карту Flink и когда это необходимо (не активировано на входном потоке) - PullRequest
1 голос
/ 07 мая 2019

У меня есть карта, которая активируется, когда данные поступают через поток.

Я хочу вызвать эту карту, даже если данные не поступают.

Я переместил карту вфункция (бесконечный вызов функции), но тогда задание Flink никогда не запускается.И если я добавлю его в карту, он активируется только в том случае, если и когда данные поступят.

Идея состоит в том, чтобы иметь 1 карту в цикле infinte, проверяя некоторую общую переменную и другой поток flink, отслеживающий очередь kafka, если данные поступают в процессе, он изменяет общую переменную, которая каким-то образом воздействует на бесконечный цикл и продолжает.

Как я могу вызвать карту бесконечного цикла и запустить карты мерцания вместе?

Я попытался создать CollectionMap со случайными данными, чтобы активировать поток и карту, чтобы вызвать бесконечный цикл, но выходит почти сразу, хотя в карте есть условие while (true)

В IDE это работает, когдаЯ нажимаю его на Flink.local, он выходит почти сразу, не оставаясь в цикле

Поток 1

    val data_stream = env.addSource(myConsumer)
      .map(x => {process(x)})

Поток 2

    val elements = List[String]("Start")
    var read = env.fromElements(elements).map(x => ProcessData.infinteLoop())

Как мне вызвать бесконечный циклкарта и запустить карты Flink вместе?

1 Ответ

1 голос
/ 07 мая 2019

Вы можете создать window и trigger и вызывать карту каждые x секунды.

Вы можете найти документацию здесь: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html

Пример:

import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

 val data_stream = env.addSource(myConsumer)
  .map(x => {process(x)})

val window: DataStream[String] = data_stream
  .windowAll(GlobalWindows.create())
  .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
  .apply((w: GlobalWindow, x: Iterable[(Integer, String)], y: Collector[String]) => {})
...