Я пытаюсь запустить конвейер данных с тысячами и тысячами файлов, и цель состоит в том, чтобы вместо того, чтобы запускать каждый файл, я просто запускаю конвейер для последнего файла и добавляю в файл паркета.
Для этого я отслеживаю последнее значение некоторых столбцов во вложенном dict (dict IDs
, и у каждого идентификатора есть несколько атрибутов, таких как last_value_1
, last_value_2
и т. Д., И где я запускаюновый файл, я просто хочу перезагрузить последний вложенный словарь (который был сохранен из файла).
1) Одна из первых проблем заключается в том, что, когда я хочу сохранить его,это просто сохраняет пустой словарь. Так что я думаю, что мои работники не отправляют / не обновляют словарь для мастера (я работаю на нескольких узлах в GCP)
2) Я пытался с аккумуляторами, но не мог даже сделать эточерез. Код выглядит следующим образом:
spark = SparkSession \
.builder \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
sc = spark.sparkContext
dictionary = sc.accumulator(defaultdict(dict))
/// SOME DICTIONARY UPDATES
storage_client = storage.Client()
bucket = storage_client.get_bucket(MAIN_BUCKET)
file_to_download = bucket.blob(path)
dictionary = json.dumps(dictionary)
file_to_download.upload_from_string(dictionary)
На данный момент у меня возникла проблема типа: TypeError: No default accumulator param for type <class 'collections.defaultdict'>
, и я был бы рад получить помощь по этому вопросу.
Для получения дополнительной информации, выполните следующие действия. ты что я могу делать что хочу? Это хороший способ сделать? А если нет, как это сделать?
Спасибо