Ошибка сбора в ... из-за отмены этапа из-за закрытия SparkContext - PullRequest
0 голосов
/ 19 февраля 2019

Я хочу отобразить количество элементов в каждом разделе, поэтому я пишу следующее:

def count_in_a_partition(iterator):
    yield sum(1 for _ in iterator)

Если я использую это так

print("number of element in each partitions: {}".format(
  my_rdd.mapPartitions(count_in_a_partition).collect()
))

, я получаю следующее:

19/02/18 21:41:15 INFO DAGScheduler: Job 3 failed: collect at /project/6008168/tamouze/testSparkCedar.py:435, took 30.859710 s
19/02/18 21:41:15 INFO DAGScheduler: ResultStage 3 (collect at /project/6008168/tamouze/testSparkCedar.py:435) failed in 30.848 s due to Stage cancelled because SparkContext was shut down
19/02/18 21:41:15 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/02/18 21:41:16 INFO MemoryStore: MemoryStore cleared
19/02/18 21:41:16 INFO BlockManager: BlockManager stopped
19/02/18 21:41:16 INFO BlockManagerMaster: BlockManagerMaster stopped
19/02/18 21:41:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/02/18 21:41:16 WARN BlockManager: Putting block rdd_3_14 failed due to exception java.net.SocketException: Connection reset.
19/02/18 21:41:16 WARN BlockManager: Block rdd_3_14 could not be removed as it was not found on disk or in memory
19/02/18 21:41:16 WARN BlockManager: Putting block rdd_3_3 failed due to exception java.net.SocketException: Connection reset.
19/02/18 21:41:16 WARN BlockManager: Block rdd_3_3 could not be removed as it was not found on disk or in memory
19/02/18 21:41:16 INFO SparkContext: Successfully stopped SparkContext
....

отмечая, что my_rdd.take(1) return:

[(u'id', u'text', array([-0.31921682, ...,0.890875]))]

Как мне решить эту проблему?

1 Ответ

0 голосов
/ 19 февраля 2019

Для этого вы должны использовать функцию glom().Давайте рассмотрим пример.

Давайте сначала создадим DataFrame.

rdd=sc.parallelize([('a',22),('b',1),('c',4),('b',1),('d',2),('e',0),('d',3),('a',1),('c',4),('b',7),('a',2),('f',1)] )
df=rdd.toDF(['key','value'])
df=df.repartition(5,"key") # Make 5 Partitions

Количество разделов -

print("Number of partitions: {}".format(df.rdd.getNumPartitions())) 
    Number of partitions: 5

Количество строк / элементов в каждом разделе.Это может дать вам представление о перекосе -

print('Partitioning distribution: '+ str(df.rdd.glom().map(len).collect()))
    Partitioning distribution: [3, 3, 2, 2, 2]

Посмотрите, как на самом деле строки распределяются по разделам.Обратите внимание: если набор данных большой, ваша система может выйти из строя из-за нехватки памяти OOM.

print("Partitions structure: {}".format(df.rdd.glom().collect()))
    Partitions structure: [
       #Partition 1        [Row(key='a', value=22), Row(key='a', value=1), Row(key='a', value=2)], 
       #Partition 2        [Row(key='b', value=1), Row(key='b', value=1), Row(key='b', value=7)], 
       #Partition 3        [Row(key='c', value=4), Row(key='c', value=4)], 
       #Partition 4        [Row(key='e', value=0), Row(key='f', value=1)], 
       #Partition 5        [Row(key='d', value=2), Row(key='d', value=3)]
                          ]
...