Я реализую конечную точку в весенней загрузке, которая при вызове будет делать дамп всех сообщений, находящихся в теме кафки (для тестирования).
Я ожидаю, что когда производитель пишет в тему 'testTopic', а затем потребитель опрашивает, он должен прочитать только что созданное сообщение.
Наблюдаемое мной поведение заключается в том, что потребитель не может использовать созданное сообщение. Кроме того, если производитель создает намного больше сообщений (скажем, 10-15), то потребитель сбросит их все за один раз. С этого момента, если производитель выдает хотя бы одно сообщение, потребитель будет потреблять, как ожидалось.
Интуитивно я подумал, что настройка FETCH_MIN_BYTES_CONFIG
может быть как-то связана с этим - возможно, потребитель ждал достаточного количества байтов для записи. Но это уже установлено в 1 байт (по умолчанию) и не объясняет последующие успешные отдельные чтения сообщений.
Затем я подумал, что, возможно, я регистрировал потребителя до создания темы (слишком быстро вызывая регистрирующую конечную точку). Но я подтвердил от kafka-topics.sh
, что тема существует до регистрации потребителя.
Я заметил, что если я включаю автокоммит смещений, то поведение иногда такое, как ожидалось, а иногда нет. При ручной коррекции смещений (не показано в коде ниже) поведение очень странное, как описано выше.
Я также знаю, что производитель работает, как ожидалось, подтвердив это, используя kafka-console-consumer
.
Также попытался увеличить время ожидания опроса до 1 секунды, но безуспешно.
// Consumer
@Component
public class TestConsumer{
private KafkaConsumer testConsumer = null;
public void registerConsumer(final String consumerId) {
if (consumer == null) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<some_address>:<some_port>");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
testConsumer = new KafkaConsumer<String, String>(props);
testConsumer.subscribe(Collections.singletonList("testTopic"));
}
else{
logger.debug("Consumer already registered");
}
}
public Map<String, List<String>> consume() {
Map<String, List<String>> messages = new HashMap<>();
if (testConsumer == null){
logger.error("testConsumer was not instantiated");
return null;
}
ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofMillis(100));
List<String> buffer = new ArrayList<>();
for (ConsumerRecord<String, String> record: records){
logger.debug(String.format("Consuming %s", record.value()));
buffer.add(record.value());
}
messages.put("data", buffer);
return messages;
}
}
Последовательность шагов:
1. запускается приложение весенней загрузки
2. тема кафки создана, могу подтвердить через консоль кафки
3. Я регистрирую производителя и потребителя
4. Производитель производит, и я могу подтвердить это с помощью консоли kafka (другая группа потребителей).
5. Потребитель не потребляет
Я ожидаю, что результат будет следующим:
{
"data" : ["message1"]
}
Я получаю
{
"data" : []
}
Есть идеи, почему потребитель не потребляет записи, пока не будет написано предельное количество сообщений?
EDIT_1:
Добавлено свойство props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
для потребителя без эффекта.