У меня возникают проблемы при попытке записи данных RDD в хранилище Azure Table.
У меня есть rdd, который является результатом операции "RecommendedProductsForUsers (n)" в модели ALS (pyspark.mllib.recommendation.ALS).
Мой первый подход состоял в том, чтобы отобразить мой rdd в новый со строками в качестве совместимого слова для вставки в таблицу (я использую azure.cosmosdb.table.tableservice.TableService). после карты я сделал сбор с намерением получить фактические строки для вставки (я планировал сделать пакетную вставку с ними). Проблема в том, что искра взрывается при этом («Общий размер сериализованных результатов задач больше, чем spark.driver.maxResultSize»).
После этого я попытался изменить подход, выполнив вставки внутри самой функции отображения. По сути, внутри карты после создания совместимого с вставкой dict для каждой строки вместо того, чтобы возвращать объект, как я делал раньше, я делаю саму вставку. Хотя это не вызывает искру, она ничего не пишет, и я не понимаю, почему. Если вместо этого я попытаюсь применить функцию к одной извлеченной вручную строке (take (1) [0]), она будет работать как положено.
Я что-то упустил? почему не работает та же самая функция внутри отображения?
Вот код для функции карты. Как правило, он принимает строку в качестве входных данных, создает объект dict, подходящий для вставки в строку таблицы Azure из table_service, и пытается вставить такой объект в таблицу Azure:
def addCreateTableRow(r, topReturned, table_service):
try:
resPositionCounter = 0
obj = {}
obj['PartitionKey'] = 'partitionKey1'
obj['RowKey'] = dictUsInv[r[0]]
docKeysKey = {}
for d in docKeys:
docKeysKey[d[configDict['docKey']]] = d
for res in r[1]:
try:
art = dictDocInv[res[1]]
if art in docKeysKey.keys():
obj['p'+str(resPositionCounter)] = json.dumps(docKeysKey[art])
resPositionCounter += 1
if resPositionCounter >= topReturned:
break
except:
pass
table_service.insert_or_replace_entity(tableNameWrite,obj)
except:
pass
Это карта:
pr.map(lambda r: addCreateTableRow(r, topReturned, table_service))
где pr - результат вышеупомянутой функции recommendProductsForUsers(n)
ALS