В конвейере Beam существуют различные опции для чтения и записи во внешние источники данных.Наиболее распространенным методом является использование встроенных приемников и источников, созданных сообществом Beam (встроенные преобразования ввода / вывода).Этим разъемам часто приходилось тратить значительные усилия на разработку и они подвергались промышленной закалке.Например, BigQueryIO использовался в производстве в течение многих лет, и в течение этого периода непрерывно развивался.Поэтому общий совет заключается в том, чтобы по возможности использовать стандартные мойки и источники.
Однако не все взаимодействия с внешними источниками данных должны осуществляться через источники и приемники, существуют случаи, когда вручную созданный обмен данными от DoFn к внешнему источнику является правильным путем.Несколько примеров ниже (их, конечно, больше!);
- Нет источника / источника для источника данных, или есть источник, но он еще не поддерживает все переключатели / режимы и т.д. длятвои нужды.Конечно, вы всегда можете усовершенствовать существующий Sink / Source или, если он не существует, создать новый разъем ввода-вывода с нуля, и, если возможно, было бы здорово внести это обратно в сообщество:)
- Вы являетесьобогащение элементов, проходящих через ваш потоковый конвейер, небольшим подмножеством данных из большого набора данных.Например, предположим, что ваши события обработки поступают из заказа на продажу, и вы хотите добавить информацию для каждого элемента.Информация о товаре живет в большом мультибайтном магазине, но в среднем вы получите доступ к небольшому проценту данных в качестве ключей поиска.В этом примере имеет смысл обогащать каждый элемент путем внешнего вызова хранилища данных в DoFn.Вместо чтения всех данных в качестве источника и выполнения операции соединения в конвейере.
Дополнительные примечания / подсказки:
При вызове внешних систем имейте в виду, что ApacheBeam предназначен для распределения работы по многим потокам, это может создать значительную нагрузку на ваш внешний источник данных, вы часто можете уменьшить эту нагрузку, используя аннотации начального и конечного пакетов;
Java (SDK 2.9.0)
- DoFn.StartBundle
- DoFn.FinishBundle
Python (SDK 2.9.0)
- start_bundle ()
- finish_bundle ()