Spark Структурированная потоковая передача каждого результата записи обратно в kafka - PullRequest
0 голосов
/ 10 мая 2019

Моя цель - использовать Spark Structured Streaming для получения данных из kafka, выполнения обогащения и записи результатов обратно в kafka.Я получил этот пример от https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach

  def process_row(row):
      # Write row to storage
      pass

  query = streamingDF.writeStream.foreach(process_row).start()  

Как записать обогащенный результат обратно в kafka?Я использую pyspark версии 2.4.2

Я искал в Интернете много часов и не смог найти ни одного примера того, как это сделать.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

def process_row(row):
    row_value= row['value'] # comma separated list of values
    # Perform enrichment and write the result back to kafka topic.

spark = SparkSession \
    .builder \
    .appName("dataenrichment") \
    .getOrCreate()

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", "ER-in") \
    .option("startingOffsets", "earliest") \
    .load() 

df \
    .selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .foreach(process_row) \
    .start() \
    .awaitTermination()

Я ожидал, что тема ER-out содержитобогащенный результат.

...