Pyspark - Вложенные словари как глобальная переменная / аккумулятор и разделены между работниками - PullRequest
0 голосов
/ 23 октября 2019

Я пытаюсь запустить конвейер данных с тысячами и тысячами файлов, и цель состоит в том, чтобы вместо того, чтобы запускать каждый файл, я просто запускаю конвейер для последнего файла и добавляю в файл паркета.

Для этого я отслеживаю последнее значение некоторых столбцов во вложенном dict (dict IDs, и у каждого идентификатора есть несколько атрибутов, таких как last_value_1, last_value_2 и т. Д., И где я запускаюновый файл, я просто хочу перезагрузить последний вложенный словарь (который был сохранен из файла).

1) Одна из первых проблем заключается в том, что, когда я хочу сохранить его,это просто сохраняет пустой словарь. Так что я думаю, что мои работники не отправляют / не обновляют словарь для мастера (я работаю на нескольких узлах в GCP)

2) Я пытался с аккумуляторами, но не мог даже сделать эточерез. Код выглядит следующим образом:

spark = SparkSession \
    .builder \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
sc = spark.sparkContext
dictionary = sc.accumulator(defaultdict(dict))

/// SOME DICTIONARY UPDATES

storage_client = storage.Client()
bucket = storage_client.get_bucket(MAIN_BUCKET)

file_to_download = bucket.blob(path)
dictionary = json.dumps(dictionary)
file_to_download.upload_from_string(dictionary)

На данный момент у меня возникла проблема типа: TypeError: No default accumulator param for type <class 'collections.defaultdict'>, и я был бы рад получить помощь по этому вопросу.

Для получения дополнительной информации, выполните следующие действия. ты что я могу делать что хочу? Это хороший способ сделать? А если нет, как это сделать?

Спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...