У меня есть набор данных, которым я пытаюсь управлять с помощью pyspark
и для которого я хотел бы выбрать подмножество. Позвольте мне сказать, что я совершенно новичок в spark
, scala
и pyspark
в целом.
Это то, что я делаю.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as fs
spark = SparkSession.builder\
.master("local")\
.enableHiveSupport()\
.getOrCreate()
spark.conf.set("spark.executor.memory", '16g')
spark.conf.set('spark.executor.cores', '16')
spark.conf.set("spark.shuffle.compress", 'True')
spark.conf.set("spark.python.worker.memory", '16g')
spark.conf.set("spark.default.parallelism", '10')
sc = spark.sparkContext
%%time
f = "myData"
schema = StructType([
StructField("ID", StringType(), True),
StructField("Code", StringType(), True),
StructField("bool", IntegerType(), True),
StructField("lat", FloatType(), True),
StructField("lon", FloatType(), True),
StructField("v1", FloatType(), True),
StructField("v2", IntegerType(), True),
StructField("v3", FloatType(), True)])
df = spark.read.format("csv").schema(schema).load(f)
df.show()
+--------------------+----+----+---------+----------+---------+----------+---------+
| ID|Code|bool| lat| lon| v1| v2| v3|
+--------------------+----+----+---------+----------+---------+----------+---------+
|5ac52674ffff34c98...|IDFA| 1|42.377167| -71.06994|17.422535|1525319638|36.853622|
|5ac52674ffff34c98...|IDFA| 1| 42.37747|-71.069824|17.683573|1525319639|36.853622|
|5ac52674ffff34c98...|IDFA| 1| 42.37757| -71.06942|22.287935|1525319640|36.853622|
|5ac52674ffff34c98...|IDFA| 1| 42.37761| -71.06943|19.110023|1525319641|36.853622|
|5ac52674ffff34c98...|IDFA| 1|42.377243| -71.06952|18.904774|1525319642|36.853622|
|5ac52674ffff34c98...|IDFA| 1|42.378254| -71.06948|20.772903|1525319643|36.853622|
|5ac52674ffff34c98...|IDFA| 1| 42.37801| -71.06983|18.084948|1525319644|36.853622|
|5ac52674ffff34c98...|IDFA| 1|42.378693| -71.07033| 15.64326|1525319645|36.853622|
|5ac52674ffff34c98...|IDFA| 1|42.378723|-71.070335|21.093477|1525319646|36.853622|
|5ac52674ffff34c98...|IDFA| 1| 42.37868| -71.07034|21.851894|1525319647|36.853622|
|5ac52674ffff34c98...|IDFA| 1|42.378716| -71.07029|20.583202|1525319648|36.853622|
|5ac52674ffff34c98...|IDFA| 1| 42.37872| -71.07067|19.738768|1525319649|36.853622|
|5ac52674ffff34c98...|IDFA| 1|42.379112| -71.07097|20.480911|1525319650|36.853622|
|5ac52674ffff34c98...|IDFA| 1| 42.37952| -71.0708|20.526752|1525319651| 44.93808|
|5ac52674ffff34c98...|IDFA| 1| 42.37902| -71.07056|20.534052|1525319652| 44.93808|
|5ac52674ffff34c98...|IDFA| 1|42.380203| -71.0709|19.921381|1525319653| 44.93808|
|5ac52674ffff34c98...|IDFA| 1| 42.37968|-71.071144| 20.12599|1525319654| 44.93808|
|5ac52674ffff34c98...|IDFA| 1|42.379696| -71.07114|18.760069|1525319655| 36.77853|
|5ac52674ffff34c98...|IDFA| 1| 42.38011| -71.07123|19.155525|1525319656| 36.77853|
|5ac52674ffff34c98...|IDFA| 1| 42.38022| -71.0712|16.978994|1525319657| 36.77853|
+--------------------+----+----+---------+----------+---------+----------+---------+
only showing top 20 rows
CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 3.44 s
Позвольте сказать, что я бы нравится выделять информацию о первых ID
.
%%time
id0 = df.first().ID ## First ID
tmp = df.filter( (df['ID'] == id0) )
CPU times: user 0 ns, sys: 8 ms, total: 8 ms
Wall time: 514 ms
%%time
tmp.count()
CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 1min 25s
Out[5]:
3299
На выходе получается pyspark.sql.dataframe.DataFrame
с 3299
строками. Первый вопрос: почему требуется так много времени для подсчета 3299
строк?
Второй вопрос: как я могу сохранить его или преобразовать в обычный фрейм данных? Это нормально, что так долго конвертируется в обычный фрейм данных?
%%time
result_pdf = tmp.select("*").toPandas()
CPU times: user 60 ms, sys: 12 ms, total: 72 ms
Wall time: 3min 33s