Я могу прочитать поток из темы Кафки и записать (преобразованные) данные обратно в другую тему Кафки в два разных этапа в PySpark.Код для этого выглядит следующим образом:
# Define Stream:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "instream") \
.load()
# Transform
matchdata = df.select(from_json(F.col("value").cast("string"),schema).alias("value"))\
.select(F.col('value').cast("string"))
# Stream the data, from a Kafka topic to a Spark in-memory table
query = matchdata \
.writeStream \
.format("memory") \
.queryName("PositionTable") \
.outputMode("append") \
.start()
query.awaitTermination(5)
# Create a new dataframe after stream completes:
tmp_df=spark.sql("select * from PositionTable")
# Write data to a different Kafka topic
tmp_df \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "outstream") \
.save()
Приведенный выше код работает должным образом: данные в теме Kafka «instream» читаются в PySpark, а затем PySpark может записывать данные в тему Kafka «outtream»Msgstr ".
Тем не менее, я хотел бы прочитать поток и немедленно записать преобразованные данные (поток будет неограниченным, и мы хотели бы получить информацию сразу, как только данные появятся).Следуя документации , я заменил приведенный выше запрос следующим:
query = matchdata \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "outstream") \
.option("checkpointLocation", "/path/to/HDFS/dir") \
.start()
Это не похоже на работу.Там нет сообщения об ошибке, поэтому я не знаю, что не так.Я также пробовал работать с окнами и агрегировать в Windows, но это также не работает.Любой совет будет оценен!