Я использую «структурированную потоковую передачу» pyspark (spark 2.3.0), и мне трудно понять, как я могу использовать пользовательскую функцию-обработчик для обработки потоковых данных.
Я прочиталдокументация о foreach
и foreachBatch
.Но он доступен только из версии spark 2.4.x.Есть несколько альтернатив?
Что у меня есть до сих пор:
df = spark \
.readStream \
.format("kafka") \
.option("maxFilesPerTrigger", 1) \
.option("kafka.bootstrap.servers", options["kafka.bootstrap.servers"]) \
.option("startingOffsets", options["startingOffsets"]) \
.option("subscribe", options["subscribe"]) \
.option("failOnDataLoss", options["failOnDataLoss"]) \
.load() \
.select(
col('value').cast("string").alias('json'),
col('key').cast("string").alias('kafka_key'),
col("timestamp").cast("string").alias('kafka_timestamp')
) \
.withColumn('pjson', from_json(col('json'), jsonSchema)).drop('json')
query = df \
.writeStream \
.foreach(customHandler) \ #this doesn't works on spark 2.3.0
.start()
query.awaitTermination()