Spark Streaming присоединиться к темам Кафка - PullRequest
0 голосов
/ 24 мая 2019

У нас есть две InputDStream из двух тем Кафки, но мы должны объединить данные этих двух входов вместе.Проблема в том, что каждый InputDStream обрабатывается независимо, из-за foreachRDD, ничего не может быть возвращено, до join после.

  var Message1ListBuffer = new ListBuffer[Message1]
  var Message2ListBuffer = new ListBuffer[Message2]

    inputDStream1.foreachRDD(rdd => {
      if (!rdd.partitions.isEmpty) {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.map({ msg =>
          val r = msg.value()
          val avro = AvroUtils.objectToAvro(r.getSchema, r)
          val messageValue = AvroInputStream.json[FMessage1](avro.getBytes("UTF-8")).singleEntity.get
          Message1ListBuffer = Message1FlatMapper.flatmap(messageValue)
          Message1ListBuffer
        })
        inputDStream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })


    inputDStream2.foreachRDD(rdd => {
      if (!rdd.partitions.isEmpty) {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.map({ msg =>
          val r = msg.value()
          val avro = AvroUtils.objectToAvro(r.getSchema, r)
          val messageValue = AvroInputStream.json[FMessage2](avro.getBytes("UTF-8")).singleEntity.get
          Message2ListBuffer = Message1FlatMapper.flatmap(messageValue)
          Message2ListBuffer

        })
        inputDStream2.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

Я думал, что смогу вернуть Message1ListBuffer и Message2ListBuffer, повернуть ихв кадры данных и присоединиться к ним.Но это не работает, и я не думаю, что это лучший выбор

Откуда, каким образом вернуть rdd каждого foreachRDD для соединения?

inputDStream1.foreachRDD(rdd => {

})


inputDStream2.foreachRDD(rdd => {

})

1 Ответ

1 голос
/ 24 мая 2019

Не уверен насчет используемой версии Spark, с Spark 2.3+ это может быть достигнуто напрямую.

с искрой> = 2,3

Подписаться на 2 темы, к которым вы хотите присоединиться

val ds1 = spark
  .readStream 
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load

val ds2 = spark
  .readStream 
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic2")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load

Форматирование подписанных сообщений в обоих потоках

val stream1 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

val stream2 = ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Присоединяйтесь к обоим потокам

resultStream = stream1.join(stream2)

больше здесь присоединяются операции

Предупреждение:

Задержка записи не приведет к объединению матчей. Нужно немного подправить буфер. более подробную информацию можно найти здесь

...