как ускорить вычисления для pyspark - PullRequest
0 голосов
/ 22 мая 2018

Исходными данными являются журналы событий с устройства, и все данные имеют формат json, образец необработанных данных json

{"sn": "123", "ip": null, "evt_name": "client_requestData", "evt_content": {"count": 1, "hour": 13, "dow": 0, "segmentation": {"requestService": "music", "requestData": "is_online", "requestOpcode": "get_state"}, "sum": 0}, "evt_ts": 1521350052, "app_key": "f6e7f4f8ec4b4d6dae6fa2b5ed8f90cb6a640759", "sdk_name": "countlysdk_0.0.9", "sdk_version": "17.05"}
{"sn": "123", "ip": null, "evt_name": "client_requestData2", "evt_content": {"count": 1, "hour": 13, "dow": 0, "segmentation": {"requestService": "fm", "requestData": "is_online", "requestOpcode": "get_state"}, "sum": 0}, "evt_ts": 1521350053, "app_key": "f6e7f4f8ec4b4d6dae6fa2b5ed8f90cb6a640759", "sdk_name": "countlysdk_0.0.9", "sdk_version": "17.05"}
{"sn": "123", "ip": null, "evt_name": "client_requestData3", "evt_content": {"count": 1, "hour": 13, "dow": 0, "segmentation": {"requestService": "video", "requestData": "is_online", "requestOpcode": "get_state"}, "sum": 0}, "evt_ts": 1521350054, "app_key": "f6e7f4f8ec4b4d6dae6fa2b5ed8f90cb6a640759", "sdk_name": "countlysdk_0.0.9", "sdk_version": "17.05"}
{"sn": "123", "ip": null, "evt_name": "client_requestData4", "evt_content": {"count": 1, "hour": 13, "dow": 0, "segmentation": {"requestService": "fm", "requestData": "is_online", "requestOpcode": "get_state"}, "sum": 0}

У меня есть список событий, например: tar_task_list, около 100 и более элементов, и дляКаждое событие мне нужно агрегировать все событие из необработанных данных, а затем сохранить его в CSV-файл события

Ниже код

#read source data
raw_data = sc.textFile("s3://xxx").map(lambda x:json.loads(x))
# TODO: NEED TO SPEED UP THIS COMPUTING
for tar_evt_name in evts:
    print("...")
    table_name = out_table_prefix + tar_evt_name
    evt_one_rdd = raw_data.filter(lambda x: x.get("evt_name") == tar_evt_name)
    evt_one_rdd.cache()
    evt_one_dict = evt_one_rdd.first()
    Evt_one = Row(*sorted(['{}'.format(k) for k, v in evt_one_dict.items()]))

    col_len = len(evt_one_rdd.first())
    evt_one_rdd2 = evt_one_rdd.map(lambda x: to_list(x, col_len)).filter(lambda x: len(x) is not 0)
    evt_one_rdd2.cache()
    df = spark.createDataFrame(evt_one_rdd2.map(lambda x: Evt_one(*x)))
    out_csv_path = output + '/' + tar_evt_name+'/'# add last '/' for copy err
    df.write.csv(out_csv_path, mode='overwrite', header=True,sep='|',nullValue="NULL")

выходные данные, как показано ниже: время: 2018-05-07 00: 03 | 8dab4796-fa37-4114-0011-7637fa2b0001 | f6e7f4f8ec4b4d6dae6fa2b5ed8f90cb6a640759 | 0.2.23 | 131074 | 2018-05-08 23: 24: 25 | 0 | false | по умолчанию | 2.4.130

1 Ответ

0 голосов
/ 23 мая 2018

Вот моя попытка,

Я заметил несколько проблем здесь,

  • for tar_evt_name in evts - это собственный цикл Python for, который влечет за собой снижение производительности, когда кажется, чтовы хотите создать группу по операции;
  • .cache() используется, но, по-видимому, без причины;
  • Не уверен, что такое to_list;
  • Донне думаю evt_one_rdd2.map(lambda x: Evt_one(*x))) работает;

    import json
    from pyspark.sql import functions as F
    from pyspark.sql import Row
    from pyspark.sql import Window
    
    raw_data = sc.textFile('test.txt')
    
    df = raw_data.map(
        # Map the raw input to python dict using `json.loads`
        json.loads,
    ).map(
        # Duplicate the evt_name and evt_ts for later use in a Row object
        lambda x: Row(evt_name=x['evt_name'], evt_ts=x.get('evt_ts', 1), data=x),
    ).toDF()  # Convert into a dataframe...
    # ... (I am actually unusre if this is faster...
    # ... but I am more comfortable with this)
    
    filtered_df = df.withColumn(
        # NOTE: Assumed you want the first row, as you used `evt_one_rdd.first()`.
        #       So we assign a row number (named rn) and then filter on rn = 1.
        #       Here the evt_name and evt_ts becomes handy, you might want to set
        #       your own evt_ts properly.
        'rn', F.row_number().over(
            Window.partitionBy(df['evt_name']).orderBy(df['evt_ts'])
        ),
    ).filter('rn = 1').where(
        # NOTE: Since you used `map(lambda x: to_list(x, col_len)).filter(lambda x: len(x) is not 0)`,
        #       I assume you meant data should have more than 0 keys,
        #       but this should be almost always true?
        #       Since you are grouping by `evt_name`, which means
        #       there is at least that key most of the time.
        F.size(F.col('data')) > 0
    )
    
    filtered_df.write(....)
    
...