Как ускорить программирование Pyspark, когда вам нужно разделить наборы данных - PullRequest
0 голосов
/ 19 апреля 2020

У меня возникли некоторые проблемы.

  • У меня есть целая куча csv-файлов со стандартными тиковыми данными. Мне нужно будет рассчитать данные уровня секунд каждой акции.
    Я хочу, чтобы данные каждой акции могли быть обработаны за 10 секунд, объединены в большой файл и, наконец, выведены в CSV. поскольку использование pandas будет ограничено памятью моего ноутбука, если я захочу сделать в pandas, мне нужно будет выполнить большую работу read_csv / to_csv. Я думаю, что это займет много времени, поэтому я выбрал следующий способ:
  • (1) Я использую pyspark, чтобы прочитать все файлы csv, сгенерировать большой файл df.
  • (2) I получил список акций от дф. Затем делайте для итераций каждый раз, когда я выбираю фрейм данных pyspark для фондовых данных, переносу его в pandas фрейм данных, вычисляю его в pandas. Наконец, выведите этот файл в локальные файлы.
    Теперь проблема в том, что программа работает с очень низкой скоростью, для некоторого запаса она обрабатывает 2 минуты. Для некоторых акций обработка занимает около 18 минут.
    Я думаю, что проблема связана с разделом данных pyspark dataframe. Pyspark хранил данные в очень отдаленных местах, поэтому он реорганизует данные в соответствии с моими потребностями. Как я могу ускорить его?
#read data
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing1').getOrCreate()
sc=spark.sparkContext
df=spark.read.csv('file:///D:/t/*.csv',inferSchema=True,header=True,encoding='GBK')

df_empty=spark.createDataFrame(sc.emptyRDD(),t_schema)#pre-defined schema
#generate code list
codelist=df.select('stks').distinct().collect()
col_list=['stks','time']
df=df.orderBy(col_list,ascending=True)
from time import strftime, localtime
#For loops
for code in codelist:
    print('++++++++++++++++++++++++++')
    print(code)
    print(strftime("%Y-%m-%d %H:%M:%S", localtime()))
    df2=df.filter(df['stks']==code[0]).select('mkt','stks','time','price')
    df2_1=df2.toPandas()
    timelist=df2_1['time'].tolist()
    pricelist=df2_1['price'].tolist()
#Add flag----target computation
    flag_1=[]
    flag_2=[]
    for i in range(len(timelist)):
#        calculate 
        flag_1.append(calc_incre_2(timelist,pricelist,i,30,0.05))
        flag_2.append(calc_incre_2(timelist,pricelist,i,40,0.05))
    df2_1['flag_1']=flag_1
    df2_1['flag_2']=flag_2
    df2_2=sqlContext.createDataFrame(df2_1)
    df_empty=df_empty.union(df2_2)
#sub-function----------------------------
def calc_incre_2(timelist,pricelist,start_pos,secs_spec,incre_spec):
    i=start_pos
    flag=0
#    timelist=df2['时间'].tolist()
#    pricelist=df2['最新'].tolist()
    for t in range(len(timelist)-i):
        if (timelist[i+t]-timelist[i]).seconds>secs_spec:
            incre=(pricelist[i+t]-pricelist[i])/pricelist[i]
            if incre>=incre_spec:
                flag=1
            else:
                flag=0
            break
    return flag

Я пытался использовать pandas_udf, хотя он все еще не может работать. Должен получить пустой RDD здесь или там.

#read data
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing1').getOrCreate()
sc=spark.sparkContext
df=spark.read.csv('file:///D:/t/*.csv',inferSchema=True,header=True,encoding='GBK')
df1=df.select('mkt','stks','time','price')
#Using PandasUDF
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
#create the schema for the resulting dataframe
dfs_schema=StructType([StructField('mkt',StringType(),False),
                       StructField('stks',IntegerType(),False),
                       StructField('time',TimestampType(),False),
                       StructField('price',DoubleType(),False),
                       StructField('flag',IntegerType(),False)])
@pandas_udf(dfs_schema,functionType=PandasUDFType.GROUPED_MAP)
def calc_incre_3(spd):
    timelist=spd['time'].tolist()
    pricelist=spd['price'].tolist()
    flaglist=[]
    for i in range(len(timelist)):
        for t in range(len(timelist)-i):
            if (timelist[i+t]-timelist[i]).seconds>30:
                if (pricelist[i+t]-pricelist[i])/pricelist[i]>=0.05:
                    flaglist.append(1)
                else:
                    flaglist.append(0)
                break
    spd['flag']=flaglist
    return spd
pls_df=df1.groupBy('stks').apply(calc_incre_3)
pls_df.show()

Я загрузил файлы необработанных данных. https://www.dropbox.com/sh/gc5j36mik71a3yc/AAAUVfNINegdv9ozQPqJYiZHa?dl=0

1 Ответ

0 голосов
/ 21 апреля 2020

Если вы можете поделиться некоторыми примерами данных (замаскированными, если есть проблема с обменом данными), было бы легко предложить, как подойти.

Кроме того, поделитесь следующей информацией

  • общее количество записей, которые имеет ваш DF на рынке.
  • различное количество записей для рынка и акций (это помогает разработать раздел).

Ваш код имеет 2 проблемы. Слишком много циклов (вложенных) и вызовов функций (это не лучший метод в PySpark, поэтому следует избегать такого большого количества усилий, или вы можете написать функцию Java / Scala и использовать ее через PySpark, что несколько повышает производительность, но повышает сложность. когда дело доходит до удобства сопровождения кода)

Рекомендуемый подход

  • Использовать правильное число раздела, которое не указано
  • Использовать оконные функции, чтобы избежать вложенных циклов.
  • Кроме того, необходимо понимать поведение случайного воспроизведения в Spark UI для решения проблемы с производительностью.
  • Выровняйте таблицу, используя для вычисления вложенную l oop.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...