Выполнение большого запроса для API BigQuery с python - PullRequest
0 голосов
/ 19 февраля 2019

Я пишу функцию Python, которая выполняет запрос к таблицам BigQuery и записывает результат в другую таблицу. Вот моя функция с небольшим запросом:

def generate_user_pit(days_ago):
    ### Generate a user facts table for x days ago and save it into BigQuery
    days_ago = days_ago
    query = """
    SELECT
    Date,
    sum(totals.visits)
    FROM
      `project.dataset.ga_sessions_20*` AS t
    WHERE
      parse_DATE('%y%m%d',
        _table_suffix) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL @days_ago day)
      AND DATE_SUB(CURRENT_DATE(), INTERVAL @days_ago day)
    GROUP BY Date
    """
    query_params = [
    bigquery.ScalarQueryParameter('days_ago', 'INT64', days_ago),
    ]

    job_config = bigquery.QueryJobConfig()
    job_config.query_parameters = query_params
    job_config.write_disposition = 'WRITE_TRUNCATE'


    print("Generating dataframe...")
    df = client.query(query, job_config=job_config).to_dataframe()

    date = df['Date'].max()
    table_name = "data_"
    complete_table_name = table_name + date

    dataset_ref = client.dataset('test')
    table_ref = dataset_ref.table(complete_table_name)

    print("Writing", complete_table_name, "to BigQuery...")
    client.load_table_from_dataframe(df, table_ref).result()

    print("Done!")

Это работает, как и ожидалось, но когдаЯ запускаю его с реальным запросом, который требует гораздо больше памяти, я нажимаю Memory Error.

Могу ли я что-нибудь сделать, чтобы улучшить эту функцию и сделать ее более эффективной?Например, мне не нужен запрос, записанный на фрейм данных, это просто способ, которым я смог заставить его работать сейчас.

Вот мой запрос, которыйвозвращение максимального объема памяти:

WITH Q1 AS(
    SELECT
      customDimension.value AS UserID,
      # Visits from this individual
      COUNT(DISTINCT CONCAT(CAST(fullVisitorId AS STRING),CAST(visitId AS STRING))) AS visits,
      # Orders from this individual
      COUNT(DISTINCT hits.transaction.transactionId) AS orders,
      # COUNT(DISTINCT hits.transaction.transactionId)/COUNT(DISTINCT VisitId) AS frequency,
      # Conversion rate of the individual
      SAFE_DIVIDE(COUNT(DISTINCT hits.transaction.transactionId),
        COUNT(DISTINCT CONCAT(CAST(fullVisitorId AS STRING),CAST(visitId AS STRING)))) AS conversion_rate,

       # first visit date
      MIN(Date) AS first_visit_date,

      # last visit date
      MAX(DATE) AS last_visit_date

    FROM
      `project.dataset.ga_sessions_20*` AS t
    CROSS JOIN
      UNNEST (hits) AS hits
    CROSS JOIN
      UNNEST(t.customdimensions) AS customDimension
    CROSS JOIN
      UNNEST(hits.product) AS hits_product
    WHERE
      parse_DATE('%y%m%d',
        _table_suffix) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 999 day)
      AND DATE_SUB(CURRENT_DATE(), INTERVAL @days_ago day)
      AND customDimension.index = 2
      AND customDimension.value NOT LIKE "true"
      AND customDimension.value NOT LIKE "false"
      AND customDimension.value NOT LIKE "undefined"
      AND customDimension.value IS NOT NULL
    GROUP BY
      UserID
    ),
    Q2 AS(
    SELECT
      customDimension.value AS UserID,
      IFNULL(SUM(totals.bounces),0) AS bounces,
      IFNULL(SUM(totals.transactionRevenue)/1000000,0) AS revenue

    FROM
      `project.dataset.ga_sessions_20*` AS t

    CROSS JOIN
      UNNEST(t.customdimensions) AS customDimension

        WHERE parse_date('%y%m%d', _table_suffix) between 
    DATE_sub(current_date(), interval 999 day) and
    DATE_sub(current_date(), interval @days_ago day)

      AND customDimension.index = 2
      AND customDimension.value NOT LIKE "true"
      AND customDimension.value NOT LIKE "false"
      AND customDimension.value NOT LIKE "undefined"
      AND customDimension.value IS NOT NULL
    GROUP BY
      UserID
    ),
    Q3 AS(
    SELECT customDimension.value AS UserID, MAX(Date) AS last_order
    FROM `project.dataset.ga_sessions_*` AS t
      CROSS JOIN UNNEST(t.customdimensions) AS customDimension
    WHERE totals.transactions > 0
    AND customDimension.index = 2
    GROUP BY UserID
    ),
    Q4 AS(
    SELECT customDimension.value AS UserID, MIN(Date) AS first_order
    FROM `project.dataset.ga_sessions_*` AS t
      CROSS JOIN UNNEST(t.customdimensions) AS customDimension
    WHERE totals.transactions > 0
    AND customDimension.index = 2
    GROUP BY UserID
    )

    SELECT a.UserID AS UserID,
    SUM(visits) AS visits,
    IFNULL(SUM(orders),0) AS orders,
    IFNULL(MIN(conversion_rate),0) AS conversion_rate,
    IFNULL(SUM(bounces),0) AS bounces,
    IFNULL(SUM(revenue),0) AS revenue,
    IFNULL(SAFE_DIVIDE(SUM(revenue),SUM(orders)),0) AS AOV,
    IFNULL(SAFE_DIVIDE(SUM(revenue),SUM(visits)),0) AS rev_per_visit,
    IFNULL(SAFE_DIVIDE(SUM(bounces),SUM(visits)),0) AS bounce_rat

    FROM Q1 AS a
    LEFT JOIN Q2 AS b
    USING (UserID)

    LEFT JOIN Q4 AS f
    USING (UserID)

    LEFT JOIN Q3 AS l
    USING (UserID)

    GROUP BY UserID

1 Ответ

0 голосов
/ 19 февраля 2019

Вы можете попробовать по крайней мере эти варианты: - записать результаты запроса во внешний файл и затем экспортировать - использовать запрос с подкачкой результатов и выполнить в цикле. Что касается оптимизации Python, вы можете попробовать, но если результаты больше памятив системе вам придется рано или поздно попробовать другой подход.Все зависит от того, как извлеченные данные будут использоваться, но в вашем случае, я думаю, запрос с подкачкой будет лучшим подходом.Посмотрите на этот ответ о разбивке по страницам: Каков наилучший способ разбить результаты на страницы в SQL Server

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...