pyspark: почему кеш такой медленный - PullRequest
0 голосов
/ 18 апреля 2019

когда я запускаю

import time
start_time = time.time()
print(df_join.count())
end_time = time.time()
print((end_time - start_time))

я получаю

25721
19.099464416503906

когда я запускаю

start_time = time.time()
df_join.cache()
print(df_join.count())
end_time = time.time()
print((end_time - start_time))

, он все еще работает через 5 минут.неужели так долго кешируется 27 строк данных?шириной около 15-20 столбцов, а сложность представляет собой строку URL.

РЕДАКТИРОВАТЬ 1: оказывается, у меня есть столбец, тип которого - массив jsons.если я возьму это все работает нормально.к сожалению, pyspark читает это как строку, и я не знаю, как сказать, что это массив jsons

Как я могу улучшить его?

Ответы [ 2 ]

0 голосов
/ 18 апреля 2019

В общем, здесь есть несколько факторов:

  • count выполняет запрос, эквивалентный SELECT COUNT(1) FROM table - это позволяет Spark применять радикальную раннюю оптимизацию, чтобы избежать выборки любых данных, которые строго не требуются для вычисления родительской таблицы.

    Однако, если данные помечены как o cached, кэширование может иметь приоритет , и все столбцы, присутствующие в плане, должны быть выбраны.

  • Spark SQL использует MEMORY_AND_DISK уровень хранилища - и выделение и / или восстановление памяти, а также потенциальный дисковый ввод-вывод обходятся дорого.

  • Наконец, кэширование не является бесплатным обедом - оно требует дорогостоящих и обширных преобразований - , следовательно, _AND_DISK уровень хранилища по умолчанию, чтобы уменьшить риск удаления кеша и повторных вычислений.

Если вы предполагаете, что окончательные данные содержат только небольшое количество строк, то наиболее вероятным виновником является первый компонент.

0 голосов
/ 18 апреля 2019

не уверен, что вы подразумеваете под строкой URL, но строки имеют больше байтов и занимают больше всего памяти при сериализации ... я бы запустил

df_join.explain()

и проверьте, сколько тасовок запускается в преобразованиях ... поскольку это небольшой набор данных, уменьшите его до значения, подобного

spark.conf.set("spark.sql.shuffle.partitions, 8)

также хотите убедиться, что у вас достаточно ядер для каждого исполнителя, которые вы можете установить через запуск оболочки во время выполнения, например

pyspark --master yarn executor-cores 5

в целом медлительность может быть вызвана многими причинами, такими как объем данных, с каким параметром конфигурации развертывания (локальный, автономный, пряжа [клиент / кластер]) ... обычно для того, что я видел виновника для более длительного задания сводятся ко многим # выходным разделам, вызванным широкими преобразованиями (соединениями / объединениями), нехваткой ядер исполнителя (по умолчанию при запуске 1, я считаю) и тем, что pyspark / sparkR просто не так быстры из-за отдельные процессы вне JVM, требующие передачи сериализованного объекта в

и из него

также проверьте Spark UI в STORAGE TAB и убедитесь, что все разделы кэшированы на 100% ... если в память помещается только часть, то вам, возможно, придется увеличить память исполнителя, потому что частично кэшированные DF вызывают массу проблем с получением некэшированные разделы

pyspark --master yarn --executor-memory "gb"

извините за множество предложений ... Spark - иногда неприятный маленький пагубник, и основной причиной может быть длинный список проблем

from pyspark.sql.functions import col, array

df = spark.createDataFrame([
    (["1, 2, 3"]),
    (["4, 5, 6"]),
    (["7, 8, 9"])
], ["string_array"])

df.select(array("string_array").alias("array_data")).printSchema()
df.select(array("string_array").alias("array_data")).show()

root
 |-- array_data: array (nullable = false)
 |    |-- element: string (containsNull = true)

+----------+
|array_data|
+----------+
| [1, 2, 3]|
| [4, 5, 6]|
| [7, 8, 9]|
+----------+

jsonDF = spark.range(1).selectExpr("""
  '{"myJSONValue" : [1, 2, 3]}' as jsonString""")
jsonDF.show(truncate=False)
jsonDF.printSchema()


jsonDF.select(array("jsonString").alias("json_array")).show(truncate=False)
jsonDF.select(array("jsonString").alias("json_array")).printSchema()


 +---------------------------+
    |jsonString                 |
    +---------------------------+
    |{"myJSONValue" : [1, 2, 3]}|
    +---------------------------+

root
 |-- jsonString: string (nullable = false)

+-----------------------------+
|json_array                   |
+-----------------------------+
|[{"myJSONValue" : [1, 2, 3]}]|
+-----------------------------+

root
 |-- json_array: array (nullable = false)
 |    |-- element: string (containsNull = false)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...