Исходными данными являются журналы событий с устройства, и все данные имеют формат json, образец необработанных данных json
{"sn": "123", "ip": null, "evt_name": "client_requestData", "evt_content": {"count": 1, "hour": 13, "dow": 0, "segmentation": {"requestService": "music", "requestData": "is_online", "requestOpcode": "get_state"}, "sum": 0}, "evt_ts": 1521350052, "app_key": "f6e7f4f8ec4b4d6dae6fa2b5ed8f90cb6a640759", "sdk_name": "countlysdk_0.0.9", "sdk_version": "17.05"}
{"sn": "123", "ip": null, "evt_name": "client_requestData2", "evt_content": {"count": 1, "hour": 13, "dow": 0, "segmentation": {"requestService": "fm", "requestData": "is_online", "requestOpcode": "get_state"}, "sum": 0}, "evt_ts": 1521350053, "app_key": "f6e7f4f8ec4b4d6dae6fa2b5ed8f90cb6a640759", "sdk_name": "countlysdk_0.0.9", "sdk_version": "17.05"}
{"sn": "123", "ip": null, "evt_name": "client_requestData3", "evt_content": {"count": 1, "hour": 13, "dow": 0, "segmentation": {"requestService": "video", "requestData": "is_online", "requestOpcode": "get_state"}, "sum": 0}, "evt_ts": 1521350054, "app_key": "f6e7f4f8ec4b4d6dae6fa2b5ed8f90cb6a640759", "sdk_name": "countlysdk_0.0.9", "sdk_version": "17.05"}
{"sn": "123", "ip": null, "evt_name": "client_requestData4", "evt_content": {"count": 1, "hour": 13, "dow": 0, "segmentation": {"requestService": "fm", "requestData": "is_online", "requestOpcode": "get_state"}, "sum": 0}
У меня есть список событий, например: tar_task_list, около 100 и более элементов, и дляКаждое событие мне нужно агрегировать все событие из необработанных данных, а затем сохранить его в CSV-файл события
Ниже код
#read source data
raw_data = sc.textFile("s3://xxx").map(lambda x:json.loads(x))
# TODO: NEED TO SPEED UP THIS COMPUTING
for tar_evt_name in evts:
print("...")
table_name = out_table_prefix + tar_evt_name
evt_one_rdd = raw_data.filter(lambda x: x.get("evt_name") == tar_evt_name)
evt_one_rdd.cache()
evt_one_dict = evt_one_rdd.first()
Evt_one = Row(*sorted(['{}'.format(k) for k, v in evt_one_dict.items()]))
col_len = len(evt_one_rdd.first())
evt_one_rdd2 = evt_one_rdd.map(lambda x: to_list(x, col_len)).filter(lambda x: len(x) is not 0)
evt_one_rdd2.cache()
df = spark.createDataFrame(evt_one_rdd2.map(lambda x: Evt_one(*x)))
out_csv_path = output + '/' + tar_evt_name+'/'# add last '/' for copy err
df.write.csv(out_csv_path, mode='overwrite', header=True,sep='|',nullValue="NULL")
выходные данные, как показано ниже: время: 2018-05-07 00: 03 | 8dab4796-fa37-4114-0011-7637fa2b0001 | f6e7f4f8ec4b4d6dae6fa2b5ed8f90cb6a640759 | 0.2.23 | 131074 | 2018-05-08 23: 24: 25 | 0 | false | по умолчанию | 2.4.130