Цепочка команды cache () в pyspark? - PullRequest
0 голосов
/ 15 января 2020

Я только начинаю узнавать о pyspark и играю с тем, как оптимизировать код с кэшированием. Имеет ли смысл использование цепочек команд cache ()? Вот как выглядит мой код:

token_count_dict = dict(sorted_tokens_rdd.collect())
tokens = list(token_count_dict.keys())

popular_tokens = uid_txt_rdd.flatMapValues(tok.tokenize)\
    .filter(lambda x: x[1] in tokens)\
    .distinct()\
    .map(lambda x: ((partition[x[0]] if x[0] in partition.keys() else 7, x[1]), 1))\
    .reduceByKey(lambda x, y: x+y)\
    .map(lambda x: (x[0], a_function(x[1], token_count_dict[x[0][1]])))\
    .sortBy(lambda x: (x[0][0], -x[1], x[0][1]))\
    .map(lambda x: (x[0][0], (x[0][1], x[1])))\
    .groupByKey()\
    .map(lambda x: [x[0], list(x[1])])

print(popular_tokens.toDebugString().decode("utf-8"))

вывод:

(2) PythonRDD[149] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[148] at mapPartitions at PythonRDD.scala:122 []
 |  ShuffledRDD[147] at partitionBy at <unknown>:0 []
 +-(2) PairwiseRDD[146] at groupByKey at <ipython-input-24-d694a6d94459>:5 []
    |  PythonRDD[145] at groupByKey at <ipython-input-24-d694a6d94459>:5 []
    |  MapPartitionsRDD[144] at mapPartitions at PythonRDD.scala:122 []
    |  ShuffledRDD[143] at partitionBy at <unknown>:0 []
    +-(2) PairwiseRDD[142] at sortBy at <ipython-input-24-d694a6d94459>:5 []
       |  PythonRDD[141] at sortBy at <ipython-input-24-d694a6d94459>:5 []
       |  MapPartitionsRDD[138] at mapPartitions at PythonRDD.scala:122 []
       |  ShuffledRDD[137] at partitionBy at <unknown>:0 []
       +-(2) PairwiseRDD[136] at reduceByKey at <ipython-input-24-d694a6d94459>:5 []
          |  PythonRDD[135] at reduceByKey at <ipython-input-24-d694a6d94459>:5 []
          |  MapPartitionsRDD[134] at mapPartitions at PythonRDD.scala:122 []
          |  ShuffledRDD[133] at partitionBy at <unknown>:0 []
          +-(2) PairwiseRDD[132] at distinct at <ipython-input-24-d694a6d94459>:5 []
             |  PythonRDD[131] at distinct at <ipython-input-24-d694a6d94459>:5 []
             |  ./hw2-files-10mb.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
             |      CachedPartitions: 2; MemorySize: 2.6 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
             |  ./hw2-files-10mb.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []

на основе вышеприведенной строки, я вижу несколько ветвей, которые могут извлечь выгоду (?) из кэширования. Итак, является ли приведенная ниже лучшая практика оптимизации искры?

На основании проведенного мною исследования, похоже, что консенсус заключается в кэшировании (), где происходит ветвление линии. Когда я определяю время выполнения обеих реализаций с использованием %% timeit, разницы нет.

popular_tokens = uid_txt_rdd.flatMapValues(tok.tokenize)\
    .cache()\
    .filter(lambda x: x[1] in tokens)\
    .distinct()\
    .cache()\
    .map(lambda x: ((partition[x[0]] if x[0] in partition.keys() else 7, x[1]), 1))\
    .cache()\
    .reduceByKey(lambda x, y: x+y)\
    .map(lambda x: (x[0], get_rel_popularity(x[1], token_count_dict[x[0][1]])))\
    .cache()\
    .sortBy(lambda x: (x[0][0], -x[1], x[0][1]))\
    .cache()\
    .map(lambda x: (x[0][0], (x[0][1], x[1])))\
    .cache()\
    .groupByKey()\
    .map(lambda x: [x[0], list(x[1])])

Вывод, похоже, все еще имеет много ветвлений

(2) PythonRDD[130] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[129] at mapPartitions at PythonRDD.scala:122 []
 |  ShuffledRDD[128] at partitionBy at <unknown>:0 []
 +-(2) PairwiseRDD[127] at groupByKey at <ipython-input-23-5914874b5d65>:5 []
    |  PythonRDD[126] at groupByKey at <ipython-input-23-5914874b5d65>:5 []
    |  PythonRDD[125] at RDD at PythonRDD.scala:48 []
    |  PythonRDD[124] at RDD at PythonRDD.scala:48 []
    |  MapPartitionsRDD[123] at mapPartitions at PythonRDD.scala:122 []
    |  ShuffledRDD[122] at partitionBy at <unknown>:0 []
    +-(2) PairwiseRDD[121] at sortBy at <ipython-input-23-5914874b5d65>:5 []
       |  PythonRDD[120] at sortBy at <ipython-input-23-5914874b5d65>:5 []
       |  PythonRDD[117] at RDD at PythonRDD.scala:48 []
       |      CachedPartitions: 2; MemorySize: 7.4 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
       |  MapPartitionsRDD[116] at mapPartitions at PythonRDD.scala:122 []
       |  ShuffledRDD[115] at partitionBy at <unknown>:0 []
       +-(2) PairwiseRDD[114] at reduceByKey at <ipython-input-23-5914874b5d65>:5 []
          |  PythonRDD[113] at reduceByKey at <ipython-input-23-5914874b5d65>:5 []
          |  PythonRDD[112] at RDD at PythonRDD.scala:48 []
          |      CachedPartitions: 2; MemorySize: 193.2 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
          |  PythonRDD[111] at RDD at PythonRDD.scala:48 []
          |      CachedPartitions: 2; MemorySize: 188.7 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
          |  MapPartitionsRDD[110] at mapPartitions at PythonRDD.scala:122 []
          |  ShuffledRDD[109] at partitionBy at <unknown>:0 []
          +-(2) PairwiseRDD[108] at distinct at <ipython-input-23-5914874b5d65>:5 []
             |  PythonRDD[107] at distinct at <ipython-input-23-5914874b5d65>:5 []
             |  PythonRDD[106] at RDD at PythonRDD.scala:48 []
             |      CachedPartitions: 2; MemorySize: 652.0 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
             |  ./hw2-files-10mb.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
             |      CachedPartitions: 2; MemorySize: 2.6 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
             |  ./hw2-files-10mb.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []

Спасибо за помощь нуб!

1 Ответ

0 голосов
/ 15 января 2020

Кэширование - это выбор между сохранением вычислений и потреблением памяти. Вы не можете кэшировать все, потому что это будет занимать память и диск. Память ограничена, и кэширование на диск требует ввода-вывода, когда вы читаете его обратно. Я порекомендую кешировать фрейм данных, который будет дорогостоящим для создания и использования более одного раза.

Если он используется только один раз, я бы его не кешировал, даже если его сборка дорогая, потому что он должен быть собран один раз для использования. Вот почему вы не видите никакого преимущества в производительности, потому что вы не используете повторно кэшированные данные.

В вашем примере я буду кэшировать конечный результат после того, как все фильтры, сортировка, сопоставление и группировка произошли, предполагая что все это используется повторно.

...