искра: постоянное разделение не работает - PullRequest
1 голос
/ 22 июня 2019

Я пытаюсь проверить, сохраняет ли persist() на rdd после partitionBy последующую операцию, и, похоже, спарк-интерфейс подсказывает, что я не сохраняю.

Я предполагаю, что этап 7 или этап 8 следует пропустить, если persist сработало

(Мой тестовый код может быть неправильным, в любом случае, пожалуйста, дайте мне знать.)

enter image description here

Вот код, который я использую

 from pyspark import SparkContext, SparkConf
 from pyspark.rdd import portable_hash
 from pyspark.sql import SparkSession, Row
 from pyspark.storagelevel import StorageLevel

 transactions = [                                                                                                                                                  
     {'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'},                                                                                                  
     {'name': 'James', 'amount': 15, 'country': 'United Kingdom'},                                                                                                 
     {'name': 'Marek', 'amount': 51, 'country': 'Poland'},
     {'name': 'Johannes', 'amount': 200, 'country': 'Germany'},
     {'name': 'Paul', 'amount': 75, 'country': 'Poland'},
 ]

                                                                                                                                                               conf = SparkConf().setAppName("word count4").setMaster("local[3]")                                                                                            sc = SparkContext(conf = conf)
 lines = sc.textFile("in/word_count.text")
 words = lines.flatMap(lambda line: line.split(" "))

 rdd = words.map(lambda word: (word, 1))

 rdd = rdd.partitionBy(4)                                                                                                                                      
 rdd = rdd.persist(StorageLevel.MEMORY_ONLY)                                                                                                                   
 rdd = rdd.reduceByKey(lambda x, y: x+y)

 for count, word in rdd.collect():
     print("{} : {}".format(word, count))

 rdd = rdd.sortByKey(ascending=False)

 for count, word in rdd.collect():
     print("{} : {}".format(word, count))

1 Ответ

1 голос
/ 22 июня 2019

Ваше ожидание просто неверно. Если вы проверите DAG

(4) PythonRDD[28] at collect at <ipython-input-15-a9f47c6b3258>:3 []
 |  MapPartitionsRDD[27] at mapPartitions at PythonRDD.scala:133 []
 |  ShuffledRDD[26] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[25] at sortByKey at <ipython-input-15-a9f47c6b3258>:1 []
    |  PythonRDD[24] at sortByKey at <ipython-input-15-a9f47c6b3258>:1 []
    |  MapPartitionsRDD[20] at mapPartitions at PythonRDD.scala:133 []
    |      CachedPartitions: 4; MemorySize: 6.6 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  ShuffledRDD[19] at partitionBy at NativeMethodAccessorImpl.java:0 []
    +-(1) PairwiseRDD[18] at partitionBy at <ipython-input-13-fff304ea68c9>:6 []
       |  PythonRDD[17] at partitionBy at <ipython-input-13-fff304ea68c9>:6 []
       |  in/word_count.text MapPartitionsRDD[16] at textFile at NativeMethodAccessorImpl.java:0 []
       |  in/word_count.text HadoopRDD[15] at textFile at NativeMethodAccessorImpl.java:0 []

вы увидите, что кэшированный компонент - только одна из многих операций, способствующих вышеупомянутому этапу. И хотя кэшированные данные действительно используются повторно, оставшиеся операции (подготовка shuffle для sortByKey) все еще должны быть вычислены.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...