Исключение в функции flatmap при развертывании в кластер - PullRequest
1 голос
/ 01 июля 2019

У меня есть приложение с мгновенным зажиганием. Я получаю сообщения от Кафки и обрабатываю сообщения, а затем кеширую, чтобы зажечь. когда я запускаю программу в ide (intellij) и автономном jar, проблем не возникает, но при развертывании в кластере я получаю это исключение (я создал таблицу ранее в коде.). Заранее спасибо. Обратите внимание, что переменные соединения в моем основном классе являются статическими.

   Caused by: java.lang.NullPointerException
        at altosis.flinkcompute.compute.Main$2.flatMap(Main.java:95)
        at altosis.flinkcompute.compute.Main$2.flatMap(Main.java:85)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        ... 22 more
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.getConfig();
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            environment.setParallelism(1);
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id","event-group");

            FlinkKafkaConsumer<EventSalesQuantity> consumer = new FlinkKafkaConsumer<EventSalesQuantity>("EventTopic",new EventSerializationSchema(),props);
            DataStream<EventSalesQuantity> eventDataStream = environment.addSource(consumer);

            KeyedStream<EventSalesQuantity, String> keyedEventStream = eventDataStream.assignTimestampsAndWatermarks(
                    new AssignerWithPeriodicWatermarksImpl()
            ).
                    keyBy(new KeySelector<EventSalesQuantity, String>() {
                        @Override
                        public String getKey(EventSalesQuantity eventSalesQuantity) throws Exception {
                            return  eventSalesQuantity.getDealer();
                        }
                    });

            DataStream<Tuple2<EventSalesQuantity,Integer>> eventSinkStream = keyedEventStream.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.DAYS),Time.hours(21))).aggregate(new AggregateImpl());
            ignite = Ignition.start();
            ClientConfiguration cfg = new ClientConfiguration().setAddresses("127.0.0.1:10800");
            igniteClient = Ignition.startClient(cfg);

            System.out.println(">>> Thin client put-get example started.");
            igniteClient.query(
                    new SqlFieldsQuery(String.format(
                            "CREATE TABLE IF NOT EXISTS Eventcache (eventtime VARCHAR PRIMARY KEY, bayi VARCHAR, sales INT ) WITH \"VALUE_TYPE=%s\"",
                            EventSalesQuantity.class.getName()
                    )).setSchema("PUBLIC")
            ).getAll();

            eventSinkStream.addSink(new FlinkKafkaProducer<Tuple2<EventSalesQuantity, Integer>>("localhost:9092","SinkEventTopic",new EventSinkSerializationSchema()));
            Class.forName("org.apache.ignite.IgniteJdbcThinDriver");

            conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");
            eventSinkStream.flatMap(new FlatMapFunction<Tuple2<EventSalesQuantity, Integer>, Object>() {
                @Override
                public void flatMap(Tuple2<EventSalesQuantity, Integer> eventSalesQuantityIntegerTuple2, Collector<Object> collector) throws Exception {
                    Ignsql= conn.prepareStatement(
                            "INSERT INTO Eventcache (eventtime, bayi, sales) VALUES (?, ?, ?)");

                    Ignsql.setString(1, eventSalesQuantityIntegerTuple2.f0.getTransactionDate());
                    Ignsql.setString(2, eventSalesQuantityIntegerTuple2.f0.getDealer());
                    Ignsql.setInt(3, eventSalesQuantityIntegerTuple2.f1);
                    Ignsql.execute();
                    Ignsql.close();
                }
            });

           // eventSinkStream.print();
            environment.execute();```

1 Ответ

1 голос
/ 01 июля 2019

Полагаю, когда вы говорите: «Обратите внимание, что переменные соединения являются статическими в моем основном классе», вы говорите о Ignsql.Если это так, то ваш код не будет работать, потому что эта переменная недоступна для вашей функции карты, которая сериализуется и распространяется JobManager до фактического запуска рабочего процесса.

Вы должны создать класс RichFlatMapFunction,и в методе open() вы устанавливаете нужные переменные соединения, а затем закрываете их в методе close().Если у вас есть параметры конфигурации, необходимые для настройки переменной соединения, вы должны передать их в конструктор RichFlatMapFunction и сохранить их в (не переходных) переменных, а затем использовать их в методе open().

...