Как реализовать FlinkKafkaПроизводитель сериализатора для Kafka 2.2 - PullRequest
0 голосов
/ 31 октября 2019

Я работал над обновлением процессора Flink (версия Flink 1.9), который читает из Kafka, а затем пишет в Kafka. Мы написали этот процессор для работы с кластером Kafka 0.10.2, и теперь мы развернули новый кластер Kafka с версией 2.2. Поэтому я решил обновить процессор, чтобы использовать последние версии FlinkKafkaConsumer и FlinkKafkaProducer (в соответствии с рекомендациями документации Flink). Однако я столкнулся с некоторыми проблемами с производителем Kafka. Я не могу заставить его сериализовать данные, используя устаревшие конструкторы (что неудивительно), и я не смог найти в Интернете никаких реализаций или примеров того, как реализовать Serializer (все примеры используют более старые соединители Kafka)

Текущая реализация (для Kafka 0.10.2) выглядит следующим образом

FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                (FlinkKafkaPartitioner) null
        );

При попытке реализовать следующее FlinkKafkaProducer

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                null
        );

я получаю следующую ошибку:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
    at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)

и я так и не смог понять почему. Конструктор для FlinkKafkaProducer также устарел, и когда я пытаюсь реализовать неосуждаемый конструктор, я не могу понять, как сериализовать данные. Вот как это будет выглядеть:

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
                        return null;
                    }
                },
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );

Но я не понимаю, как реализовать KafkaSerializationSchema, и я не нахожу примеров этого в Интернете или в документах Flink.

У кого-нибудь есть опыт реализации этого или какие-либо советы о том, почему FlinkProducer получает исключение NullPointerException на этом шаге?

Ответы [ 2 ]

0 голосов
/ 31 октября 2019

Чтобы справиться с тайм-аутом в случае FlinkKafkaProducer.Semantic.EXACTLY_ONCE, вам следует прочитать https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-011-and-newer, особенно эту часть:

Режим Semantic.EXACTLY_ONCE зависит от возможности фиксациитранзакции, которые были начаты до взятия контрольной точки, после восстановления из указанной контрольной точки. Если время между сбоем приложения Flink и завершенным перезапуском превышает время ожидания транзакции Kafka, произойдет потеря данных (Kafka автоматически прервет транзакции, которые превысили время ожидания). Имея это в виду, пожалуйста, настройте время ожидания транзакции в соответствии с ожидаемым временем простоя.

Для брокеров Kafka по умолчанию для параметра database.action.max.timeout.ms установлено значение 15 минут. Это свойство не позволяет устанавливать тайм-ауты транзакций для производителей, превышающие его значение. FlinkKafkaProducer011 по умолчанию устанавливает для свойства Transaction.timeout.ms в конфигурации производителя значение 1 час, поэтому перед использованием режима Semantic.EXACTLY_ONCE необходимо увеличить значение exchange.max.timeout.ms.

0 голосов
/ 31 октября 2019

Если вы просто отправляете строку в Kafka:

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String>{

    private String topic;   

    public ProducerStringSerializationSchema(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
    }

}

Для отправки объекта Java:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;


    public class ObjSerializationSchema implements KafkaSerializationSchema<MyPojo>{

        private String topic;   
        private ObjectMapper mapper;

        public ObjSerializationSchema(String topic) {
            super();
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(MyPojo obj, Long timestamp) {
            byte[] b = null;
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
             try {
                b= mapper.writeValueAsBytes(obj);
            } catch (JsonProcessingException e) {
                // TODO 
            }
            return new ProducerRecord<byte[], byte[]>(topic, b);
        }

    }

В вашем коде

.addSink(new FlinkKafkaProducer<>(producerTopic, new ObjSerializationSchema(producerTopic), 
                        params.getProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
...