Pyspark: как выбрать подмножество набора данных и сохранить его? - PullRequest
0 голосов
/ 03 февраля 2020

У меня есть набор данных, которым я пытаюсь управлять с помощью pyspark и для которого я хотел бы выбрать подмножество. Позвольте мне сказать, что я совершенно новичок в spark, scala и pyspark в целом.

Это то, что я делаю.

from pyspark.sql import SparkSession 
from pyspark.sql.types import *
import pyspark.sql.functions as fs

spark  = SparkSession.builder\
                  .master("local")\
                  .enableHiveSupport()\
                  .getOrCreate()

spark.conf.set("spark.executor.memory", '16g')
spark.conf.set('spark.executor.cores', '16')
spark.conf.set("spark.shuffle.compress", 'True')
spark.conf.set("spark.python.worker.memory", '16g')
spark.conf.set("spark.default.parallelism", '10')

sc = spark.sparkContext


%%time
f = "myData"
schema = StructType([
    StructField("ID", StringType(), True),
    StructField("Code", StringType(), True),
    StructField("bool", IntegerType(), True),
    StructField("lat", FloatType(), True),
    StructField("lon", FloatType(), True),
    StructField("v1", FloatType(), True),
    StructField("v2", IntegerType(), True),
    StructField("v3", FloatType(), True)])

df = spark.read.format("csv").schema(schema).load(f)
df.show()

+--------------------+----+----+---------+----------+---------+----------+---------+
|                  ID|Code|bool|      lat|       lon|       v1|        v2|       v3|
+--------------------+----+----+---------+----------+---------+----------+---------+
|5ac52674ffff34c98...|IDFA|   1|42.377167| -71.06994|17.422535|1525319638|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37747|-71.069824|17.683573|1525319639|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37757| -71.06942|22.287935|1525319640|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37761| -71.06943|19.110023|1525319641|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.377243| -71.06952|18.904774|1525319642|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378254| -71.06948|20.772903|1525319643|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37801| -71.06983|18.084948|1525319644|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378693| -71.07033| 15.64326|1525319645|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378723|-71.070335|21.093477|1525319646|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37868| -71.07034|21.851894|1525319647|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378716| -71.07029|20.583202|1525319648|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37872| -71.07067|19.738768|1525319649|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.379112| -71.07097|20.480911|1525319650|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37952|  -71.0708|20.526752|1525319651| 44.93808|
|5ac52674ffff34c98...|IDFA|   1| 42.37902| -71.07056|20.534052|1525319652| 44.93808|
|5ac52674ffff34c98...|IDFA|   1|42.380203|  -71.0709|19.921381|1525319653| 44.93808|
|5ac52674ffff34c98...|IDFA|   1| 42.37968|-71.071144| 20.12599|1525319654| 44.93808|
|5ac52674ffff34c98...|IDFA|   1|42.379696| -71.07114|18.760069|1525319655| 36.77853|
|5ac52674ffff34c98...|IDFA|   1| 42.38011| -71.07123|19.155525|1525319656| 36.77853|
|5ac52674ffff34c98...|IDFA|   1| 42.38022|  -71.0712|16.978994|1525319657| 36.77853|
+--------------------+----+----+---------+----------+---------+----------+---------+
only showing top 20 rows

CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 3.44 s

Позвольте сказать, что я бы нравится выделять информацию о первых ID.

%%time
id0 = df.first().ID  ## First ID
tmp = df.filter( (df['ID'] == id0) )

CPU times: user 0 ns, sys: 8 ms, total: 8 ms
Wall time: 514 ms

%%time
tmp.count()
CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 1min 25s
Out[5]:
3299

На выходе получается pyspark.sql.dataframe.DataFrame с 3299 строками. Первый вопрос: почему требуется так много времени для подсчета 3299 строк?

Второй вопрос: как я могу сохранить его или преобразовать в обычный фрейм данных? Это нормально, что так долго конвертируется в обычный фрейм данных?

%%time
result_pdf = tmp.select("*").toPandas()
CPU times: user 60 ms, sys: 12 ms, total: 72 ms
Wall time: 3min 33s

1 Ответ

0 голосов
/ 03 февраля 2020

Ну, оба ваших вопроса связаны с ОЗУ.

  1. Тот факт, что Spark не загружает данные в ОЗУ (в отличие от Pandas), объясняет медлительность. Дело в том, что строка 3200 недостаточно велика , чтобы использовать Spark, я бы сказал.

    Если вы можете использовать Pandas, лучше использовать его, потому что он будет быстрее.

    Если вы хотите масштабировать до нескольких миллионов / миллиардов и не можете загрузить свои данные в ОЗУ (потому что Pandas действительно жадный: для набора данных 1 ГБ вам, вероятно, понадобится 10 ГБ ОЗУ), так что вы можете из памяти компьютер быстро.

    Итак, если вы получили несколько ГБ или ТБ (или даже ПБ), Spark для вас. Если вы получили меньше этого и можете разместить свои данные в ОЗУ, используйте Pandas

Так что масштабирование скорости не достаточно быстрое, чтобы вы могли увидеть разница, но Spark, вероятно, будет столь же быстрым для данных 3K и 3M, потому что он разделяет выполнение, распределяет вычисления, затем собирает их снова и т. д. c. В отличие от Pandas, который выполняет все вычисления в оперативной памяти.

Скорость преобразования (toPandas()) также медленная, потому что вы загружаете все данные в память (RAM) для передачи. Это действительно похоже на загрузку ваших данных с помощью Pandas и последующую фильтрацию.

Если вы хотите сохранить их, лучший способ:

data.write.parquet("data.parquet")

или

data.write.csv("data.csv")

Дополнительная документация: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...