Создание темы Avro для Kafka с использованием Apache Spark - PullRequest
1 голос
/ 21 апреля 2019

Я установил kafka локально (на данный момент нет реестра кластера / схемы) и пытаюсь создать тему Avro, а ниже приведена схема, связанная с этой темой.

{
  "type" : "record",
  "name" : "Customer",
  "namespace" : "com.example.Customer",
  "doc" : "Class: Customer",
  "fields" : [ {
    "name" : "name",
    "type" : "string",
    "doc" : "Variable: Customer Name"
  }, {
    "name" : "salary",
    "type" : "double",
    "doc" : "Variable: Customer Salary"
  } ]
}

Я хотел бы создать простуюSparkProducerApi для создания некоторых данных на основе приведенной выше схемы и публикации их в kafka.Думая о создании примера данных, преобразующих в dataframe, затем измените его на avro и затем опубликуйте.

val df = spark.createDataFrame(<<data>>)

И затем, что-то вроде ниже:

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("topic","customer_avro_topic")
  .save()
}

Присоединение схемык этой теме можно сделать manually.

Можно ли это сделать, просто используя Apache Spark APIs вместо Java/Kafka Apis?Это для пакетной обработки вместо streaming.

1 Ответ

0 голосов
/ 21 апреля 2019

Я не думаю, что это напрямую возможно, потому что производитель Kafka в Spark ожидает два столбца ключа и значения, оба из которых должны быть байтовыми массивами.

Если вы читаете существующий файл Avro с диска,у читателя Avro Dataframe у вас, скорее всего, есть два столбца для имени и зарплаты.Следовательно, вам потребуется одна операция для создания столбца value из других, содержащих всю запись Avro, затем отбросить эти другие столбцы, а затем вы должны сериализовать его в байтовый массив с использованием библиотеки, такой как Bijection, например, поскольку вы 'Реестр схемы не используется.

Если вы хотите сгенерировать данные, но у вас нет файла, вам нужно создать список объектов Tuple2 для ключа сообщения Kafka и значений, которые являются байтовыми массивами.затем вы можете parallelize передать их в RDD, а затем преобразовать их в Dataframe.Но в этот момент просто использовать обычный API Kafka Producer намного проще.

Плюс, если вы уже знаете свою схему, попробуйте проект, упомянутый в Способы генерации тестовых данных в Kafka

...