Как пакетные потоковые вставки в BigQuery из задания Beam - PullRequest
0 голосов
/ 09 марта 2019

Я пишу в BigQuery в лучшем виде из неограниченного источника. Я использую СТРИМОВЫЕ ВСТАВКИ в качестве метода. Я искал, как регулировать строки в BigQuery, основываясь на рекомендациях

https://cloud.google.com/bigquery/quotas#streaming_inserts

API-интерфейс BigQueryIO.Write не позволяет устанавливать микропартии.

Я смотрел на использование триггеров, но не уверен, что BigQuery группирует все в панели в запросе. Я настроил триггер, как показано ниже

    Window.<Long>into(new GlobalWindows())
    .triggering(
        Repeatedly.forever(
            AfterFirst.of(
                AfterPane.elementCountAtLeast(5),
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(2)))
            ))
    .discardingFiredPanes());

Q1. Поддерживает ли Beam микропакеты или создает один запрос для каждого элемента в PCollection?

Q2. Если вышеуказанный триггер имеет смысл? Даже если бы я установил окно / триггер, он мог бы отправлять один запрос на каждый элемент.

Ответы [ 2 ]

0 голосов
/ 10 марта 2019

У меня была такая же проблема, я создал GAE, который запускает задание DataFlow.в любое время с другими параметрами, в моем случае, каждый раз, когда я отправляю другой диапазон дат.И затем я планирую это с работой CRON.

    from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

credentials = GoogleCredentials.get_application_default()
service = build('dataflow', 'v1b3', credentials=credentials)

# Set the following variables to your values.
JOBNAME = '[JOB_NAME]'
PROJECT = '[YOUR_PROJECT_ID]'
BUCKET = '[YOUR_BUCKET_NAME]'
TEMPLATE = '[YOUR_TEMPLATE_NAME]'

GCSPATH="gs://{bucket}/templates/{template}".format(bucket=BUCKET, template=TEMPLATE)
BODY = {
    "jobName": "{jobname}".format(jobname=JOBNAME),
    "parameters": {
        "inputFile" : "gs://{bucket}/input/my_input.txt",
        "outputFile": "gs://{bucket}/output/my_output".format(bucket=BUCKET)
     },
     "environment": {
        "tempLocation": "gs://{bucket}/temp".format(bucket=BUCKET),
        "zone": "us-central1-f"
     }
}

request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
response = request.execute()

print(response)

app.yaml

runtime: python37
service: run-data-flow

cron.yaml

cron:
- description: "copy history daily Job"
url: /run
target: run-data-flow
schedule: every 1 hours from 9:00 to 23:00
retry_parameters:
  min_backoff_seconds: 3
  max_doublings: 5
0 голосов
/ 10 марта 2019

Я не знаю, что вы подразумеваете под микропартией.На мой взгляд, BigQuery поддерживает загрузку данных либо в пакетном режиме, либо в потоковом режиме.

По сути, пакетные загрузки подчиняются квотам, а потоковые загрузки немного дороже.

После настройки метод вставки для вашей BigQueryIO документации гласит:

Note: If you use batch loads in a streaming pipeline, you must use withTriggeringFrequency to specify a triggering frequency.

Никогда не пробовал, но withTriggeringFrequency, похоже, то, что вам нужно здесь.

...