Моя цель - использовать Spark Structured Streaming для получения данных из kafka, выполнения обогащения и записи результатов обратно в kafka.Я получил этот пример от https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach
def process_row(row):
# Write row to storage
pass
query = streamingDF.writeStream.foreach(process_row).start()
Как записать обогащенный результат обратно в kafka?Я использую pyspark версии 2.4.2
Я искал в Интернете много часов и не смог найти ни одного примера того, как это сделать.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
def process_row(row):
row_value= row['value'] # comma separated list of values
# Perform enrichment and write the result back to kafka topic.
spark = SparkSession \
.builder \
.appName("dataenrichment") \
.getOrCreate()
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "127.0.0.1:9092") \
.option("subscribe", "ER-in") \
.option("startingOffsets", "earliest") \
.load()
df \
.selectExpr("CAST(value AS STRING)") \
.writeStream \
.foreach(process_row) \
.start() \
.awaitTermination()
Я ожидал, что тема ER-out содержитобогащенный результат.