Переверните добавление перебалансировки в поток, причину сбоя задания, когда StreamExecutionEnvironment установлен с использованием TimeCharacteristi c .IngestionTime - PullRequest
1 голос
/ 26 мая 2020

Я пытаюсь запустить задание потоковой передачи, которое потребляет сообщения от Kafka, преобразует их и опускается в Cassandra.

Текущий фрагмент кода не работает out, или задание изменено на использование TimeCharacteristi c .EventTime и присвоение водяного знака, как в следующем фрагменте, тогда это работает.

val env: StreamExecutionEnvironment = getExecutionEnv("dev")
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
.
.

  val source = env.addSource(kafkaConsumer)
              .uid("kafkaSource")
              .rebalance
              .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessRawDataTimestampExtractor[RawData](Time.seconds(1)))


val transformedObjects = source.map(rawData=>TransforemedObjects.fromRawData(rawData))
        .setParallelism(dataSinkParallelism)
    sinker.apply(transformedObjects,dataSinkParallelism)

Трассировка стека:

java.lang.Exception: java.lang.RuntimeException: 1
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: 1
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:143)
    at org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$AutomaticWatermarkContext.processAndCollect(StreamSourceContexts.java:176)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$AutomaticWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:194)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:246)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    ... 16 more

Я что-то делаю не так? Или есть ограничение на использование функции rebalance, когда для TimeCharacteristi c установлено значение IngestionTime?

Заранее спасибо ...

1 Ответ

0 голосов
/ 28 мая 2020

Можете ли вы предоставить используемую вами версию flink?

Кажется, ваша проблема связана с этим билетом Jira

https://issues.apache.org/jira/browse/FLINK-14087

Вы использовали rebalance только один раз в своей задаче? recordWriter может использовать тот же channelSelector, который определяет, куда будет перенаправлена ​​запись. Трассировка стека показывает, что он пытается выбрать канал, выходящий за границы.

...