Apache Spark: Кафка напишу в произвольном формате - PullRequest
0 голосов
/ 17 ноября 2018

Я создаю приложение Spark SQL, которое использует тему Kafka, преобразует некоторые данные, а затем записывает обратно в отдельную тему Kafka с определенным объектом JSON.

У меня есть большая часть этой работы - я могу потреблять, преобразовывать и записывать обратно в Kafka - это форма объекта JSON, записываемого после преобразования, с которой я борюсь.

Прямо сейчас я могу запросить / преобразовать то, что я хочу, и написать это:

Dataset<Row> reader = myData.getRecordCount();
reader.select(to_json(struct("record_count")).alias("value"))
    .write()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "new_separate_topic")
    .save();

Создает такую ​​запись в теме:

{
  "record_count": 989
}

Что мне нужно, так это чтобы этот бит JSON был свойством полезной нагрузки (дочерним) более крупного объекта JSON, который мы используем в качестве стандартного потребительского объекта для наших микросервисов.

То, что я хочу написать в теме, на самом деле выглядит так:

{
  "id": "ABC123",
  "timestamp": "2018-11-16 20:40:26.108",
  "user": "DEF456",
  "type": "new_entity",
  "data": {
      "record_count": 989
    }
}

Кроме того, поля "id", "user" и "type" будут заполняться извне - они будут исходить из исходного сообщения Kafka, которое запускает весь процесс. По сути, мне нужно ввести некоторые значения для метаданных / объекта, которые я хочу записать в Kafka, и установить в поле «данные» результат SQL-запроса Spark выше.

Возможно ли это? Как? Спасибо!

1 Ответ

0 голосов
/ 18 ноября 2018

Если вы хотите добавить новые поля, то вы не можете просто выбрать только одно.

Например, между write.format("kafka") и .select(), вам нужно сделать что-то вроде withColumn()

Dataset<Row> reader = myData.getRecordCount();
// Keep your DataSet as Columns
reader = reader.select("record_count"))

// Add more data
reader = reader.withColumn(...)

// Then convert structs to JSON and write the output 
reader.select(to_json(...))
    .write()
    .format("kafka")

"id", "userПоля ", и" type "будут заполнены извне - они будут исходить из исходного сообщения Kafka, которое запускает весь процесс

Затем вам необходимо включить select("id", "user", "type") в ваш код

Другой вариант - использовать Kafka Streams вместо того, чтобы принудительно манипулировать DataSets, вы можете использовать реальные классы Java / JSONObjects

...