Я читаю сотни XML-файлов в Spark Dataframe, где каждая строка состоит из метаданных и данных временных рядов для определенного события.Каждая из этих строк преобразуется в rdd, который преобразуется в пакеты документов с определенной структурой ключа / хранилища, а затем записывается в базу данных.Данные XML должны быть разбиты на пакеты <50 КБ, следовательно, вспомогательная функция для создания пакетов, показанных ниже. </p>
def build_documents(data):
# Make dataframe out of data tags
data = pd.DataFrame([i.split(',') for i in list(chain(*(data)))])
# Helper function to Get Batches
for batch in get_batches(data):
x = batch.T.to_dict()
yield x
def process_partition(partition):
client = document_client.DocumentClient(HOST, {'masterKey': MASTER_KEY} )
for element in partition:
generator = build_documents(element)
for batch in generator:
client.CreateDocument(collection_link + 'data', batch)
# Write to Database
df.rdd.coalesce(20).foreachPartition(process_partition)
Все еще настраиваете количество разделов, но есть какие-нибудь мысли о том, как это можно улучшить?Производительность действительно низкая, как и ожидалось с помощью кода, реализованного до сих пор.Кластер состоит из 32 ядер, 128,0 ГБ памяти для обоих драйверов и может масштабироваться до 8 исполнителей.Как показано ниже, работает только два рабочих, что, очевидно, не оптимально при дальнейшем расширении.Мысли?