Пример потоковой передачи Flink, которая генерирует собственные данные - PullRequest
0 голосов
/ 27 апреля 2020

Ранее я спрашивал о простом примере hello world для Flink. Это дало мне несколько хороших примеров!

Однако я хотел бы попросить более «потоковый» пример, где мы генерируем входное значение каждую секунду. В идеале это было бы случайным, но даже одно и то же значение каждый раз было бы хорошо.

Цель состоит в том, чтобы получить поток, который «движется» без / минимального внешнего прикосновения.

Следовательно, мой вопрос:

Как показать, что Flink действительно передает данные без внешних зависимостей?

Я нашел, как показать это, генерируя данные извне и записывая в Kafka, или слушая исходный c источник, однако я пытаюсь решить ее с минимальной зависимостью (например, начиная с GenerateFlowFile в Nifi).

1 Ответ

1 голос
/ 27 апреля 2020

Вот пример. Это было сделано как пример того, как сделать ваши источники и приемники подключаемыми. Идея заключалась в том, что в процессе разработки вы можете использовать случайный источник и печатать результаты, для тестов вы можете использовать аппаратный список входных событий и собирать результаты в список, а в производстве вы будете использовать реальные источники и приемники.

Вот работа:

/*
 * Example showing how to make sources and sinks pluggable in your application code so
 * you can inject special test sources and test sinks in your tests.
 */

public class TestableStreamingJob {
    private SourceFunction<Long> source;
    private SinkFunction<Long> sink;

    public TestableStreamingJob(SourceFunction<Long> source, SinkFunction<Long> sink) {
        this.source = source;
        this.sink = sink;
    }

    public void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> LongStream =
                env.addSource(source)
                        .returns(TypeInformation.of(Long.class));

        LongStream
                .map(new IncrementMapFunction())
                .addSink(sink);

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        TestableStreamingJob job = new TestableStreamingJob(new RandomLongSource(), new PrintSinkFunction<>());
        job.execute();
    }

    // While it's tempting for something this simple, avoid using anonymous classes or lambdas
    // for any business logic you might want to unit test.
    public class IncrementMapFunction implements MapFunction<Long, Long> {

        @Override
        public Long map(Long record) throws Exception {
            return record + 1 ;
        }
    }

}

Вот эта RandomLongSource:

public class RandomLongSource extends RichParallelSourceFunction<Long> {

    private volatile boolean cancelled = false;
    private Random random;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        random = new Random();
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (!cancelled) {
            Long nextLong = random.nextLong();
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(nextLong);
            }
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}
...