pyspark - Как отправить данные из потока в функцию с помощью spark 2.3.0 - PullRequest
0 голосов
/ 08 февраля 2019

Я использую «структурированную потоковую передачу» pyspark (spark 2.3.0), и мне трудно понять, как я могу использовать пользовательскую функцию-обработчик для обработки потоковых данных.

Я прочиталдокументация о foreach и foreachBatch.Но он доступен только из версии spark 2.4.x.Есть несколько альтернатив?

Что у меня есть до сих пор:

    df = spark \
        .readStream \
        .format("kafka") \
        .option("maxFilesPerTrigger", 1) \
        .option("kafka.bootstrap.servers", options["kafka.bootstrap.servers"]) \
        .option("startingOffsets", options["startingOffsets"]) \
        .option("subscribe", options["subscribe"]) \
        .option("failOnDataLoss", options["failOnDataLoss"]) \
        .load() \
        .select( 
          col('value').cast("string").alias('json'),
          col('key').cast("string").alias('kafka_key'),
          col("timestamp").cast("string").alias('kafka_timestamp')
          ) \
        .withColumn('pjson', from_json(col('json'), jsonSchema)).drop('json')

    query = df \
    .writeStream \
    .foreach(customHandler) \ #this doesn't works on spark 2.3.0
    .start()

    query.awaitTermination()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...