Spark Структурированная потоковая передача foreach Пользователь Sink не может читать данные из раздела Кафки. - PullRequest
0 голосов
/ 01 апреля 2020

У меня есть работа со струйной структурой для чтения из kafka topi c. Однако при подписке на topi c задание не записывает данные на консоль и не выгружает их в базу данных с помощью foreach writer.

У меня есть класс DBWriter extends ForeachWriter<Row>, но метод open, process, close этого класса никогда не вызывается.

Пожалуйста, дайте мне знать, если вам нужна дополнительная информация.

Следовали инструкциям согласно Руководству по интеграции Saprk Kafka . Все еще не работает.

Версия Spark 2.3.1 Кафка 0.10.0

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.3.1</version>
</dependency>

Мой код:

spark.readStream().format("kafka").option.option("kafka.bootstrap.servers", "YOUR.HOST:PORT1,YOUR.HOST:PORT2")   
  .option("subscribe", "TOPIC1")    
  .option("startingOffsets", "latest") // read data from the end of the stream
  .load()

И

Dataset<Row> selectDf = dataframe.select(dataframe.col("key")
  .cast("string"),org.apache.spark.sql.functions.from_json(dataframe.col("value")
  .cast("string"), schema).alias("data"));

selectDf.writeStream()
  .trigger(Trigger.ProcessingTime(1000))
  .foreach(new DBWriterSink())
  .option("checkpointLocation","/tmp/chp_path/")

Входные данные имеет следующий формат:

ДАННЫЕ в формате json:



    {"input_source_data": 
    { "key1":"value1", 
    "key2": "value2"
     } 
    }

1 Ответ

0 голосов
/ 05 апреля 2020

Фактическая проблема была из-за неправильной настройки конфигурации kafka. Подписка topi c не удалась, рукопожатие не удалось. После исправления свойств кафки правильно. Способный читать данные, он устанавливал эти свойства дополнительно. После удаления он начал работать. Возможность прочитать сообщение и увидеть, что ForEachWriter также вызывается.

properties.put("security.protocol", "SSL");

...