Flink: реализация «соединения» между DataStream и «набором правил» - PullRequest
0 голосов
/ 04 июня 2018

Какова рекомендация передовой практики для следующего варианта использования?Нам нужно сопоставить поток с набором «правил», которые по сути являются концепцией Flink DataSet.Обновления этого «набора правил» возможны, но не часты. Каждое событие потока должно проверяться на соответствие всем записям в «наборе правил», и каждое совпадение создает одно или несколько событий в потоке данных приемника. Числозаписи в наборе правил находятся в диапазоне 6 цифр.

В настоящее время мы просто загружаем правила в локальный список правил и используем flatMap поверх входящего DataStream. Внутри flatMap мы просто перебираем списоксравнивая каждое событие с каждым правилом.

Чтобы ускорить итерацию, мы также можем разбить список на несколько пакетов, по существу создав список списков и создав отдельный поток для итерации по каждому подсписку (используяФьючерсы на Java или Scala).

Вопросы:

  1. Есть ли лучший способ сделать такое объединение?
  2. Если нет, безопасно ли этодобавить дополнительный параллелизм, создавая новые потоки внутри каждой операции flatMap, в дополнение к тому, что уже делает Flink?

РЕДАКТИРОВАТЬ: Вот пример кодав соответствии с просьбой:

package wikiedits

import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object WikipediaEditEventProcessor {

  def main(args: Array[String])= {
    val see = StreamExecutionEnvironment.getExecutionEnvironment
    val edits = see.addSource(new WikipediaEditsSource())

    val ruleSets = Map[Int, List[String]](
      (1, List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")),
      (2, List("k", "l", "m", "n", "o", "p", "q", "r", "s", "t")),
      (3, List("u", "v", "w", "x", "y", "z", "0", "1", "2", "3"))
    )

    val result = edits.flatMap { edit =>
      ruleSets.map { ruleSet =>
        applyRuleSet(edit, ruleSet._2, ruleSet._1)
      }
    }
    see.execute
  }

  def applyRuleSet(event: WikipediaEditEvent, ruleSet: List[String], ruleSetId: Int): Future[List[String]] = {
    val title = event.getTitle
    Future(
      ruleSet.map {
        case rule if title.contains(rule) =>
          val result = s"Ruleset $ruleSetId: $rule -> exists in: $title"
          println(result) // this would be creating an output event instead
          result
        case rule =>
          val result = s"Ruleset $ruleSetId: $rule -> NO MATCH in: $title"
          println(result)
          result
      }
    )
  }
}

1 Ответ

0 голосов
/ 07 июня 2018

Каждое событие потока должно проверяться по всем записям в «наборе правил», и каждое совпадение создает одно или несколько событий в потоке данных приемника.Количество записей в наборе правил находится в диапазоне 6 цифр

Скажем, у вас есть K правил.Ваш подход хорош, если скорость ввода выше, чем время, затрачиваемое на обработку K правил для одного события.Иначе вам нужен какой-то подход, при котором вы можете параллельно обрабатывать эти K-правила.

Думайте о них как о K-платной стойке.Размещайте их по порядку, а не в одной большой комнате.Это упростило бы механизм потоковой передачи.

Другими словами, используйте простой цикл for, чтобы перебрать все правила и иметь отдельную flatMap для каждого правила.Таким образом, каждый из них не зависит друг от друга, поэтому может обрабатываться параллельно.В конце у вас будет K flatMaps для исполнения.Механизм будет использовать максимально возможный параллелизм с любой конфигурацией, которую вы предоставляете для выполнения.Этот подход ограничивает максимально возможный параллелизм до K. Но этого достаточно для большого числа правил.

дополнительный параллелизм путем создания новых потоков внутри каждой операции flatMap

Нена все рекомендуется.Оставь параллелизм моргать.Вы определяете единицу работы, которую вы хотите выполнить внутри вашей flatMap.

...