Создание пакетов из записей опроса в кафке - PullRequest
0 голосов
/ 22 января 2019

У нас есть требование выполнять массовую запись вasticsearch. Мы хотели бы знать, существует ли лучший способ пакетирования данных и избежать потери данных при выполнении пакетирования

 public void consume() {
        logger.debug("raw consume......");

        String topic = "json.incoming";
        String consGroup = "rConsumerGroup";

        Properties props = new Properties();
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "20000");
        props.put("max.poll.records", "10000");

        consumer = new GenericConsumer<String, JsonNode>().initialize(topic, consGroup, STREAMSERDE.STRINGDESER, STREAMSERDE.JSONDESER, props);
        logger.debug("Kafka Consumer Initialized......");
        buffer = new ArrayList<MessageVO>();

        while (true) {
            try {
                ConsumerRecords<String, JsonNode> records = consumer.poll(100);
                Date startTime = Calendar.getInstance()
                    .getTime();
                if (records.count() == 0 && !buffer.isEmpty()) {
                    lastSeenZeroPollCounter++;
                }
                if (records.count() > 0) {
                    logger.debug(">>records count = " + records.count());
                    for (ConsumerRecord<String, JsonNode> record : records) {
                        logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key());
                        JsonNode jsonMessage = record.value();
                        logger.debug("incoming Message = " + jsonMessage);
                        ObjectMapper objectMapper = new ObjectMapper();
                        MessageVO rawMessage = objectMapper.convertValue(jsonMessage, MessageVO.class);
                        logger.info("Size of the buffer is " + buffer.size());
                        buffer.add(rawMessage);
                    }
                    Date endTime = Calendar.getInstance()
                        .getTime();
                    long durationInMilliSec = endTime.getTime() - startTime.getTime();
                    logger.debug("Number of Records:: " + records.count() + " Time took to process poll :: " + durationInMilliSec);
                }
                if ((buffer.size() >= 1000 && buffer.size() <= 3000) || (buffer.size() > 0 && lastSeenZeroPollCounter >= 3000)) {
                    lastSeenZeroPollCounter = 0;
                    List<RawSyslogMessageVO> clonedBuffer = deepCopy(buffer);
                    logger.info("The size of clonedBuffer is ::: " + clonedBuffer.size());
                    writerService.writeRaw(clonedBuffer);
                    buffer.clear();
                }

                consumer.commitSync();
            } catch (Throwable throwable) {
                logger.error("Error occured while processing message", throwable);
                throwable.printStackTrace();
            }
        }
    }

Код для клонирования данных во избежание потери данных

 private List<MessageVO> deepCopy(List<MessageVO> messages) {
        List<MessageVO> listOfMessages = new ArrayList<>();
        logger.debug("DeepClone :: listOfMessages size ::: " + listOfMessages.size());
        listOfMessages.addAll(messages);
        return Collections.unmodifiableList(messages);
    }

Любая помощь приветствуется. Спасибо.

Ответы [ 2 ]

0 голосов
/ 24 января 2019

Мы использовали тот же вариант использования, немного упростив дизайн приложения: в основном мы выполняем следующие шаги

  1. Используйте Spring Kafka BatchAcknowledgingMessageListener для получения записи с max.poll.records установлены в соответствии с требованиями
  2. Для каждой выборки фиксируйте сообщения, используя Elasticsearch BulkRequest API
  3. После успешного массового индексирования, подтвердите это Кафке.
  4. При неудачной повторной попытке или ошибке обработки

Следуя этой более простой схеме, мы понимаем, что большинство массовых коммитов будет иметь желаемое количество записей. Для случая, когда в теме Kafka не так много сообщений, как желаемое количество записей для массовой индексации, мы решаем индексировать все, что доступно в одной выборке, в любом случае вместо явной обработки состояния фиксации, управления буфером и т. Д. В приложении. .

Массовая фиксация Elasticsearch - это оптимизация - я не вижу причин, чтобы быть предельно точным в отношении общего количества записей на один массовый запрос. (см. также это руководство ).

PS: нам нужно было писать код вместо использования коннектора или готовых решений, потому что наши входные данные взяты из нескольких тем в различных форматах, таких как protobuf, zip-файл XML, Json и т. Д., И нам нужно было выполнить преобразование формата и сложную десериализацию перед индексированием данные

0 голосов
/ 22 января 2019

Лучший способ, чем писать самому, - это использовать API-интерфейс Kafka Connect Apache Kafka - он был создан специально для потоковой интеграции из систем в Kafka и из Kafka в другие системы: -)

The Соединитель Elasticsearch будет передавать данные из раздела Kafka в Elasticsearch с настраиваемыми размерами пакетов и т. Д., А также с семантикой доставки в один раз, масштабируемой обработкой и т. Д.

Отказ от ответственности: я работаю на Confluent.

...