Какова рекомендация передовой практики для следующего варианта использования?Нам нужно сопоставить поток с набором «правил», которые по сути являются концепцией Flink DataSet.Обновления этого «набора правил» возможны, но не часты. Каждое событие потока должно проверяться на соответствие всем записям в «наборе правил», и каждое совпадение создает одно или несколько событий в потоке данных приемника. Числозаписи в наборе правил находятся в диапазоне 6 цифр.
В настоящее время мы просто загружаем правила в локальный список правил и используем flatMap поверх входящего DataStream. Внутри flatMap мы просто перебираем списоксравнивая каждое событие с каждым правилом.
Чтобы ускорить итерацию, мы также можем разбить список на несколько пакетов, по существу создав список списков и создав отдельный поток для итерации по каждому подсписку (используяФьючерсы на Java или Scala).
Вопросы:
- Есть ли лучший способ сделать такое объединение?
- Если нет, безопасно ли этодобавить дополнительный параллелизм, создавая новые потоки внутри каждой операции flatMap, в дополнение к тому, что уже делает Flink?
РЕДАКТИРОВАТЬ: Вот пример кодав соответствии с просьбой:
package wikiedits
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
object WikipediaEditEventProcessor {
def main(args: Array[String])= {
val see = StreamExecutionEnvironment.getExecutionEnvironment
val edits = see.addSource(new WikipediaEditsSource())
val ruleSets = Map[Int, List[String]](
(1, List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")),
(2, List("k", "l", "m", "n", "o", "p", "q", "r", "s", "t")),
(3, List("u", "v", "w", "x", "y", "z", "0", "1", "2", "3"))
)
val result = edits.flatMap { edit =>
ruleSets.map { ruleSet =>
applyRuleSet(edit, ruleSet._2, ruleSet._1)
}
}
see.execute
}
def applyRuleSet(event: WikipediaEditEvent, ruleSet: List[String], ruleSetId: Int): Future[List[String]] = {
val title = event.getTitle
Future(
ruleSet.map {
case rule if title.contains(rule) =>
val result = s"Ruleset $ruleSetId: $rule -> exists in: $title"
println(result) // this would be creating an output event instead
result
case rule =>
val result = s"Ruleset $ruleSetId: $rule -> NO MATCH in: $title"
println(result)
result
}
)
}
}