I'm just starting to learn PySpark and I'm experimenting with how to optimize code with caching. Does chaining cache() calls make sense? Here's what my code looks like:
token_count_dict = dict(sorted_tokens_rdd.collect())
tokens = list(token_count_dict.keys())
popular_tokens = uid_txt_rdd.flatMapValues(tok.tokenize)\
.filter(lambda x: x[1] in tokens)\
.distinct()\
.map(lambda x: ((partition.get(x[0], 7), x[1]), 1))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda x: (x[0], get_rel_popularity(x[1], token_count_dict[x[0][1]])))\
.sortBy(lambda x: (x[0][0], -x[1], x[0][1]))\
.map(lambda x: (x[0][0], (x[0][1], x[1])))\
.groupByKey()\
.map(lambda x: [x[0], list(x[1])])
print(popular_tokens.toDebugString().decode("utf-8"))
Output:
(2) PythonRDD[149] at RDD at PythonRDD.scala:48 []
| MapPartitionsRDD[148] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[147] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[146] at groupByKey at <ipython-input-24-d694a6d94459>:5 []
| PythonRDD[145] at groupByKey at <ipython-input-24-d694a6d94459>:5 []
| MapPartitionsRDD[144] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[143] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[142] at sortBy at <ipython-input-24-d694a6d94459>:5 []
| PythonRDD[141] at sortBy at <ipython-input-24-d694a6d94459>:5 []
| MapPartitionsRDD[138] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[137] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[136] at reduceByKey at <ipython-input-24-d694a6d94459>:5 []
| PythonRDD[135] at reduceByKey at <ipython-input-24-d694a6d94459>:5 []
| MapPartitionsRDD[134] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[133] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[132] at distinct at <ipython-input-24-d694a6d94459>:5 []
| PythonRDD[131] at distinct at <ipython-input-24-d694a6d94459>:5 []
| ./hw2-files-10mb.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
| CachedPartitions: 2; MemorySize: 2.6 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ./hw2-files-10mb.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []
Based on the lineage above, I can see several branches that might (?) benefit from caching. So, is the version below a best practice for Spark optimization?
From the research I've done, the consensus seems to be to call cache() where the lineage branches. When I time both implementations with %%timeit, there is no difference.
popular_tokens = uid_txt_rdd.flatMapValues(tok.tokenize)\
.cache()\
.filter(lambda x: x[1] in tokens)\
.distinct()\
.cache()\
.map(lambda x: ((partition.get(x[0], 7), x[1]), 1))\
.cache()\
.reduceByKey(lambda x, y: x+y)\
.map(lambda x: (x[0], get_rel_popularity(x[1], token_count_dict[x[0][1]])))\
.cache()\
.sortBy(lambda x: (x[0][0], -x[1], x[0][1]))\
.cache()\
.map(lambda x: (x[0][0], (x[0][1], x[1])))\
.cache()\
.groupByKey()\
.map(lambda x: [x[0], list(x[1])])
The output still seems to have a lot of branching:
(2) PythonRDD[130] at RDD at PythonRDD.scala:48 []
| MapPartitionsRDD[129] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[128] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[127] at groupByKey at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[126] at groupByKey at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[125] at RDD at PythonRDD.scala:48 []
| PythonRDD[124] at RDD at PythonRDD.scala:48 []
| MapPartitionsRDD[123] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[122] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[121] at sortBy at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[120] at sortBy at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[117] at RDD at PythonRDD.scala:48 []
| CachedPartitions: 2; MemorySize: 7.4 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| MapPartitionsRDD[116] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[115] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[114] at reduceByKey at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[113] at reduceByKey at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[112] at RDD at PythonRDD.scala:48 []
| CachedPartitions: 2; MemorySize: 193.2 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| PythonRDD[111] at RDD at PythonRDD.scala:48 []
| CachedPartitions: 2; MemorySize: 188.7 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| MapPartitionsRDD[110] at mapPartitions at PythonRDD.scala:122 []
| ShuffledRDD[109] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[108] at distinct at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[107] at distinct at <ipython-input-23-5914874b5d65>:5 []
| PythonRDD[106] at RDD at PythonRDD.scala:48 []
| CachedPartitions: 2; MemorySize: 652.0 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ./hw2-files-10mb.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
| CachedPartitions: 2; MemorySize: 2.6 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ./hw2-files-10mb.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []
Thanks for helping out a noob!