Получение ошибки «Клиенты имеют нетривиальное состояние, которое является локальным и необратимым». при использовании клиентской библиотеки BigQuery в python? - PullRequest
1 голос
/ 16 января 2020

Я разрабатываю облачную функцию GCP, которая будет выполнять запрос к таблице BigQuery, загружать данные во некоторую временную таблицу, а затем эти данные могут использоваться другими буквами облачных функций из этой временной таблицы.

В своем запросе я использую одну таблицу и хочу проверить существование этой таблицы, прежде чем выполнить свой запрос в облачной функции. Для этого я написал две попытки, кроме блоков, первая проверит существование таблицы. Если таблица еще не создана, она создаст эту таблицу с предоставленным запросом create_table. После этого следует выполнить вторую попытку кроме блока, но здесь я получаю ошибку. первая попытка кроме блока создает таблицу, если она еще не создана, но после этого ей не удается выполнить вторую попытку, кроме блока с ошибкой:

"" Клиенты имеют нетривиальное состояние, которое local и unpickleable. ", _pickle.PicklingError: Выбор объектов клиента явно не поддерживается. Клиенты имеют нетривиальное состояние, которое является локальным и не может быть выбрано. "

На данный момент я пробовал ниже код в python:

def main(request):
    my_client = bigquery.Client()
    create_table = 'CREATE OR REPLACE TABLE {}.{} (customerName STRING, IDNumber STRING)'.format(dest_dataset, dest_table)
    job_configs = bigquery.QueryJobConfig()
    destination_dataset = my_client.dataset(dest_dataset, dest_project)
    destination_table = destination_dataset.table(dest_table)
    job_configs.destination = destination_table

    # Check the final destination table is already exists, if not then create it based on create_table query.

    try:
        table = my_client.get_table(destination_table)
        if table:
            print('Table {}\'s existence sucessfully proved!'.format(destination_table))

    except NotFound as error:
        temporary_table = my_client.query(create_table, location='US')
        temporary_table.result()
        print('Table {} is created!'.format(destination_table))


    # After checking destination_table existance, run actual query (destination_table being used in this query) and load data into temporary table
    try:

        client = bigquery.Client()
        job_config = bigquery.QueryJobConfig()
        dest_dataset = client.dataset(temporary_dataset, temporary_project)
        dest_table = dest_dataset.table(temporary_table)
        job_config.destination = dest_table
        job_config.create_disposition = 'CREATE_IF_NEEDED'
        job_config.write_disposition = 'WRITE_TRUNCATE'
        query_job = client.query(query, location='US', job_config=job_config)
        query_job.result()
        table = client.get_table(dest_table)
        expiration = (datetime.now() + timedelta(minutes=expiration_time))
        table.expires = expiration
        table = client.update_table(table, ["expires"])
        logger.info("Query result loaded into temporary table: {}".format(temporary_table))

    except RuntimeError:
        logger.error("Exception occurred {}".format(RuntimeError))

Есть ли способ исправить эту ошибку или любой другой подход для проверки существования таблицы

1 Ответ

0 голосов
/ 17 января 2020

Я нашел обходной путь, который решил мою проблему. Я пытаюсь написать две отдельные функции для каждой попытки, кроме блока, как показано ниже:

def check_dest_table_existence(request):
    my_client = bigquery.Client()
    create_table = 'CREATE OR REPLACE TABLE {}.{} (customerName STRING, IDNumber STRING)'.format(dest_dataset, dest_table)
    job_configs = bigquery.QueryJobConfig()
    destination_dataset = my_client.dataset(dest_dataset, dest_project)
    destination_table = destination_dataset.table(dest_table)
    job_configs.destination = destination_table

    # Check the final destination table is already exists, if not then create it based on create_table query.

    try:
        table = my_client.get_table(destination_table)
        if table:
            print('Table {}\'s existence sucessfully proved!'.format(destination_table))

    except NotFound as error:
        temporary_table = my_client.query(create_table, location='US')
        temporary_table.result()
        print('Table {} is created!'.format(destination_table))



def main(request):

    # Check destination_table existance, run actual query (destination_table being used in this query) and load data into temporary table
    try:

        # Calling another function to check existence of destination table.
        check_dest_table_existence(request)

        client = bigquery.Client()
        job_config = bigquery.QueryJobConfig()
        dest_dataset = client.dataset(temporary_dataset, temporary_project)
        dest_table = dest_dataset.table(temporary_table)
        job_config.destination = dest_table
        job_config.create_disposition = 'CREATE_IF_NEEDED'
        job_config.write_disposition = 'WRITE_TRUNCATE'
        query_job = client.query(query, location='US', job_config=job_config)
        query_job.result()
        table = client.get_table(dest_table)
        expiration = (datetime.now() + timedelta(minutes=expiration_time))
        table.expires = expiration
        table = client.update_table(table, ["expires"])
        logger.info("Query result loaded into temporary table: {}".format(temporary_table))

    except RuntimeError:
        logger.error("Exception occurred {}".format(RuntimeError))

...