Google Dataflow - PubSub to BigQuery - высокая скорость, проблема с производительностью - PullRequest
0 голосов
/ 26 марта 2020

Я пытался найти ответ на свою проблему в Google и StackOverflow, но даже при наличии некоторых полезных сообщений я не смог найти один, который помог. Я решил попробовать здесь.

Я написал небольшой скрипт потока данных, используя Apache Beam Python SDK. Он не делает ничего особенного, просто получает данные из PubSub и отправляет их в BigQuery через потоковый API.

Задание выполняется. Все идет нормально. Но я нашел производительность несколько ниже моих объяснений. В PubSub у нас до 30000 событий в секунду, но даже при 300 рабочих (600 vCPU) ему удается обработать только половину событий. Единственный признак того, что что-то не так, - это время стены последнего шага (beam.io.gcp.bigquery.WriteToBigQuery), которое суммирует до «17 дней 2 часа», даже если задание выполняется только «22 мин 27 с * 1020» * "со 100 рабочими (200 vCPU).

Мой конвейер выглядит следующим образом:

        input_collection = (
            pipeline
            | 'R|Read PubSub Messages' >> ReadFromPubSub(topic=input_topic).with_output_types(bytes)
            | 'R|Convert to dict' >> beam.ParDo(SplitToDict())
    )

    run_collection = (
            input_collection
            | 'R|Prepare data' >> beam.ParDo(Data(label))
            | 'R|Write To BigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
               table_name,
               schema=schema,
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
               method='STREAMING_INSERTS',
               insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR",
               batch_size=batch_size)
    )

Мой DoFn выглядит так:

class SplitToDict(beam.DoFn):
def process(self, elements):
    elements_list = json.loads(elements)
    yield elements_list


class Data(beam.DoFn):
def __init__(self, label):
    self.label = label

def process(self, elements):
    if self.label == 1:
        yield {
            "nameId": elements["nameId"],
            "timestamp": elements["timestamp"]
        } ...

I set" streaming = True "и использовать" --enable_streaming_engine "в качестве параметров конвейера. Как я уже сказал, конвейер работает, но не так быстро и легко, как ожидалось, учитывая, что потоковый API BigQuery должен обрабатывать 100000 вставок в секунду. или что я мог попробовать?

С уважением, Майкл

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...