У нас есть две InputDStream
из двух тем Кафки, но мы должны объединить данные этих двух входов вместе.Проблема в том, что каждый InputDStream
обрабатывается независимо, из-за foreachRDD
, ничего не может быть возвращено, до join
после.
var Message1ListBuffer = new ListBuffer[Message1]
var Message2ListBuffer = new ListBuffer[Message2]
inputDStream1.foreachRDD(rdd => {
if (!rdd.partitions.isEmpty) {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.map({ msg =>
val r = msg.value()
val avro = AvroUtils.objectToAvro(r.getSchema, r)
val messageValue = AvroInputStream.json[FMessage1](avro.getBytes("UTF-8")).singleEntity.get
Message1ListBuffer = Message1FlatMapper.flatmap(messageValue)
Message1ListBuffer
})
inputDStream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
})
inputDStream2.foreachRDD(rdd => {
if (!rdd.partitions.isEmpty) {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.map({ msg =>
val r = msg.value()
val avro = AvroUtils.objectToAvro(r.getSchema, r)
val messageValue = AvroInputStream.json[FMessage2](avro.getBytes("UTF-8")).singleEntity.get
Message2ListBuffer = Message1FlatMapper.flatmap(messageValue)
Message2ListBuffer
})
inputDStream2.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
})
Я думал, что смогу вернуть Message1ListBuffer и Message2ListBuffer, повернуть ихв кадры данных и присоединиться к ним.Но это не работает, и я не думаю, что это лучший выбор
Откуда, каким образом вернуть rdd каждого foreachRDD для соединения?
inputDStream1.foreachRDD(rdd => {
})
inputDStream2.foreachRDD(rdd => {
})