Что касается данных «на лету», одним из вариантов будет использование GenerateSequence, например:
pipeline.apply(GenerateSequence.from(0).withRate(RATE,Duration.millis(1000)))
Для создания других типов объектов вы можете использовать ParDo после использования Longи превратить его во что-то другое:
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
p.apply(GenerateSequence.from(0).withRate(2, Duration.millis(1000)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(FlatMapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via(i -> IntStream.range(0,2).mapToObj(k -> KV.of(String.format("Gen Value %s" , i),String.format("FlatMap Value %s ", k))).collect(Collectors.toList())))
.apply(ParDo.of(new DoFn<KV<String,String>, String>() {
@ProcessElement
public void process(@Element KV<String,String> input){
LOG.info("Value was {}", input);
}
}));
p.run();
Это должно генерировать такие значения, как:
Value was KV{Gen Value 0, FlatMap Value 0 }
Value was KV{Gen Value 0, FlatMap Value 1 }
Value was KV{Gen Value 1, FlatMap Value 0 }
Value was KV{Gen Value 1, FlatMap Value 1 }
Value was KV{Gen Value 2, FlatMap Value 0 }
Value was KV{Gen Value 2, FlatMap Value 1 }
Некоторые другие вещи, которые следует учитывать при тестировании производительности конвейеров:
Прямой бегун предназначен для модульного тестирования, он делает классные вещи, такие как имитация сбоев, это помогает выявить проблемы, которые будут видны при запуске производственного конвейера.Однако он не предназначен для тестирования производительности.Я бы рекомендовал всегда использовать главный бегун для таких типов интеграционных тестов.
Обратите внимание на оптимизацию Fusion Ссылка на документы , при использовании искусственного источника данных, такого как GenerateSequence, вам может понадобиться сделать GBK в качестве следующего шага, чтобы разрешитьработа должна быть распараллелена.Для бегуна потока данных больше информации можно найти здесь: Ссылка на документы
В общем, для тестирования производительности, я бы рекомендовал тестировать весь сквозной конвейер.Существуют взаимодействия с источниками и приемниками (например, водяными знаками), которые не будут тестироваться в отдельном конвейере.
Надеюсь, что это поможет.