Мой потребитель Spring / Java не может получить доступ к сообщению, созданному производителем. Однако, когда я запускаю потребителя из консоли / терминала, он может получить сообщение, созданное производителем spring / java.
Конфигурация потребителя:
@Component
@ConfigurationProperties(prefix="kafka.consumer")
public class KafkaConsumerProperties {
private String bootstrap;
private String group;
private String topic;
public String getBootstrap() {
return bootstrap;
}
public void setBootstrap(String bootstrap) {
this.bootstrap = bootstrap;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
}
Конфигурация слушателя:
@Configuration
@EnableKafka
public class KafkaListenerConfig {
@Autowired
private KafkaConsumerProperties kafkaConsumerProperties;
@Bean
public Map<String, Object> getConsumerProperties() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
return properties;
}
@Bean
public Deserializer stringKeyDeserializer() {
return new StringDeserializer();
}
@Bean
public Deserializer transactionJsonValueDeserializer() {
return new JsonDeserializer(Transaction.class);
}
@Bean
public ConsumerFactory<String, Transaction> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), stringKeyDeserializer(), transactionJsonValueDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Transaction> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Слушатель Кафки:
@Service
public class TransactionConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(Transaction.class);
@KafkaListener(topics={"transactions"}, containerFactory = "kafkaListenerContainerFactory")
public void onReceive(Transaction transaction) {
LOGGER.info("transaction = {}",transaction);
}
}
Потребительское приложение:
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
ТЕСТ-СЛУЧАЙ 1: ПРОЙДЕН
Я начал свой весенний / Java-продюсер и запускаю потребителя с консоли. Когда я создаю формирователь сообщения, мой консольный потребитель может получить доступ к сообщению.
ТЕСТ, СЛУЧАЙ 2: СБОЙ
Я запустил свой Spring / Java-клиент и запустил его с консоли. Когда я создаю консоль производителя сообщений, мой потребитель Spring / Java не может получить доступ к сообщению.
ТЕСТ, СЛУЧАЙ 3: СБОЙ
Я начал свой потребитель весны / Java и управляю производителем весны / Java. Когда я создаю сообщение из источника Spring / Java, мой потребитель Spring / Java не может получить доступ к сообщению.
Вопрос
Что-то не так в моем коде пользователя?
Мне не хватает какой-либо конфигурации для моего слушателя kafka?
Нужно ли явно запускать прослушиватель? (Я так не думаю, так как вижу в журнале терминала соединение с темой, но я не уверен)