pyspark rdd сплит выпуск - PullRequest
       21

pyspark rdd сплит выпуск

1 голос
/ 08 февраля 2020

Я пытаюсь отфильтровать от rdd, которые имеют значения как «01-10-2019»

print("\n ### Remove duplicates in merged RDD:")

insuredata = insuredatamerged_cache.distinct()
print("insuredata: ",type(insuredata))

print("\n  ### Increase partition to 8 in merged RDD:")
insuredata.getNumPartitions()
insuredatarepart = insuredata.repartition(8)
insuredatarepart.getNumPartitions()

print("insuredatarepart:",type(insuredatarepart))

print("\n ### Split RDD with business date field:")

rdd_201901001 = insuredatarepart.map(lambda y: y.split(",",-1)).filter(lambda x: u'01-10-2019' in x)

print(" ### count of rdd_201901001:",rdd_201901001.count())

Входные значения:

, где insuredatarepart - это класс 'pyspark.rdd.RDD 'с указанным ниже набором данных в качестве значений списка

Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='96601', IssuerId2='96601', MarketCoverage=u'SHOP (Small Group)', NetworkName=u'Select Network', NetworkURL=u'http://il.coventryproviders.com', SourceName=u'SERFF', StateCode=u'IL', custnum='13')Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'Yes', IssuerId='37001', IssuerId2='37001', MarketCoverage=u'Individual', NetworkName=u'HumanaDental PPO/Traditional Preferred', NetworkURL=u'https://www.humana.com/finder/search?customerId=1085&pfpkey=317', SourceName=u'HIOS', StateCode=u'GA', custnum='13')
    Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='54172', IssuerId2='54172', MarketCoverage=u'Individual', NetworkName=u'Molina Marketplace', NetworkURL=u'https://eportal.molinahealthcare.com/Provider/ProviderSearch?RedirectFrom=MolinaStaticWeb&State=fl&Coverage=MMP', SourceName=u'HIOS', StateCode=u'FL', custnum='14')

Исключение составляет, как показано ниже:

### Remove duplicates in merged RDD:
insuredata:  class 'pyspark.rdd.PipelinedRDD'
 Result Count after duplicates removed:  1407
 Result Count of duplicates removed:  1

### Increase partition to 8 in merged RDD:
insuredatarepart: class 'pyspark.rdd.RDD'

### Split RDD with business date field:
20/02/05 19:11:43 ERROR Executor: Exception in task 0.0 in stage 74.0 (TID 150)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1008, in <lambda>
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1008, in <genexpr>
  File "/home/hduser/sparkdata2/script/insurance_info2_new.py", line 294, in <lambda>
    rdd_201901001 = insuredatarepart.map(lambda y: y.split(",",-1)).filter(lambda x: u'01-10-2019' in x)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1502, in __getattr__
    raise AttributeError(item)
AttributeError: split

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

1 Ответ

0 голосов
/ 10 февраля 2020

Из предоставленного вами печатного материала видно, что у вас есть RDD типа Row.

Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='96601', IssuerId2='96601', MarketCoverage=u'SHOP (Small Group)', NetworkName=u'Select Network', NetworkURL=u'http://il.coventryproviders.com', SourceName=u'SERFF', StateCode=u'IL', custnum='13')Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'Yes', IssuerId='37001', IssuerId2='37001', MarketCoverage=u'Individual', NetworkName=u'HumanaDental PPO/Traditional Preferred', NetworkURL=u'https://www.humana.com/finder/search?customerId=1085&pfpkey=317', SourceName=u'HIOS', StateCode=u'GA', custnum='13')
Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='54172', IssuerId2='54172', MarketCoverage=u'Individual', NetworkName=u'Molina Marketplace', NetworkURL=u'https://eportal.molinahealthcare.com/Provider/ProviderSearch?RedirectFrom=MolinaStaticWeb&State=fl&Coverage=MMP', SourceName=u'HIOS', StateCode=u'FL', custnum='14')

Здесь вы не должны вызывать функцию split, чтобы разделить элементы, потому что кажется, что они уже разделены на несколько полей в любом процессе, который вы использовали для их получения. Вы можете просто получить доступ через индекс предмета.

rdd_201901001 = insuredatarepart.filter(lambda x: u'01-10-2019' in x[0])

Обратите внимание, что карта удалена, а индекс добавлен в предложении фильтра как in x[0]

Если у вас было одно поле типа строки в вашем Строка (чего вы не делаете, основываясь на общем выводе); вам все равно придется вызывать split для нулевого элемента, а не для самой строки, и оператор мог бы быть

rdd_201901001 = insuredatarepart.map(lambda y: y[0].split(",",-1)).filter(lambda x: u'01-10-2019' in x[0])

Обратите внимание, что значения индекса были применены как в операциях map, так и filter. Это привело бы к созданию RDD со списком строк, которые вам нужно было бы соединить вместе.

...