Spark Streaming Kafka - Как остановить потоковую передачу после обработки всех существующих сообщений (изящно) - PullRequest
0 голосов
/ 01 мая 2020

Это то, что я пытаюсь сделать

Потоковые данные из kafka topi c, который непрерывно получает данные. Запускать задание два раза в день, чтобы обработать все данные, существующие в данный момент, и остановить поток.

Поэтому я сначала поставил и вызвал остановку для запроса, но он выдавал «TimeoutException»

Затем я попытался динамически увеличить время ожидания, но теперь я получаю java .io.IOException: вызвано: java .lang.InterruptedException

Итак, есть ли способ изящно остановить поток без получить какие-либо исключения?

Ниже приведен мой текущий код (часть), который генерирует прерванное исключение

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", os.environ["KAFKA_SERVERS"])
    .option("subscribe", config.kafka.topic)
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 25000)
    .load()
)


#   <do some processing and save the data>
def save_batch(batch_df, batch_id):
    pass

query = df.writeStream.foreachBatch(save_batch).start(
    outputMode="append",
    checkpointLocation=os.path.join(checkpoint_path, config.kafka.topic),
)

while query.isActive:
    progress = query.lastProgress
    if progress and progress["numInputRows"] < 25000 * 0.9:
        timeout = sum(progress["durationMs"].values())
        timeout = min(5 * 60 * 1000, max(15000, timeout))
        spark.conf.set("spark.sql.streaming.stopTimeout", str(timeout))
        stream_query.stop()
        break
    time.sleep(10)

Версия Spark: 2.4.5 Scala Версия: 2.1.1

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...