Это то, что я пытаюсь сделать
Потоковые данные из kafka topi c, который непрерывно получает данные. Запускать задание два раза в день, чтобы обработать все данные, существующие в данный момент, и остановить поток.
Поэтому я сначала поставил и вызвал остановку для запроса, но он выдавал «TimeoutException»
Затем я попытался динамически увеличить время ожидания, но теперь я получаю java .io.IOException: вызвано: java .lang.InterruptedException
Итак, есть ли способ изящно остановить поток без получить какие-либо исключения?
Ниже приведен мой текущий код (часть), который генерирует прерванное исключение
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", os.environ["KAFKA_SERVERS"])
.option("subscribe", config.kafka.topic)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 25000)
.load()
)
# <do some processing and save the data>
def save_batch(batch_df, batch_id):
pass
query = df.writeStream.foreachBatch(save_batch).start(
outputMode="append",
checkpointLocation=os.path.join(checkpoint_path, config.kafka.topic),
)
while query.isActive:
progress = query.lastProgress
if progress and progress["numInputRows"] < 25000 * 0.9:
timeout = sum(progress["durationMs"].values())
timeout = min(5 * 60 * 1000, max(15000, timeout))
spark.conf.set("spark.sql.streaming.stopTimeout", str(timeout))
stream_query.stop()
break
time.sleep(10)
Версия Spark: 2.4.5 Scala Версия: 2.1.1