Как создать поток фальшивых данных, поступающих в Apache Beam конвейер через определенные промежутки времени? - PullRequest
0 голосов
/ 18 марта 2020

Я пытаюсь создать небольшие Apache программы потоковой передачи Beam для проверки идей, и я думаю, что для меня проще всего было бы использовать структуры фреймворка, такие как Create.of, для создания поддельных данных. Таким образом, мне не придется настраивать больше, чем нужно, например, настраивать GCP Pub / Sub topi c в качестве источника и публиковать в нем.

Проблема в том, что я хочу попробовать вещи, которые основаны на времени, такие как управление окнами и работа с состоянием и таймерами. Я был в состоянии собрать это вместе:

public class TestPipeline {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        p.apply(Create.of(1, 2, 3))
            .apply(ParDo.of(new DoFn<Integer, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    c.output(c.element().toString());
                }
            }))
            .apply(TextIO.write().to("myfile.txt"));

        p.run().waitUntilFinish();
    }
}

Это завершает мою цель отправки трех частей данных в начале моего конвейера, но он отправляет их все сразу. Я бы предпочел, чтобы я мог настроить отправку каждого фрагмента данных каждые 10 секунд и т. Д. c.

Я следовал этому руководству с Apache Flink (https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/walkthroughs/datastream_api.html) который показывает пример того, что я пытаюсь выполнить sh. Я копался в коде этого учебного пособия, но не смог выяснить, какая именно часть инфраструктуры Flink сделала это возможным.

Ответы [ 2 ]

0 голосов
/ 14 апреля 2020

В итоге я использовал класс TestStream. Я обнаружил, что класс UnBoundedSource слишком сложен для моего варианта использования. Сообщение в блоге https://beam.apache.org/blog/2016/10/20/test-stream.html помогло мне понять, как использовать этот класс для моих тестов.

0 голосов
/ 19 марта 2020

Проверьте SyntheticBoundedSource и SyntheticUnboundedSource классы!

Они позволяют вам многое параметризовать при генерировании данных, начиная с размера ключа / значения, задержек между выбросами записи и т. П. c. Вы можете параметризовать их, используя опцию SyntheticSourceOptions, так что это также может быть хорошим местом для проверки возможных настроек.

...