Флинк присоединяется, чтобы обогатить поток - PullRequest
1 голос
/ 03 февраля 2020

Я очень новичок в Apache Flink. Я использую v1.9.0. Я хочу объединить несколько потоков пример . Я получаю следующее исключение при выполнении следующего примера.

Исключение:

15:18:51,839 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(SlidingEventTimeWindows(2, 1), EventTimeTrigger, CoGroupWindowFunction) -> Sink: Print to Std. Out (3/4) (ebc7985691707417b57a391ac83104f9) switched from RUNNING to FAILED.
java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
    at org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows.assignWindows(SlidingEventTimeWindows.java:78)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

Мой основной метод:

public class ProcessingTimeJoinExercise {

@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Simulated trade stream
    DataStream<Trade> tradeStream = FinSources.tradeSource(env);

    // Simulated customer stream
    DataStream<Customer> customerStream = FinSources.customerSource(env);

    // Stream of enriched trades
    DataStream<EnrichedTrade> joinedStream = tradeStream
            .join(customerStream).where(new KeySelector<Trade, Long>() {
                @Override
                public Long getKey(Trade trade) throws Exception {
                    return trade.customerId;
                }
            }).equalTo(new KeySelector<Customer, Long>() {
                @Override
                public Long getKey(Customer cust) throws Exception {
                    // TODO Auto-generated method stub
                    return cust.customerId;
                }
            }).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
            .apply(new JoinFunction<Trade, Customer, EnrichedTrade>() {
                @Override
                public EnrichedTrade join(Trade trade, Customer customer) {
                    return new EnrichedTrade(trade, customer);
                }
            });

    joinedStream.print();

    env.execute("processing-time join");

Любая идея, что я Я делаю не так?

1 Ответ

2 голосов
/ 03 февраля 2020

Если вы добавите

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

в начале, сразу после инициализации env, он будет работать правильно.

...