Простой Spark структурированный потоковый эквивалент KafkaUtils.createRDD, т. Е. Читать тему kafka в RDD, указав смещения? - PullRequest
0 голосов
/ 30 октября 2018

Как прочитать данные в теме кафки в СДР, указав начальное и конечное смещения?

KafkaUtils.createRDD означает , был экспериментальным, и API довольно неприятен (он возвращает большой раздутый класс Java ConsumerRecord, который даже не сериализуется, и помещает его в KafkaRDD, который переопределяет много методы (например, persist), чтобы просто выдать исключение.

То, что я хотел бы, это простой API, подобный этому:

case class Message(key: String, 
                   value: String, 
                   offset: Long, 
                   timestamp: Long)

def readKafka(topic: String, offsetsByPartition: Map[Int, (Long, Long)])
             (config: KafkaConfig, sc: SparkContext): RDD[Message]

или что-то подобное, где key: Array[Byte] и value: Array[Byte]

1 Ответ

0 голосов
/ 30 октября 2018

Для чтения из kafka со смещением код будет выглядеть так, как указано здесь

val df = 
  spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()

Выше будут считаны данные, доступные в пределах смещений, а затем вы можете преобразовать столбцы в строку и привести к вашему объекту Message.

val messageRDD: RDD[Message] = 
  df.select(
    col("key").cast("string"), 
    col("value").cast("string"), 
    col("offset").cast("long"),
    col("timestamp").cast("long")
  ).as[Message].rdd
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...