Следующий шаблон, который также обсуждается в этом SO Q & A , может быть хорошим для изучения в вашем случае использования.Одним из вопросов, который может быть проблемой, является размер вашего сжатого медленного потока.Надеюсь, что это полезно.
Для этого шаблона мы можем использовать исходное преобразование GenerateSequence, чтобы периодически выдавать значение, например, один раз в день.Передайте это значение в глобальное окно через управляемый данными триггер, который активируется для каждого элемента.В DoFn используйте этот процесс как триггер для извлечения данных из вашего ограниченного источника. Создайте свой SideInput для использования в нисходящих преобразованиях.
Важно отметить, что, поскольку в этом шаблоне используется глобальное окно SideInput, запускаемое во время обработкисоответствие элементам, обрабатываемым во время события, будет недетерминированным.Например, если у нас есть основной конвейер, который работает с окнами во время события, версия SideInput View, которую увидят эти окна, будет зависеть от последнего триггера, который сработал во время обработки, а не от времени события.
Также важно отметить, что в целом SideInput должен быть чем-то, что умещается в памяти.
Java (SDK 2.9.0):
В приведенном ниже примере боковой вход обновляется с очень короткими интервалами, это позволяет легко увидеть эффекты.Ожидается, что боковой ввод обновляется медленно, например, каждые несколько часов или один раз в день.
В приведенном ниже примере кода мы используем карту, которую мы создаем в DoFn, которая становится View.asSingleton, это рекомендуемый подход для этого шаблона.
Пример ниже иллюстрирует шаблон, обратите внимание, что View.asSingleton перестраивается при каждом обновлении счетчика.
Для вашего варианта использования вы можете заменить преобразования GenerateSequence
преобразованиями PubSubIO
.Это имеет смысл?
public static void main(String[] args) {
// Create pipeline
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(PipelineOptions.class);
// Using View.asSingleton, this pipeline uses a dummy external service as illustration.
// Run in debug mode to see the output
Pipeline p = Pipeline.create(options);
// Create slowly updating sideinput
PCollectionView<Map<String, String>> map = p
.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
.apply(Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())
.apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
@ProcessElement public void process(@Element Long input,
OutputReceiver<Map<String, String>> o) {
// Do any external reads needed here...
// We will make use of our dummy external service.
// Every time this triggers, the complete map will be replaced with that read from
// the service.
o.output(DummyExternalService.readDummyData());
}
})).apply(View.asSingleton());
// ---- Consume slowly updating sideinput
// GenerateSequence is only used here to generate dummy data for this illustration.
// You would use your real source for example PubSubIO, KafkaIO etc...
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(Sum.longsGlobally().withoutDefaults())
.apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {
@ProcessElement public void process(ProcessContext c) {
Map<String, String> keyMap = c.sideInput(map);
c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
LOG.debug("Value is {} key A is {} and key B is {}"
, c.element(), keyMap.get("Key_A"),keyMap.get("Key_B"));
}
}).withSideInputs(map));
p.run();
}
public static class DummyExternalService {
public static Map<String, String> readDummyData() {
Map<String, String> map = new HashMap<>();
Instant now = Instant.now();
DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");
map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());
return map;
}
}