Как использовать spark-sql для чтения и обработки события JSON из kafka? - PullRequest
0 голосов
/ 28 мая 2018

Разработано: хотите реализовать scala-код с использованием структурированной потоковой обработки искры, DataFrame для чтения события JSON из Kafka и использовать spark-sql для манипулирования данными / столбцами и записи его в куст?

Использованиеscala 2.11 / spark 2.2

Я понимаю, что создать соединение просто:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Как мне обработать событие JSON?Предполагая, что все события имеют одинаковую схему, я должен предоставить схему, и если да, то как это сделать, а также, если есть способ вывести схему, как это делается?

Если я правильно понимаю, тогда ясоздать tempView , как мне выполнить sql-подобные запросы в этом представлении?

Правка, требуемая вашей системой: автоматическая система пометила это сообщение как дублирующее, но это не так.В связанном вопросе OP попросил исправить проблему с его существующим кодом, и один (действительный) ответ решил проблемы с десериализацией JSON.Мои вопросы отличаются, как указано выше.Если мой вопрос не был ясен, пожалуйста, спросите конкретно, и я постараюсь уточнить дальше.Спасибо.

...