Я пытался найти ответ на свою проблему в Google и StackOverflow, но даже при наличии некоторых полезных сообщений я не смог найти один, который помог. Я решил попробовать здесь.
Я написал небольшой скрипт потока данных, используя Apache Beam Python SDK. Он не делает ничего особенного, просто получает данные из PubSub и отправляет их в BigQuery через потоковый API.
Задание выполняется. Все идет нормально. Но я нашел производительность несколько ниже моих объяснений. В PubSub у нас до 30000 событий в секунду, но даже при 300 рабочих (600 vCPU) ему удается обработать только половину событий. Единственный признак того, что что-то не так, - это время стены последнего шага (beam.io.gcp.bigquery.WriteToBigQuery), которое суммирует до «17 дней 2 часа», даже если задание выполняется только «22 мин 27 с * 1020» * "со 100 рабочими (200 vCPU).
Мой конвейер выглядит следующим образом:
input_collection = (
pipeline
| 'R|Read PubSub Messages' >> ReadFromPubSub(topic=input_topic).with_output_types(bytes)
| 'R|Convert to dict' >> beam.ParDo(SplitToDict())
)
run_collection = (
input_collection
| 'R|Prepare data' >> beam.ParDo(Data(label))
| 'R|Write To BigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
table_name,
schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
method='STREAMING_INSERTS',
insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR",
batch_size=batch_size)
)
Мой DoFn выглядит так:
class SplitToDict(beam.DoFn):
def process(self, elements):
elements_list = json.loads(elements)
yield elements_list
class Data(beam.DoFn):
def __init__(self, label):
self.label = label
def process(self, elements):
if self.label == 1:
yield {
"nameId": elements["nameId"],
"timestamp": elements["timestamp"]
} ...
I set" streaming = True "и использовать" --enable_streaming_engine "в качестве параметров конвейера. Как я уже сказал, конвейер работает, но не так быстро и легко, как ожидалось, учитывая, что потоковый API BigQuery должен обрабатывать 100000 вставок в секунду. или что я мог попробовать?
С уважением, Майкл