Google Dataflow streaming inserts into BigQuery hitting rate limits
0 votes
/ 25 February 2020

I'm trying to use Dataflow streaming to insert records into BigQuery using Python. Changed files in a storage bucket are announced via Pub/Sub; the pipeline then reads the files, transforms them, and inserts the records into BigQuery.

However, when the pipeline processes 100-200 elements/sec, I get errors like the one below saying that I am exceeding a rate limit, with a link to this page. Sometimes the errors mention the tabledata.list quota of 500/sec.

I don't understand why I'm seeing messages about these quotas at all, since the streaming-insert quota for BigQuery is 1,000,000 rows/sec.

> [while running 'generatedPtransform-52321']

        java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:332)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
        org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1350)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -52327: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 498, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1024, in process
    schema)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1009, in _create_table_if_needed
    additional_create_parameters=self.additional_bq_parameters)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 712, in get_or_create_table
    found_table = self.get_table(project_id, dataset_id, table_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 480, in get_table
    response = self.client.tables.Get(request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 581, in Get
    config, request, global_params=global_params)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
    http_response, method_config=method_config, request=request)
apitools.base.py.exceptions.HttpForbiddenError: HttpError accessing <https://www.googleapis.com/bigquery/v2/projects/bought-by-many/datasets/mongo_landing_zone/tables/service_user_users_users?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 25 Feb 2020 16:49:25 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '-content-encoding': 'gzip'}>, content <{
  "error": {
    "code": 403,
    "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
    "errors": [
      {
        "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
        "domain": "usageLimits",
        "reason": "rateLimitExceeded"
      }
    ],
    "status": "PERMISSION_DENIED"
  }
}
>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 167, in _execute
    response = task()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 223, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 352, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in process_bundle
    data.transform_id].process_encoded(data.data)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 205, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 302, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 304, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 657, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 658, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 878, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 941, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 497, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1028, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 657, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 658, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 878, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 941, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 497, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1028, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 657, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 658, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 878, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 956, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 498, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1024, in process
    schema)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1009, in _create_table_if_needed
    additional_create_parameters=self.additional_bq_parameters)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 712, in get_or_create_table
    found_table = self.get_table(project_id, dataset_id, table_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 480, in get_table
    response = self.client.tables.Get(request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 581, in Get
    config, request, global_params=global_params)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
    http_response, method_config=method_config, request=request)
RuntimeError: apitools.base.py.exceptions.HttpForbiddenError: HttpError accessing <https://www.googleapis.com/bigquery/v2/projects/bought-by-many/datasets/mongo_landing_zone/tables/service_user_users_users?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 25 Feb 2020 16:49:25 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '-content-encoding': 'gzip'}>, content <{
  "error": {
    "code": 403,
    "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
    "errors": [
      {
        "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
        "domain": "usageLimits",
        "reason": "rateLimitExceeded"
      }
    ],
    "status": "PERMISSION_DENIED"
  }
}

The code I'm using is below:

    files = (
        p
        | "read PubSub"
        >> beam.io.ReadFromPubSub(
            topic=known_args.input_topic, with_attributes=True, id_label=id_label
        )
        | "decode message" >> beam.Map(lambda pubsub_msg: json.loads(pubsub_msg.data))
        | "filter buckets with unknown encodings"
        >> beam.Filter(no_encoding_bucket_filter, encodings)
        | "get file from bucket" >> beam.ParDo(GetFileFromBucket())
    )

    policies = (
        files
        | "filter for policies"
        >> beam.Filter(lambda msg: 'policies' in msg["bucket"])
        | "encode policies"
        >> beam.Map(apply_encoding, encodings['policies'], 'policies')
        | "filter out policies that failed to encode"
        >> beam.Filter(lambda item: item is not None)
        | f"insert policies to BigQuery"
        >> beam.io.WriteToBigQuery(
            project=project_id,
            table="service_policy_policies",
            dataset="mongo_landing_zone",
            insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR",
        )
    )

beam.io.WriteToBigQuery() works with streaming data, but based on the errors I suspect it is initialising or fetching the BigQuery table object for every element processed, rather than simply inserting the row. Am I using it incorrectly?
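
To make that suspicion concrete: the call chain in the traceback (process → _create_table_if_needed → get_or_create_table → tables.Get) corresponds roughly to doing something like the following for every element. This is only an illustrative standalone sketch (with a hypothetical table name), not the Beam sink's actual code:

# Illustrative sketch only: roughly what the traceback suggests happens for
# each element -- a tables.get call before the streaming insert.
# The table name below is hypothetical.
from google.cloud import bigquery

client = bigquery.Client()
table_id = "my-project.mongo_landing_zone.service_policy_policies"

def insert_one_row(row: dict) -> None:
    # tables.get: counted against the per-user, per-method API request quota,
    # which is much lower than the streaming-insert quota.
    table = client.get_table(table_id)
    # tabledata.insertAll: the actual streaming insert.
    client.insert_rows(table, [row])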




Update 2020-03-11

I've managed to improve, but not resolve, the situation. I switched from beam.io.WriteToBigQuery to my own class, WriteToBigQueryCustom, that does the same thing. I still get the errors, but only at a throughput of about 500 elements/sec or higher.

Updated code:

import logging

import apache_beam as beam
from google.cloud import bigquery


class WriteToBigQueryCustom(beam.DoFn):
    """
    Stream-insert records into a BigQuery table. Intended to work the way you'd
    expect beam.io.WriteToBigQuery to work for streaming.

    Even though beam.io.WriteToBigQuery supports streaming, it seemed to be
    initialising the BigQuery connection for every element processed. I was
    getting throttled, with errors about hitting BigQuery API limits, at a
    throughput of 100 elements/sec even though the streaming-inserts limit is
    1,000,000 rows/sec.
    """

    def __init__(self, project_id, dataset, table_name):
        self.project_id = project_id
        self.dataset = dataset
        self.table_name = table_name
        self.table_id = f"{project_id}.{dataset}.{table_name}"

    def start_bundle(self):
        self.bq_client = bigquery.Client()
        self.table = self.bq_client.get_table(self.table_id)

    def process(self, dict_to_insert):
        """Insert a dict into this class's BigQuery table."""
        errors = self.bq_client.insert_rows(self.table, [dict_to_insert])
        if errors:
            logging.error(
                f"Hit error uploading row to bigquery table {self.table_id}: "
                f"{errors}. Was trying to insert dict: {dict_to_insert}"
            )
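
For reference, a sketch of how this DoFn replaces the beam.io.WriteToBigQuery step in the pipeline above (names are reused from the original code; everything apart from the final ParDo is unchanged):

policies = (
    files
    | "filter for policies"
    >> beam.Filter(lambda msg: 'policies' in msg["bucket"])
    | "encode policies"
    >> beam.Map(apply_encoding, encodings['policies'], 'policies')
    | "filter out policies that failed to encode"
    >> beam.Filter(lambda item: item is not None)
    # The BigQuery client and table handle are created once per bundle in
    # start_bundle(), not once per element.
    | "insert policies to BigQuery"
    >> beam.ParDo(
        WriteToBigQueryCustom(
            project_id=project_id,
            dataset="mongo_landing_zone",
            table_name="service_policy_policies",
        )
    )
)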

1 Answer

1 vote
/ 11 March 2020

I ran into the same problems as you when running a similar pipeline. There seems to be some kind of bug in the Python Beam SDK:

https://issues.apache.org/jira/browse/BEAM-6831

Adding create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER helped in my case.
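
For completeness, a sketch of how that suggestion might look applied to the write step from the question (this assumes the destination table already exists, since CREATE_NEVER disables table creation):

"insert policies to BigQuery"
>> beam.io.WriteToBigQuery(
    project=project_id,
    table="service_policy_policies",
    dataset="mongo_landing_zone",
    insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR",
    # CREATE_NEVER skips the per-element table existence check / creation,
    # which should avoid exhausting the tables.get request quota.
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
)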

Regards, Michael
