Как получить данные из нескольких тем в одном месте для обработки? - PullRequest
0 голосов
/ 14 июня 2019

У меня есть требование, при котором я должен получать сообщения из 3 тем kafka в виде потоковых данных, а затем генерировать результат на основе объединений между этими 3 данными тем.Пожалуйста, предложите мне хороший подход, используя Direct Stream для Scala.Спасибо

1 Ответ

1 голос
/ 18 июня 2019

Если данные в разных темах одинаковы и у вас одинаковая логика обработки при использовании данных, вы можете использовать разные темы в одном потоке и выполнять агрегацию. Если логика обработки отличается для разных тем, задайте concurrentThreads как 4, а затем выполните агрегацию среди 4 потоков. Вы можете проверить документацию spark структурированной потоковой передачи на предмет использования нескольких тем.

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

<--- your aggregation logic here --->
...