Я выполняю потоковую передачу файлов из каталога, находящегося в состоянии копирования, и я хочу, чтобы мой потоковый код отфильтровывал файлы, находящиеся в состоянии копирования, до тех пор, пока он не будет полностью скопирован.
Пример: file1.json.copying
в конечном итоге изменится на file1.json
. Я хочу, чтобы моя программа не читала, пока находится в состоянии копирования.
## Reading from Input Dir
staticInputDF = spark \
.readStream \
.schema(jsonSchema) \
.json(inputPath)
## Writing to Output Dir
query=staticInputDF.writeStream.format("console").outputMode('append').option("path", output_dir).option("checkpointLocation", checkpoint_dir).start()
query.awaitTermination()