Правильный способ обработки больших объемов сообщений в шаблоне потока данных Google из Pub / Sub в BigQuery. - PullRequest
0 голосов
/ 25 мая 2020

Как указано в заголовке, мы используем поток данных для отправки сообщений из PubSub в BigQuery с использованием стандартного шаблона. Ошибка потока данных выглядит так:

Превышен максимально допустимый размер строки. Разрешено: 1048576 Размер строки: 2148312

И да, я знаю, что все дело в ограничениях потоковой вставки.

Время от времени мы видим сообщения большого объема. Несмотря на то, что в общем, большинство из них далеко не такие большие, и мы можем их потерять, но мы хотели бы знать о них. Есть ли способ сделать это? В результате мы хотели бы получить строку, хранящуюся в таблице недействительных писем, и усеченная строка могла бы работать.

Я попытался переписать функцию UDF, чтобы она возвращала пустой результат, если полученное сообщение слишком велико, но, как и ожидалось, оно просто потеряно, и то же самое происходит с любой информацией, если это происходит, что может быть вредным для нас, если станет распространенной проблемой.

Выдача ошибки также не работает, потому что ошибка все еще существует, но в другой ячейке. Более того, поток данных постоянно пытается вставить запись. Итак, для 2-3 таких сообщений мы получили примерно> 300 рабочих ошибок.

Что можно сделать, чтобы сохранить некоторые данные об этом, но при этом избавиться от ошибки?

Ответы [ 2 ]

2 голосов
/ 25 мая 2020

Вместо использования шаблона потока данных вы можете написать простое задание и позаботиться об обработке ошибок в соответствии с вашими требованиями.

Вы можете использовать коннекторы OOB для написания своей работы.

1. PubSubIO.Read - для чтения сообщений

2. BigQuery.Write - для записи в большой запрос. Он возвращает WriteResult и использует getFailedInserts для получения PCollection неудачных записей. Затем вы можете вставить эти записи в свою таблицу ошибок или обработать ее соответствующим образом.

В качестве альтернативы, для пункта 2 мы также могли бы написать собственный PTransform и использовать для записи клиентские библиотеки BigQuery. Это обеспечит более детальный уровень контроля для обработки исключений. Однако по возможности старайтесь использовать BigQueryIO только в том случае, если он оптимизирован и протестирован.

1 голос
/ 25 мая 2020

Вы не можете записать эти данные в BigQuery в потоковом режиме. Итак, у вас есть 2 решения

  • Записывать слишком большое сообщение в файлы на GCS и периодически загружать их в BigQuery
  • Записывать слишком большое сообщение в PubSub и периодически запускать функции (или Cloud Run) в flu sh очередь PubSub и создайте задание загрузки в BigQuery.

В обоих случаях я упоминаю «периодически», потому что вы ограничены 1000 загрузками заданий в день и если вы выполняйте загрузку для каждого слишком большого сообщения, возможно, вы достигнете этого предела (вы должны оценить риск этого, это всего лишь совет).

Если у вас много больших сообщений, постарайтесь не нарушить ограничение памяти Cloud Function / Cloud Run, когда вы грипп sh ваши сообщения PubSub.

Для экономии времени и ресурсов, Я предпочитаю фильтровать размеры сообщения перед отправкой в ​​BigQuery. Однако решение @Nirley, о котором я не знал (спасибо !! +1), также может работать.

...