Как оптимизировать вложенные данные для l oop, циклически перебирая данные json для извлечения значений определенных ключей в python - PullRequest
0 голосов
/ 25 февраля 2020

Я читаю файлы журнала в моем коде python, который содержит некоторые вложенные данные json. У меня есть вложенный для l oop, содержащий 4 цикла for, из которых значения определенных ключей извлекаются и добавляются к фрейму данных.

Вложенный для-l oop занимает слишком много времени, и я увидел, из других ответов, что многопроцессорная обработка - это путь к go для вложенных циклов for, но не удалось найти пример для данных json.

Каков наилучший подход для этого? Ниже приведен мой код для извлечения данных из файлов журнала и в кадры данных. recommendation_list - это список json объектов.

    for recommendation in recommendation_list:
    if recommendation['type'] == "httpRequest":
        session_id = recommendation['query'].split('sessionId=')[1].split('&')[0]
        category_id = recommendation['query'].split('categoryId=')[1].split('&')[0]
    if recommendation['type'] == "httpResponse":
        recommendation_count = recommendation_count + 1
        user_id = recommendation['userId']
        time_stamp = recommendation['ts']
        event_date = time_stamp.split("T")[0]
        time = time_stamp.split("T")[-1]
        try:
            product_list = json.loads(recommendation['body'])['products']
        except:
            product_list = []
        if len(product_list) > 0:
            for product in product_list:
                product_id = product["id"]
                if 'recommendationMeta' in product:
                    data_frame = data_frame.append({
                        "transaction_id": last_id,
                        "user_id": user_id,
                        "session_id": session_id,
                        "category_id": category_id,
                        "product_id": product_id,
                        "date": event_date,
                        "time": time[0:12],
                        "event": "recommendation",
                        "ab_bucket": "B",
                        "recommendation_count": recommendation_count,
                    }, ignore_index=True)

                    for learning_unit in product['recommendationMeta']:
                        lu_name = learning_unit['lu']
                        lu_value = learning_unit['value']
                        recommendation_mode = learning_unit['recommendationMode']
                        prod_def1 = products[(products["product_id"] == product_id) &
                                             (products["lu_value"].str.lower() == lu_value)]
                        if len(prod_def1) != 0:
                            product_list = prod_def1.to_dict('records')

                            for product_id in product_list:
                                category = categories[(categories["category_def_id"] == product_id["category_def_id"]) &
                                                      (categories["lu_name"].str.lower() == lu_name)]
                                if len(category) != 0:
                                    product_def_id = product_id['product_def_id']
                                    lu_df = lu_df.append({
                                        "lu_data_id": lu_id,
                                        "product_def_id": product_def_id,
                                        "transaction_id": last_id,
                                        "rec_mode": recommendation_mode,
                                    }, ignore_index=True)
                                    lu_id = lu_id+1
                    last_id = last_id + 1

Я считаю, что самый внутренний for-l oop выполняется чаще всего, и решил использовать для него многопроцессорность.

Я заменил

                                for product_id in product_list:
                                category = categories[(categories["category_def_id"] == product_id["category_def_id"]) &
                                                      (categories["lu_name"].str.lower() == lu_name)]
                                if len(category) != 0:
                                    product_def_id = product_id['product_def_id']
                                    lu_df = lu_df.append({
                                        "lu_data_id": lu_id,
                                        "product_def_id": product_def_id,
                                        "transaction_id": last_id,
                                        "rec_mode": recommendation_mode,
                                    }, ignore_index=True)
                                    lu_id = lu_id+1

на это ...

for product_id in product_list:
      pool = Pool()  # Create a multiprocessing Pool
      data = pool.starmap(create_lu_data, [last_id, categories, recommendation_mode,
      lu_name, lu_df, lu_id, product_id])
      lu_id = lu_id + 1
      p.close()
      print(data)

, где create_lu_data равно

def create_lu_data(last_id, categories, recommendation_mode, lu_name, lu_df, lu_id, product_id):

category = categories[(categories["category_def_id"] == product_id["category_def_id"]) &
                      (categories["lu_name"].str.lower() == lu_name)]
if len(category) != 0:
    product_def_id = product_id['product_def_id']
    lu_df = lu_df.append({
        "lu_data_id": lu_id,
        "product_def_id": product_def_id,
        "transaction_id": last_id,
        "rec_mode": recommendation_mode,
        }, ignore_index=True)
return lu_df

Я не получил никаких ошибок, но в выходном фрейме данных ожидаемое количество строк в несколько раз больше.

...