В частности, в отношении view.asMap
и накопления панелей обсуждение в комментариях:
Если вы хотите использовать боковой ввод View.asMap
(например, когда источником элементов карты являетсясам распределен - часто потому, что вы создаете побочный ввод из выходных данных предыдущего преобразования), есть некоторые другие факторы, которые необходимо учитывать: View.asMap
само является агрегацией, она унаследует триггеры и накопит свой ввод,В этом конкретном шаблоне установка конвейера в режим аккумулирования панелей перед этим преобразованием приведет к дублированию ошибок ключа, даже если преобразование, такое как Latest.perKey
, используется до преобразования View.asMap
.
Учитывая, что чтение обновляет всю карту, тогда я думаю, что использование View.asSingleton
будет лучшим подходом для этого варианта использования.
Некоторые общие замечания относительно этого шаблона, которые, будем надеяться, будут полезны и для других:
Для этого шаблона мы можем использовать исходное преобразование GenerateSequence
, чтобы периодически выдавать значение, например, один раздень.Передайте это значение в глобальное окно через управляемый данными триггер, который активируется для каждого элемента.В DoFn
используйте этот процесс в качестве триггера для извлечения данных из вашего ограниченного источника Create
вашего SideInput для использования в нисходящих преобразованиях.
Важно отметить, что, поскольку этот шаблон использует сторону глобального окназапуск входа во время обработки, сопоставление с элементами, обрабатываемыми во время события, будет недетерминированным.Например, если у нас есть основной конвейер, в котором отображается время события, версия SideInput View, которую увидят эти окна, будет зависеть от последнего триггера, который сработал во время обработки, а не от времени события.
Также важно отметить, что в общем случае боковой ввод должен быть чем-то, что умещается в памяти.
Java (SDK 2.9.0):
В приведенном ниже примере боковые входные данные обновляются с очень короткими интервалами, так что эффекты можно легко увидеть.Ожидается, что боковой ввод обновляется медленно, например, каждые несколько часов или один раз в день.
В приведенном ниже примере кода мы используем Map
, который мы создаем в DoFn
, который становится View.asSingleton, это рекомендуемый подход для этого шаблона.
Пример ниже иллюстрирует шаблон, обратите внимание, что View.asSingleton
перестраивается при каждом обновлении счетчика.
public static void main(String[] args) {
// Create pipeline
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(PipelineOptions.class);
// Using View.asSingleton, this pipeline uses a dummy external service as illustration.
// Run in debug mode to see the output
Pipeline p = Pipeline.create(options);
// Create slowly updating sideinput
PCollectionView<Map<String, String>> map = p
.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
.apply(Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())
.apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
@ProcessElement public void process(@Element Long input,
OutputReceiver<Map<String, String>> o) {
// Do any external reads needed here...
// We will make use of our dummy external service.
// Every time this triggers, the complete map will be replaced with that read from
// the service.
o.output(DummyExternalService.readDummyData());
}
})).apply(View.asSingleton());
// ---- Consume slowly updating sideinput
// GenerateSequence is only used here to generate dummy data for this illustration.
// You would use your real source for example PubSubIO, KafkaIO etc...
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(Sum.longsGlobally().withoutDefaults())
.apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {
@ProcessElement public void process(ProcessContext c) {
Map<String, String> keyMap = c.sideInput(map);
c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
LOG.debug("Value is {} key A is {} and key B is {}"
, c.element(), keyMap.get("Key_A"),keyMap.get("Key_B"));
}
}).withSideInputs(map));
p.run();
}
public static class DummyExternalService {
public static Map<String, String> readDummyData() {
Map<String, String> map = new HashMap<>();
Instant now = Instant.now();
DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");
map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());
return map;
}
}