Я создаю приложение Spark SQL, которое использует тему Kafka, преобразует некоторые данные, а затем записывает обратно в отдельную тему Kafka с определенным объектом JSON.
У меня есть большая часть этой работы - я могу потреблять, преобразовывать и записывать обратно в Kafka - это форма объекта JSON, записываемого после преобразования, с которой я борюсь.
Прямо сейчас я могу запросить / преобразовать то, что я хочу, и написать это:
Dataset<Row> reader = myData.getRecordCount();
reader.select(to_json(struct("record_count")).alias("value"))
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "new_separate_topic")
.save();
Создает такую запись в теме:
{
"record_count": 989
}
Что мне нужно, так это чтобы этот бит JSON был свойством полезной нагрузки (дочерним) более крупного объекта JSON, который мы используем в качестве стандартного потребительского объекта для наших микросервисов.
То, что я хочу написать в теме, на самом деле выглядит так:
{
"id": "ABC123",
"timestamp": "2018-11-16 20:40:26.108",
"user": "DEF456",
"type": "new_entity",
"data": {
"record_count": 989
}
}
Кроме того, поля "id", "user" и "type" будут заполняться извне - они будут исходить из исходного сообщения Kafka, которое запускает весь процесс. По сути, мне нужно ввести некоторые значения для метаданных / объекта, которые я хочу записать в Kafka, и установить в поле «данные» результат SQL-запроса Spark выше.
Возможно ли это? Как? Спасибо!