Как сделать JOIN на потоковых данных из кафки на потоковой передаче искры - PullRequest
0 голосов
/ 01 февраля 2019

Я новичок в потоковом зажигании.Я пытаюсь выполнить некоторые упражнения по извлечению данных из kafka и соединению с таблицей улья. Я не уверен, как выполнить JOIN в потоковой передаче с искрой (не структурированной потоковой).Вот мой код

   val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))   

   val kafkaParams = Map[String, Object](
   "bootstrap.servers" -> "dofff2.dl.uk.feefr.com:8002",
   "security.protocol" -> "SASL_PLAINTEXT",
   "key.deserializer" -> classOf[StringDeserializer],
   "value.deserializer" -> classOf[StringDeserializer],
   "group.id" -> "1",
   "auto.offset.reset" -> "latest",
   "enable.auto.commit" -> (false: java.lang.Boolean)
   )

   val topics = Array("csvstream")
   val stream = KafkaUtils.createDirectStream[String, String](
   ssc,
   PreferConsistent,
   Subscribe[String, String](topics, kafkaParams)
   )

   val strmk = stream.map(record => (record.value,record.timestamp))

Теперь я хочу сделать соединение на одной из таблиц в улье.В потоковой передаче с искрой я могу напрямую вызывать spark.table («table nanme») и выполнять некоторое соединение, но в потоковой передаче с искрой я могу это сделать, поскольку все основано на RDD.кто-нибудь может мне помочь?

1 Ответ

0 голосов
/ 02 февраля 2019

Вам нужно преобразование .

Требуется что-то подобное:

val dataset: RDD[String, String] = ... // From Hive
val windowedStream = stream.window(Seconds(20))... // From dStream
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

Из руководств:

Операция преобразования(вместе с его вариациями, такими как transformWith) позволяет применять произвольные функции RDD-to-RDD к DStream.Его можно использовать для применения любой операции RDD, которая не предоставляется в API DStream.Например, функциональность объединения каждого пакета в потоке данных с другим набором данных не предоставляется напрямую в API DStream.Тем не менее, вы можете легко использовать transform для этого.Это дает очень мощные возможности.

Пример этого можно найти здесь: Как объединить DStream с непотоковым файлом?

Помогает следующее руководство: https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html

...