Что является ключом для повышения скорости, когда нам нужна операция действия при обработке больших данных Spark? - PullRequest
0 голосов
/ 23 мая 2018

Я недавно использую Spark 1.5.1 для обработки данных hadoop.Тем не менее, мой опыт Spark не очень хорош для медленной обработки операции действия (например, .count (), collect ()).Моя задача может быть описана следующим образом:

У меня есть такой кадр данных:

----------------------------
trans   item_code   item_qty
----------------------------
001         A          2
001         B          3
002         A          4
002         B          6
002         C          10
003         D          1
---------------------------- 

Мне нужно найти правила связывания двух элементов, например, один из A приведет к одному и aполовина B с уверенностью 0,8.Фрейм данных с желаемым результатом выглядит следующим образом:

----------------------------  
item1  item2  conf   coef 
----------------------------
  A      B     0.8    1.5
  B      A     1.0    0.67
  A      C     0.7    2.5
----------------------------

Мой метод использует FP-Growth, чтобы сначала генерировать часто встречающиеся наборы элементов, а затем фильтровать наборы элементов из одного элемента и наборы элементов из двух элементов.После этого я могу рассчитать достоверность одного элемента, в результате чего другой.Например, имея (itemset = [A], support = 0.4), (itemset = [B], support = 0.2), (itemset = [A, B], support = 0.2), я могу генерировать правила ассоциации: (rule= (A-> B), достоверность = 0,5), (правило = (B-> A), достоверность = 1,0).Однако когда я транслирую наборы частых элементов из одного элемента в качестве словаря, действие .collectAsMap действительно очень медленное.Я пытался использовать .join , и это даже медленнее.Мне даже нужно подождать несколько часов, чтобы увидеть rdd.count () .Я знаю, что мы должны избегать любого использования action action в Spark, но иногда это неизбежно.Поэтому мне любопытно, что является ключом к повышению скорости, когда мы сталкиваемся с операциями действия .

Мой код здесь:

#!/usr/bin/python
from pyspark import SparkContext,HiveContext
from pyspark.mllib.fpm import FPGrowth
import time

#read raw data from database    
def read_data():
    sql="""select t.orderno_nosplit, 
          t.prod_code, 
          t.item_code, 
          sum(t.item_qty) 
          as item_qty 
          from ioc_fdm.fdm_dwr_ioc_fcs_pk_spu_item_f_chain t
          group by t.prod_code, t.orderno_nosplit,t.item_code """
    data=sql_context.sql(sql)                  
    return data.cache()

#calculate quantity coefficient of two items 
def qty_coef(item1,item2):    
    sql =""" select t1.item, t1.qty from table t1
           where t1.trans in 
           (select t2.trans from spu_table t2 where t2.item ='%s'
           and
           (select t3.trans from spu_table t3 where t3.item = '%s' """ %  (item1,item2)
    df=sql_context.sql(sql)
    qty_item1=df.filter(df.item_code==item1).agg({"item_qty":"sum"}).first()[0]
    qty_item2=df.filter(df.item_code==item2).agg({"item_qty":"sum"}).first()[0]
    coef=float(qty_item2)/qty_item1
    return coef

def train(prod):

    spu=total_spu.filter(total_spu.prod_code == prod)
    print 'data length',spu.count(),time.strftime("%H:%M:%S")
    supp=0.1  
    conf=0.7
    sql_context.registerDataFrameAsTable(spu,'spu_table')
    sql_context.cacheTable('spu_table')
    print 'table register over', time.strftime("%H:%M:%S")

    trans_sets=spu.rdd.repartition(32).map(lambda x:(x[0],x[2])).groupByKey().mapvalues(list).values().cache()
    print 'trans group over',time.strftime("%H:%M:%S")

    model=FPGrowth.train(trans_sets,supp,10)
    print 'model train over',time.strftime("%H:%M:%S")

    model_f1=model.freqItemsets().filter(lambda x: len(x[0]==1))
    model_f2=model.freqItemsets().filter(lambda x: len(x[0]==2))

    #register model_f1 as dictionary
    model_f1_tuple=model_f1.map(lambda (U,V):(tuple(U)[0],V))
    model_f1Map=model_f1_tuple.collectAsMap()

    #convert model_f1Map to broadcast
    bc_model=sc.broadcast(model_f1Map)

    #generate association rules
    model_f2_conf=model_f2.map(lambda x:(x[0][0],x[0][1],float(x[1])/bc_model.value[x[0][0]],float(x[1]/bc_model.value[x[0][1]])))
    print 'conf calculation over',time.strftime("%H:%M:%S")

    model_f2_conf_flt=model_f2_conf.flatMap(lambda x: (x[0],x[1]))

    #filter the association rules by confidence threshold
    model_f2_conf_flt_ftr=model_f2_conf_flt.filter(lambda x: x[2]>=conf)

    #calculate the quantity coefficient for the filtered association rules
    #since we cannot use nested sql operations in rdd, I have to collect the rules to list first
    asso_list=model_f2_conf_flt_ftr.map(lambda x: list(x)).collect()
    print 'coef calculation over',time.strftime("%H:%M:%S")
    for row in asso_list:
        row.append(qty_coef(row[0],row[1]))

    #rewrite the list to dataframe
    asso_df=sql_context.createDataFrame(asso_list,['item1','item2','conf','coef'])
    sql_context.clearCache()
    path = "hdfs:/user/hive/wilber/%s"%(prod)
    asso_df.write.mode('overwrite').parquet(path)

if __name__ == '__main__':
    sc = SparkContext()
    sql_context=HiveContext(sc)
    prod_list=sc.textFile('hdfs:/user/hive/wilber/prod_list').collect()  
    total_spu=read_data()
    print 'spu read over',time.strftime("%H:%M:%S")
    for prod in list(prod_list):
        print 'prod',prod
        train(prod)
...