Искра Ошибка при распаковке предметов из кортежа в RDD - PullRequest
0 голосов
/ 02 октября 2019

Я написал скрипт на ноутбуке Jupyter для чтения СДР и выполнения операций. Скрипт отлично работает на Jupyter.

rdd=   [('xxxxx99', [{'cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ'},
                  {'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ'},
                  {'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'}],
                  [{'pol_cat_id':'234','pol_dt':'20100220'}],
                  [{'qor_pol_id':'23492','qor_cd':'30'}]),

     ('xxxxx86', [{'cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX'},
                  {'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX'},
                  {'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'}],
                  [{'pol_cat_id':'532','pol_dt':'20091020'}],
                  [{'qor_pol_id':'49320','qor_cd':'21'}]) ]


def flatten_map(record):
    # Unpack items
    id, items, [line], [pls] = record
    pol_id = pls["pol_cat_id"]
    pol_dt = pls["pol_dt"]
    qor_id = pls["qor_pol_id"]
    for item in items:
        yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1


 result = (rdd
    # Expand data
    .flatMap(flatten_map)
    # Flatten tuples
    .map(lambda x: x[0],))) 

Однако при конвертации в скрипт Python я получаю сообщение об ошибке:

2019-10-01 14: 12: 46,901:ОШИБКА: id, items, [line], [pls] = record

2019-10-01 14: 12: 46,901: ERROR: ValueError: недостаточно значений для распаковки

(ожидается 1, получил 0)

Есть предложения? Есть ли разница между тем, как Python обрабатывает это на ноутбуке и .py?

1 Ответ

1 голос
/ 02 октября 2019

Это просто некоторые ошибки, принимающие правильные значения для правильных переменных.

Пожалуйста, введите следующий код:

rdd = [('xxxxx99', [{'cov_id':'Q', 'cov_cd':'100','cov_amt':'100', 'cov_state':'AZ'},
                  {'cov_id':'Q', 'cov_cd':'33','cov_amt':'200', 'cov_state':'AZ'},
                  {'cov_id':'Q', 'cov_cd':'64','cov_amt':'10', 'cov_state':'AZ'}],
                  [{'pol_cat_id':'234','pol_dt':'20100220'}],
                  [{'qor_pol_id':'23492','qor_cd':'30'}]),
     ('xxxxx86', [{'cov_id':'R', 'cov_cd':'20','cov_amt':'100', 'cov_state':'TX'},
                  {'cov_id':'R', 'cov_cd':'44','cov_amt':'500', 'cov_state':'TX'},
                  {'cov_id':'R', 'cov_cd':'66','cov_amt':'50', 'cov_state':'TX'}],
                  [{'pol_cat_id':'532','pol_dt':'20091020'}],
                  [{'qor_pol_id':'49320','qor_cd':'21'}]) ]
def flatten_map(record):
    # Unpack items
    id, items, [line], [pls] = record
    pol_id = line["pol_cat_id"]
    pol_dt = line["pol_dt"]
    qor_id = pls["qor_pol_id"]
    for item in items:
        yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id), 1
result = spark.sparkContext.parallelize(rdd).flatMap(flatten_map).map(lambda x: x[0])
result.collect()
# OUTPUT
[('xxxxx99', 'Q', '100', '100', 'AZ', '234', '20100220', '23492'), ('xxxxx99', 'Q', '33', '200', 'AZ', '234', '20100220', '23492'), ('xxxxx99', 'Q', '64', '10', 'AZ', '234', '20100220', '23492'), ('xxxxx86', 'R', '20', '100', 'TX', '532', '20091020', '49320'), ('xxxxx86', 'R', '44', '500', 'TX', '532', '20091020', '49320'), ('xxxxx86', 'R', '66', '50', 'TX', '532', '20091020', '49320')]
...