Я недавно использую Spark 1.5.1 для обработки данных hadoop.Тем не менее, мой опыт Spark не очень хорош для медленной обработки операции действия (например, .count (), collect ()).Моя задача может быть описана следующим образом:
У меня есть такой кадр данных:
----------------------------
trans item_code item_qty
----------------------------
001 A 2
001 B 3
002 A 4
002 B 6
002 C 10
003 D 1
----------------------------
Мне нужно найти правила связывания двух элементов, например, один из A приведет к одному и aполовина B с уверенностью 0,8.Фрейм данных с желаемым результатом выглядит следующим образом:
----------------------------
item1 item2 conf coef
----------------------------
A B 0.8 1.5
B A 1.0 0.67
A C 0.7 2.5
----------------------------
Мой метод использует FP-Growth, чтобы сначала генерировать часто встречающиеся наборы элементов, а затем фильтровать наборы элементов из одного элемента и наборы элементов из двух элементов.После этого я могу рассчитать достоверность одного элемента, в результате чего другой.Например, имея (itemset = [A], support = 0.4), (itemset = [B], support = 0.2), (itemset = [A, B], support = 0.2), я могу генерировать правила ассоциации: (rule= (A-> B), достоверность = 0,5), (правило = (B-> A), достоверность = 1,0).Однако когда я транслирую наборы частых элементов из одного элемента в качестве словаря, действие .collectAsMap действительно очень медленное.Я пытался использовать .join , и это даже медленнее.Мне даже нужно подождать несколько часов, чтобы увидеть rdd.count () .Я знаю, что мы должны избегать любого использования action action в Spark, но иногда это неизбежно.Поэтому мне любопытно, что является ключом к повышению скорости, когда мы сталкиваемся с операциями действия .
Мой код здесь:
#!/usr/bin/python
from pyspark import SparkContext,HiveContext
from pyspark.mllib.fpm import FPGrowth
import time
#read raw data from database
def read_data():
sql="""select t.orderno_nosplit,
t.prod_code,
t.item_code,
sum(t.item_qty)
as item_qty
from ioc_fdm.fdm_dwr_ioc_fcs_pk_spu_item_f_chain t
group by t.prod_code, t.orderno_nosplit,t.item_code """
data=sql_context.sql(sql)
return data.cache()
#calculate quantity coefficient of two items
def qty_coef(item1,item2):
sql =""" select t1.item, t1.qty from table t1
where t1.trans in
(select t2.trans from spu_table t2 where t2.item ='%s'
and
(select t3.trans from spu_table t3 where t3.item = '%s' """ % (item1,item2)
df=sql_context.sql(sql)
qty_item1=df.filter(df.item_code==item1).agg({"item_qty":"sum"}).first()[0]
qty_item2=df.filter(df.item_code==item2).agg({"item_qty":"sum"}).first()[0]
coef=float(qty_item2)/qty_item1
return coef
def train(prod):
spu=total_spu.filter(total_spu.prod_code == prod)
print 'data length',spu.count(),time.strftime("%H:%M:%S")
supp=0.1
conf=0.7
sql_context.registerDataFrameAsTable(spu,'spu_table')
sql_context.cacheTable('spu_table')
print 'table register over', time.strftime("%H:%M:%S")
trans_sets=spu.rdd.repartition(32).map(lambda x:(x[0],x[2])).groupByKey().mapvalues(list).values().cache()
print 'trans group over',time.strftime("%H:%M:%S")
model=FPGrowth.train(trans_sets,supp,10)
print 'model train over',time.strftime("%H:%M:%S")
model_f1=model.freqItemsets().filter(lambda x: len(x[0]==1))
model_f2=model.freqItemsets().filter(lambda x: len(x[0]==2))
#register model_f1 as dictionary
model_f1_tuple=model_f1.map(lambda (U,V):(tuple(U)[0],V))
model_f1Map=model_f1_tuple.collectAsMap()
#convert model_f1Map to broadcast
bc_model=sc.broadcast(model_f1Map)
#generate association rules
model_f2_conf=model_f2.map(lambda x:(x[0][0],x[0][1],float(x[1])/bc_model.value[x[0][0]],float(x[1]/bc_model.value[x[0][1]])))
print 'conf calculation over',time.strftime("%H:%M:%S")
model_f2_conf_flt=model_f2_conf.flatMap(lambda x: (x[0],x[1]))
#filter the association rules by confidence threshold
model_f2_conf_flt_ftr=model_f2_conf_flt.filter(lambda x: x[2]>=conf)
#calculate the quantity coefficient for the filtered association rules
#since we cannot use nested sql operations in rdd, I have to collect the rules to list first
asso_list=model_f2_conf_flt_ftr.map(lambda x: list(x)).collect()
print 'coef calculation over',time.strftime("%H:%M:%S")
for row in asso_list:
row.append(qty_coef(row[0],row[1]))
#rewrite the list to dataframe
asso_df=sql_context.createDataFrame(asso_list,['item1','item2','conf','coef'])
sql_context.clearCache()
path = "hdfs:/user/hive/wilber/%s"%(prod)
asso_df.write.mode('overwrite').parquet(path)
if __name__ == '__main__':
sc = SparkContext()
sql_context=HiveContext(sc)
prod_list=sc.textFile('hdfs:/user/hive/wilber/prod_list').collect()
total_spu=read_data()
print 'spu read over',time.strftime("%H:%M:%S")
for prod in list(prod_list):
print 'prod',prod
train(prod)