Как остановиться после прочтения нескольких сообщений от Кафки в Flink Scala Shell? - PullRequest
0 голосов
/ 12 марта 2019

Я читаю сообщения от Кафки в оболочку Flink Scala. Я хочу прочитать несколько сообщений, сохранить их и остановиться. Не хочу постоянно читать. Я могу читать сообщения следующим образом:

scala> val stream = senv.addSource(new FlinkKafkaConsumer011[String]("some-test-topic", new SimpleStringSchema(), properties)).print()
warning: there was one deprecation warning; re-run with -deprecation for details
stream: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@3ba8ef4e

scala> senv.execute("Kafka Consumer Test")
Submitting job with JobID: 79b7380516b8ccb9ae71da29c97c8d8d. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:55677/user/jobmanager#2006479320] with leader session id 00000000-0000-0000-0000-000000000000.
03/12/2019 15:15:16 Job execution switched to status RUNNING.
03/12/2019 15:15:16 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED 
03/12/2019 15:15:16 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING 
03/12/2019 15:15:16 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING 

Я не могу остановить чтение сообщений без выхода из всей оболочки. Когда я нажимаю Ctrl-C, это убивает и работу, и оболочку. Как мне рассчитать время, чтобы он прочитал несколько сообщений и через некоторое время остановился? Можно ли убить его во время работы, не выходя из оболочки?

...