Мигает странная ошибка «Не удается сериализовать объектный класс оператора ... CoBroadcastWithNonKeyedOperator» - PullRequest
0 голосов
/ 12 марта 2019

Я пытаюсь настроить проект с использованием BroadcastState, но по какой-то причине я получаю эту ошибку при попытке его запустить:

org.apache.flink.streaming.runtime.tasks.StreamTaskException:Невозможно сериализовать объектный класс класса org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.

Я не уверен, почему он его выбрасывает.Объекты, передаваемые в него и выводимые им (SampleInput и Token), являются очень простыми сгенерированными в avro pojos с двумя или тремя полями, и я попытался просто оставить методы BroadcastProcessFunction пустыми, чтобы вырезать все, что я мог установить, чтобы сделать невозможным сериализацию, новсе еще получаю ошибку.Вот соответствующая часть кода:

//Sideoutput that error strings will be written to
    OutputTag<String> sideOutputTag = new OutputTag<String>("side-output") {};

    //<Setup for broadcast state>
    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupFullSnapshot()
            .build();

    final MapStateDescriptor<String, Token> ruleStateDescriptor = new MapStateDescriptor<>(
            "oathTokens",
            BasicTypeInfo.STRING_TYPE_INFO,
            AvroTypeInfo.of(new TypeHint<Token>() {}));
    ruleStateDescriptor.enableTimeToLive(ttlConfig);

    DataStream<Token> tokenObjectStream = tokenSourceStream.process(new JsonToTokenProcessFunction(sideOutputTag))
            .startNewChain()
            .uid("tokenObjectStream")
            .name("tokenObjectStream");

    BroadcastStream<Token> ruleBroadcastStream = tokenObjectStream.broadcast(ruleStateDescriptor);
    //</Config for broadcast state>


    //<Main Data Input Stream>
    DataStream<SampleInput> jsonToSampleInput = kafkaStream.process(new JsonToPojoProcessFunction(sideOutputTag))
            .startNewChain()
            .uid("sampleInputStream")
            .name("sampleInputStream");

    BroadcastConnectedStream<SampleInput, Token> broadcastConnectedStream = jsonToSampleInput.connect(ruleBroadcastStream);

    DataStream<SampleInput> matchedBroadcastStream = broadcastConnectedStream.process(new BroadcastProcessFunction<SampleInput, Token, SampleInput>() {

        @Override
        public void processElement(SampleInput sampleInput, ReadOnlyContext readOnlyContext, Collector<SampleInput> collector) throws Exception {

        }

        @Override
        public void processBroadcastElement(Token token, Context context, Collector<SampleInput> collector) throws Exception {

        }
    });

Любая помощь будет принята с благодарностью.Я уверен, что я просто что-то упускаю.Спасибо!

1 Ответ

1 голос
/ 12 марта 2019

Оказывается, что объект ttlConfig не является сериализуемым. Удаление это решило проблему.

...