Если вы хотите получить информацию о файлах или каталогах, считываемых из фабрики данных, это можно сделать с помощью действия Get Metadata Activity, см. Следующий пример answer .
Другой подходдля обнаружения новых файлов в вашей записной книжке будет использоваться структурированная потоковая передача с файловыми источниками.Это работает довольно хорошо, и вы просто вызываете блокнот после операции копирования.
Для этого вы определяете потоковый фрейм входных данных:
streamingInputDF = (
spark
.readStream
.schema(pqtSchema)
.parquet(inputPath)
)
с inputPath , указывающим на входной каталог в хранилище BLOB-объектов.Поддерживаемые форматы файлов: текстовый, csv, json, orc, parquet, поэтому это будет зависеть от вашего конкретного сценария, будет ли это работать для вас.
Важно, чтобы на цели вы использовали опцию триггер один раз, поэтому записная книжкане нужно запускать постоянно, например:
streamingOutputDF \
.repartition(1) \
.writeStream \
.format("parquet") \
.partitionBy('Id') \
.option("checkpointLocation", adlpath + "spark/checkpointlocation/data/trusted/sensorreadingsdelta") \
.option("path", targetPath + "delta") \
.trigger(once=True) \
.start()
Другой подход может заключаться в использовании хранилища очередей Azure (AQS), см. следующую документацию .