У меня возникли некоторые проблемы.
- У меня есть целая куча csv-файлов со стандартными тиковыми данными. Мне нужно будет рассчитать данные уровня секунд каждой акции.
Я хочу, чтобы данные каждой акции могли быть обработаны за 10 секунд, объединены в большой файл и, наконец, выведены в CSV. поскольку использование pandas будет ограничено памятью моего ноутбука, если я захочу сделать в pandas, мне нужно будет выполнить большую работу read_csv / to_csv. Я думаю, что это займет много времени, поэтому я выбрал следующий способ: - (1) Я использую pyspark, чтобы прочитать все файлы csv, сгенерировать большой файл df.
- (2) I получил список акций от дф. Затем делайте для итераций каждый раз, когда я выбираю фрейм данных pyspark для фондовых данных, переносу его в pandas фрейм данных, вычисляю его в pandas. Наконец, выведите этот файл в локальные файлы.
Теперь проблема в том, что программа работает с очень низкой скоростью, для некоторого запаса она обрабатывает 2 минуты. Для некоторых акций обработка занимает около 18 минут.
Я думаю, что проблема связана с разделом данных pyspark dataframe. Pyspark хранил данные в очень отдаленных местах, поэтому он реорганизует данные в соответствии с моими потребностями. Как я могу ускорить его?
#read data
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing1').getOrCreate()
sc=spark.sparkContext
df=spark.read.csv('file:///D:/t/*.csv',inferSchema=True,header=True,encoding='GBK')
df_empty=spark.createDataFrame(sc.emptyRDD(),t_schema)#pre-defined schema
#generate code list
codelist=df.select('stks').distinct().collect()
col_list=['stks','time']
df=df.orderBy(col_list,ascending=True)
from time import strftime, localtime
#For loops
for code in codelist:
print('++++++++++++++++++++++++++')
print(code)
print(strftime("%Y-%m-%d %H:%M:%S", localtime()))
df2=df.filter(df['stks']==code[0]).select('mkt','stks','time','price')
df2_1=df2.toPandas()
timelist=df2_1['time'].tolist()
pricelist=df2_1['price'].tolist()
#Add flag----target computation
flag_1=[]
flag_2=[]
for i in range(len(timelist)):
# calculate
flag_1.append(calc_incre_2(timelist,pricelist,i,30,0.05))
flag_2.append(calc_incre_2(timelist,pricelist,i,40,0.05))
df2_1['flag_1']=flag_1
df2_1['flag_2']=flag_2
df2_2=sqlContext.createDataFrame(df2_1)
df_empty=df_empty.union(df2_2)
#sub-function----------------------------
def calc_incre_2(timelist,pricelist,start_pos,secs_spec,incre_spec):
i=start_pos
flag=0
# timelist=df2['时间'].tolist()
# pricelist=df2['最新'].tolist()
for t in range(len(timelist)-i):
if (timelist[i+t]-timelist[i]).seconds>secs_spec:
incre=(pricelist[i+t]-pricelist[i])/pricelist[i]
if incre>=incre_spec:
flag=1
else:
flag=0
break
return flag
Я пытался использовать pandas_udf, хотя он все еще не может работать. Должен получить пустой RDD здесь или там.
#read data
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing1').getOrCreate()
sc=spark.sparkContext
df=spark.read.csv('file:///D:/t/*.csv',inferSchema=True,header=True,encoding='GBK')
df1=df.select('mkt','stks','time','price')
#Using PandasUDF
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
#create the schema for the resulting dataframe
dfs_schema=StructType([StructField('mkt',StringType(),False),
StructField('stks',IntegerType(),False),
StructField('time',TimestampType(),False),
StructField('price',DoubleType(),False),
StructField('flag',IntegerType(),False)])
@pandas_udf(dfs_schema,functionType=PandasUDFType.GROUPED_MAP)
def calc_incre_3(spd):
timelist=spd['time'].tolist()
pricelist=spd['price'].tolist()
flaglist=[]
for i in range(len(timelist)):
for t in range(len(timelist)-i):
if (timelist[i+t]-timelist[i]).seconds>30:
if (pricelist[i+t]-pricelist[i])/pricelist[i]>=0.05:
flaglist.append(1)
else:
flaglist.append(0)
break
spd['flag']=flaglist
return spd
pls_df=df1.groupBy('stks').apply(calc_incre_3)
pls_df.show()
Я загрузил файлы необработанных данных. https://www.dropbox.com/sh/gc5j36mik71a3yc/AAAUVfNINegdv9ozQPqJYiZHa?dl=0