I am trying to filter from the RDD the records whose values contain '01-10-2019':
print("\n ### Remove duplicates in merged RDD:")
insuredata = insuredatamerged_cache.distinct()
print("insuredata: ",type(insuredata))
print("\n ### Increase partition to 8 in merged RDD:")
insuredata.getNumPartitions()
insuredatarepart = insuredata.repartition(8)
insuredatarepart.getNumPartitions()
print("insuredatarepart:",type(insuredatarepart))
print("\n ### Split RDD with business date field:")
rdd_201901001 = insuredatarepart.map(lambda y: y.split(",",-1)).filter(lambda x: u'01-10-2019' in x)
print(" ### count of rdd_201901001:",rdd_201901001.count())
Input values, where insuredatarepart is of class 'pyspark.rdd.RDD' and contains the dataset below as its values:
Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='96601', IssuerId2='96601', MarketCoverage=u'SHOP (Small Group)', NetworkName=u'Select Network', NetworkURL=u'http://il.coventryproviders.com', SourceName=u'SERFF', StateCode=u'IL', custnum='13')
Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'Yes', IssuerId='37001', IssuerId2='37001', MarketCoverage=u'Individual', NetworkName=u'HumanaDental PPO/Traditional Preferred', NetworkURL=u'https://www.humana.com/finder/search?customerId=1085&pfpkey=317', SourceName=u'HIOS', StateCode=u'GA', custnum='13')
Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='54172', IssuerId2='54172', MarketCoverage=u'Individual', NetworkName=u'Molina Marketplace', NetworkURL=u'https://eportal.molinahealthcare.com/Provider/ProviderSearch?RedirectFrom=MolinaStaticWeb&State=fl&Coverage=MMP', SourceName=u'HIOS', StateCode=u'FL', custnum='14')
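For reference, a minimal, self-contained sketch (the app name and the reduced set of fields are illustrative, not my actual job) that builds a small RDD of pyspark.sql.Row objects of the same shape as insuredatarepart:

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.appName("row-filter-sketch").getOrCreate()

# Two Row objects with the same kind of fields as the sample records above
rows = [
    Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'No', IssuerId='96601', StateCode=u'IL', custnum='13'),
    Row(BusinessDate=u'01-10-2019', DentalOnlyPlan=u'Yes', IssuerId='37001', StateCode=u'GA', custnum='13'),
]
sample_rdd = spark.sparkContext.parallelize(rows)
print(type(sample_rdd.first()))   # <class 'pyspark.sql.types.Row'>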
The exception produced is shown below:
### Remove duplicates in merged RDD:
insuredata: class 'pyspark.rdd.PipelinedRDD'
Result Count after duplicates removed: 1407
Result Count of duplicates removed: 1
### Increase partition to 8 in merged RDD:
insuredatarepart: class 'pyspark.rdd.RDD'
### Split RDD with business date field:
20/02/05 19:11:43 ERROR Executor: Exception in task 0.0 in stage 74.0 (TID 150)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2371, in pipeline_func
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1008, in <lambda>
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1008, in <genexpr>
File "/home/hduser/sparkdata2/script/insurance_info2_new.py", line 294, in <lambda>
rdd_201901001 = insuredatarepart.map(lambda y: y.split(",",-1)).filter(lambda x: u'01-10-2019' in x)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1502, in __getattr__
raise AttributeError(item)
AttributeError: split
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
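For context on the traceback: the elements of insuredatarepart are Row objects rather than comma-separated strings, so y.split(",", -1) goes through Row.__getattr__ (pyspark/sql/types.py line 1502 above), which raises AttributeError: split because a Row has no field or method with that name. A possible direction, sketched under the assumption that the BusinessDate field shown in the sample records is what the filter should match (variable names here are illustrative):

# Sketch only: filter on the Row field directly instead of splitting a string
rdd_20191001 = insuredatarepart.filter(lambda row: row.BusinessDate == u'01-10-2019')
print(" ### count of rdd_20191001:", rdd_20191001.count())

# Alternatively, keep the "list of field values" shape the original lambda expected;
# a Row iterates like a tuple, so list(row) yields its values.
rdd_fields = insuredatarepart.map(lambda row: list(row)) \
                             .filter(lambda fields: u'01-10-2019' in fields)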