У меня есть работа со струйной структурой для чтения из kafka topi c. Однако при подписке на topi c задание не записывает данные на консоль и не выгружает их в базу данных с помощью foreach writer.
У меня есть класс DBWriter extends ForeachWriter<Row>
, но метод open, process, close
этого класса никогда не вызывается.
Пожалуйста, дайте мне знать, если вам нужна дополнительная информация.
Следовали инструкциям согласно Руководству по интеграции Saprk Kafka . Все еще не работает.
Версия Spark 2.3.1 Кафка 0.10.0
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.3.1</version>
</dependency>
Мой код:
spark.readStream().format("kafka").option.option("kafka.bootstrap.servers", "YOUR.HOST:PORT1,YOUR.HOST:PORT2")
.option("subscribe", "TOPIC1")
.option("startingOffsets", "latest") // read data from the end of the stream
.load()
И
Dataset<Row> selectDf = dataframe.select(dataframe.col("key")
.cast("string"),org.apache.spark.sql.functions.from_json(dataframe.col("value")
.cast("string"), schema).alias("data"));
selectDf.writeStream()
.trigger(Trigger.ProcessingTime(1000))
.foreach(new DBWriterSink())
.option("checkpointLocation","/tmp/chp_path/")
Входные данные имеет следующий формат:
ДАННЫЕ в формате json:
{"input_source_data":
{ "key1":"value1",
"key2": "value2"
}
}