Разработано: хотите реализовать scala-код с использованием структурированной потоковой обработки искры, DataFrame для чтения события JSON из Kafka и использовать spark-sql для манипулирования данными / столбцами и записи его в куст?
Использованиеscala 2.11 / spark 2.2
Я понимаю, что создать соединение просто:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
Как мне обработать событие JSON?Предполагая, что все события имеют одинаковую схему, я должен предоставить схему, и если да, то как это сделать, а также, если есть способ вывести схему, как это делается?
Если я правильно понимаю, тогда ясоздать tempView , как мне выполнить sql-подобные запросы в этом представлении?
Правка, требуемая вашей системой: автоматическая система пометила это сообщение как дублирующее, но это не так.В связанном вопросе OP попросил исправить проблему с его существующим кодом, и один (действительный) ответ решил проблемы с десериализацией JSON.Мои вопросы отличаются, как указано выше.Если мой вопрос не был ясен, пожалуйста, спросите конкретно, и я постараюсь уточнить дальше.Спасибо.