Я занимаюсь разработкой структурированного приложения Spark Stream, в котором после анализа я наконец записал вывод в Kafka.Схема кадра данных:
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
Код для записи в Kafka:
query = testDataFrame \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "127.0.0.1:9092") \
.option("topic", output_topic) \
.option("checkpointLocation", ./tmp") \
.start()
После выполнения этого кода я не получаю никаких ошибок и, к сожалению, я не получаюполучить что-нибудь, когда я запускаю потребительскую команду Kafka из терминала:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output_topic --from-beginning
Обратите внимание, что Kafka и Zookeeper работают локально.
Команда, которую я использовал для отправки проекта:
bin/spark-submit --master local --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.apache.kafka:kafka-clients:1.0.0 pathtoPythonClass
Любая идея, где я делаю ошибку или любую помощь, будет высоко оценена.