Параллельная запись PySpark foreachPartition в базу данных - PullRequest
0 голосов
/ 13 мая 2018

Я читаю сотни XML-файлов в Spark Dataframe, где каждая строка состоит из метаданных и данных временных рядов для определенного события.Каждая из этих строк преобразуется в rdd, который преобразуется в пакеты документов с определенной структурой ключа / хранилища, а затем записывается в базу данных.Данные XML должны быть разбиты на пакеты <50 КБ, следовательно, вспомогательная функция для создания пакетов, показанных ниже. </p>

def build_documents(data):

    # Make dataframe out of data tags
    data = pd.DataFrame([i.split(',') for i in list(chain(*(data)))])

    # Helper function to Get Batches
    for batch in get_batches(data): 
          x = batch.T.to_dict()
          yield x

def process_partition(partition):
   client = document_client.DocumentClient(HOST, {'masterKey': MASTER_KEY} )
   for element in partition:
        generator = build_documents(element)
        for batch in generator:
            client.CreateDocument(collection_link + 'data', batch)


# Write to Database
df.rdd.coalesce(20).foreachPartition(process_partition)

Все еще настраиваете количество разделов, но есть какие-нибудь мысли о том, как это можно улучшить?Производительность действительно низкая, как и ожидалось с помощью кода, реализованного до сих пор.Кластер состоит из 32 ядер, 128,0 ГБ памяти для обоих драйверов и может масштабироваться до 8 исполнителей.Как показано ниже, работает только два рабочих, что, очевидно, не оптимально при дальнейшем расширении.Мысли?

enter image description here

enter image description here

1 Ответ

0 голосов
/ 14 ноября 2018

df.rdd.coalesce(20).foreachPartition(process_partition) будет записывать последовательные записи в базу данных. и более того, ваша логика для функции process_partition также будет последовательной.

Вам необходимо многопоточность логики для определения process_partition. Это ускорит процесс. Также используйте df.rdd.coalesce(20).foreachPartitionAsync(process_partition)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...