Как остановить Flink Job после выполнения определенного условия - PullRequest
1 голос
/ 17 мая 2019

У меня есть приложение flink с 1 входным источником, оно проходит через несколько карт и возвращается к источнику кафки.

Основано на некоторых условиях в одной карте.Я хотел бы остановить карту и все карты в работе безопасным способом.И безопасно остановите задание.

Как мне остановить задание из Flink-приложения?

Я попытался добавить серьезный сбой.Подобно доступу к нулевому указателю, который может привести к сбою приложения.

Однако иногда только эта карта терпит неудачу, а все остальные либо переходят в отмененную (ожидаемую), либо просто висящую (отменяющую)

 val in_stream = env.addSource(myConsumer).name(topic_in).setParallelism(1)
  .map(x => doSomethingA(x)
  .map(x => doSomethingB(x)
  .map(x => doSomethingC(x) //In this map i want to safely stop the job based on some condition
  .map(x => doSomethingD(x)
  .addSink(new FlinkKafkaProducer09[String](broker, topic_out, new SimpleStringSchema()))

Я ожидаю безопасного выхода из работы (все карты отменены). Как мне поступить?

...