Вот пример. Это было сделано как пример того, как сделать ваши источники и приемники подключаемыми. Идея заключалась в том, что в процессе разработки вы можете использовать случайный источник и печатать результаты, для тестов вы можете использовать аппаратный список входных событий и собирать результаты в список, а в производстве вы будете использовать реальные источники и приемники.
Вот работа:
/*
* Example showing how to make sources and sinks pluggable in your application code so
* you can inject special test sources and test sinks in your tests.
*/
public class TestableStreamingJob {
private SourceFunction<Long> source;
private SinkFunction<Long> sink;
public TestableStreamingJob(SourceFunction<Long> source, SinkFunction<Long> sink) {
this.source = source;
this.sink = sink;
}
public void execute() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> LongStream =
env.addSource(source)
.returns(TypeInformation.of(Long.class));
LongStream
.map(new IncrementMapFunction())
.addSink(sink);
env.execute();
}
public static void main(String[] args) throws Exception {
TestableStreamingJob job = new TestableStreamingJob(new RandomLongSource(), new PrintSinkFunction<>());
job.execute();
}
// While it's tempting for something this simple, avoid using anonymous classes or lambdas
// for any business logic you might want to unit test.
public class IncrementMapFunction implements MapFunction<Long, Long> {
@Override
public Long map(Long record) throws Exception {
return record + 1 ;
}
}
}
Вот эта RandomLongSource
:
public class RandomLongSource extends RichParallelSourceFunction<Long> {
private volatile boolean cancelled = false;
private Random random;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
random = new Random();
}
@Override
public void run(SourceContext<Long> ctx) throws Exception {
while (!cancelled) {
Long nextLong = random.nextLong();
synchronized (ctx.getCheckpointLock()) {
ctx.collect(nextLong);
}
}
}
@Override
public void cancel() {
cancelled = true;
}
}