Как очистить пакет данных в Kafka Consumer, когда в теме больше нет записей - PullRequest
2 голосов
/ 05 июня 2019

Рассмотрим этого потребителя Kafka, который получает данные из темы, буферизует их в PreparedStatement и, когда пакетные записи по 100 КБ, отправляет запрос INSERT в БД.

Это работает хорошо, пока данные все еще не поступают.Однако, когда, например, буферизуются 20К-записи и больше нет входящих записей, он все еще ожидает более 80К-записей, пока в не будет сброшено оператора.Но Я бы хотел сбросить эти 20K , если они через некоторое время остановятся.Как я могу это сделать?Я не вижу способа как это зацепить.

Например, в PHP, который использует расширение php-rdkafka, основанное на librdkafka, я получаю RD_KAFKA_RESP_ERR__PARTITION_EOF, когда достигается конец раздела, так что это довольно легкоперехватить очистку буфера, когда это произойдет.

Я попытался упростить код, чтобы остались только значимые части

public class TestConsumer {

    private final Connection connection;
    private final CountDownLatch shutdownLatch;
    private final KafkaConsumer<String, Message> consumer;
    private int processedCount = 0;

    public TestConsumer(Connection connection) {
        this.connection = connection;
        this.consumer = new KafkaConsumer<>(getConfig(), new StringDeserializer(), new ProtoDeserializer<>(Message.parser()));
        this.shutdownLatch = new CountDownLatch(1);
    }

    public void execute() {
        PreparedStatement statement;
        try {
            statement = getPreparedStatement();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            commit(statement);

            consumer.wakeup();
        }));

        consumer.subscribe(Collections.singletonList("source.topic"));

        try {
            while (true) {
                ConsumerRecords<String, Message> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));

                records.forEach(record -> {
                    Message message = record.value();
                    try {
                        fillBatch(statement, message);
                        statement.addBatch();
                    } catch (SQLException e) {
                        throw new RuntimeException(e);
                    }
                });

                processedCount += records.count();

                if (processedCount > 100000) {
                    commit(statement);
                }
            }
        } catch (WakeupException e) {
            // ignore, we're closing
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }

    private void commit(PreparedStatement statement) {
        try {
            statement.executeBatch();
            consumer.commitSync();
            processedCount = 0;
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }


    protected void fillBatch(PreparedStatement statement, Message message) throws SQLException {
        try {
            statement.setTimestamp(1, new Timestamp(message.getTime() * 1000L));
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }

1 Ответ

2 голосов
/ 06 июня 2019

Я понимаю вашу проблему следующим образом:

  • Вы хотите использовать сообщения от Кафки

  • Копировать их в память до 100K записей

  • Фиксация в пакетном режиме в БД

  • Но вы хотите ждать только t секунд (скажем, 10 секунд)

Это может быть достигнуто гораздо более эффективным и надежным способом с помощью встроенного в Kafka потребительского пакетирования. При условии, что вы можете каким-то образом предсказать средний размер ваших сообщений в байтах.

В пользовательской конфигурации Kafka, вы должны установить следующее:

fetch.min.bytes => это должно быть 100k x средний размер сообщений

fetch.max.wait.ms => это ваше время ожидания в мс (например, 5000 на 5 секундв ожидании)

max.partition.fetch.bytes => макс.количество данных на раздел.Это помогает уточнить общий размер выборки

max.poll.records => максимальное количество записей, возвращаемых в одном опросе. Можно установить значение 100K

fetch.max.bytes =>, если вы хотитеустановить верхний предел для одного запроса

Таким образом, вы можете получить до 100 КБ записей, если они вписываются в определенный размер в байтах, но он будет ждать настраиваемого числа миллис.

Один разОпрос возвращает записи, вы можете сохранить их за один раз и повторить.

...