Рассмотрим этого потребителя Kafka, который получает данные из темы, буферизует их в PreparedStatement и, когда пакетные записи по 100 КБ, отправляет запрос INSERT в БД.
Это работает хорошо, пока данные все еще не поступают.Однако, когда, например, буферизуются 20К-записи и больше нет входящих записей, он все еще ожидает более 80К-записей, пока в не будет сброшено оператора.Но Я бы хотел сбросить эти 20K , если они через некоторое время остановятся.Как я могу это сделать?Я не вижу способа как это зацепить.
Например, в PHP, который использует расширение php-rdkafka, основанное на librdkafka, я получаю RD_KAFKA_RESP_ERR__PARTITION_EOF
, когда достигается конец раздела, так что это довольно легкоперехватить очистку буфера, когда это произойдет.
Я попытался упростить код, чтобы остались только значимые части
public class TestConsumer {
private final Connection connection;
private final CountDownLatch shutdownLatch;
private final KafkaConsumer<String, Message> consumer;
private int processedCount = 0;
public TestConsumer(Connection connection) {
this.connection = connection;
this.consumer = new KafkaConsumer<>(getConfig(), new StringDeserializer(), new ProtoDeserializer<>(Message.parser()));
this.shutdownLatch = new CountDownLatch(1);
}
public void execute() {
PreparedStatement statement;
try {
statement = getPreparedStatement();
} catch (SQLException e) {
throw new RuntimeException(e);
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
commit(statement);
consumer.wakeup();
}));
consumer.subscribe(Collections.singletonList("source.topic"));
try {
while (true) {
ConsumerRecords<String, Message> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
records.forEach(record -> {
Message message = record.value();
try {
fillBatch(statement, message);
statement.addBatch();
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
processedCount += records.count();
if (processedCount > 100000) {
commit(statement);
}
}
} catch (WakeupException e) {
// ignore, we're closing
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
private void commit(PreparedStatement statement) {
try {
statement.executeBatch();
consumer.commitSync();
processedCount = 0;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
protected void fillBatch(PreparedStatement statement, Message message) throws SQLException {
try {
statement.setTimestamp(1, new Timestamp(message.getTime() * 1000L));
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}