записывать данные RDD pyspark в хранилище таблиц Azure - PullRequest
0 голосов
/ 31 октября 2018

У меня возникают проблемы при попытке записи данных RDD в хранилище Azure Table.

У меня есть rdd, который является результатом операции "RecommendedProductsForUsers (n)" в модели ALS (pyspark.mllib.recommendation.ALS).

Мой первый подход состоял в том, чтобы отобразить мой rdd в новый со строками в качестве совместимого слова для вставки в таблицу (я использую azure.cosmosdb.table.tableservice.TableService). после карты я сделал сбор с намерением получить фактические строки для вставки (я планировал сделать пакетную вставку с ними). Проблема в том, что искра взрывается при этом («Общий размер сериализованных результатов задач больше, чем spark.driver.maxResultSize»).

После этого я попытался изменить подход, выполнив вставки внутри самой функции отображения. По сути, внутри карты после создания совместимого с вставкой dict для каждой строки вместо того, чтобы возвращать объект, как я делал раньше, я делаю саму вставку. Хотя это не вызывает искру, она ничего не пишет, и я не понимаю, почему. Если вместо этого я попытаюсь применить функцию к одной извлеченной вручную строке (take (1) [0]), она будет работать как положено.

Я что-то упустил? почему не работает та же самая функция внутри отображения?

Вот код для функции карты. Как правило, он принимает строку в качестве входных данных, создает объект dict, подходящий для вставки в строку таблицы Azure из table_service, и пытается вставить такой объект в таблицу Azure:

def addCreateTableRow(r, topReturned, table_service): try: resPositionCounter = 0 obj = {} obj['PartitionKey'] = 'partitionKey1' obj['RowKey'] = dictUsInv[r[0]] docKeysKey = {} for d in docKeys: docKeysKey[d[configDict['docKey']]] = d for res in r[1]: try: art = dictDocInv[res[1]] if art in docKeysKey.keys(): obj['p'+str(resPositionCounter)] = json.dumps(docKeysKey[art]) resPositionCounter += 1 if resPositionCounter >= topReturned: break except: pass table_service.insert_or_replace_entity(tableNameWrite,obj) except: pass

Это карта: pr.map(lambda r: addCreateTableRow(r, topReturned, table_service)) где pr - результат вышеупомянутой функции recommendProductsForUsers(n) ALS

...