Чтобы просто ответить на мой собственный вопрос, я взглянул на библиотеки интеграции Kafka, такие как Spring-Kafka и Spring Cloud Stream, но интеграция с реестром схем Confluent либо не завершена, либо мне не совсем понятна. Этого достаточно для примитивов, но нам это нужно для типизированных объектов Avro, которые проверяются реестром схемы. Теперь я внедрил решение, не зависящее от Кафки, основанное на ответе Spring Boot - Лучший способ запустить фоновый поток при развертывании
Окончательный код выглядит так:
@Component
public class AccountStreamConsumer implements DisposableBean, Runnable {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final AccountService accountService;
private final KafkaProperties kafkaProperties;
private final Consumer<AccountKey, Account> consumer;
@Autowired
public AccountStreamConsumer(AccountService accountService, KafkaProperties kafkaProperties,
ConfluentProperties confluentProperties) {
this.accountService = accountService;
this.kafkaProperties = kafkaProperties;
if (!kafkaProperties.getEnabled()) {
consumer = null;
return;
}
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentProperties.getSchemaRegistryUrl());
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaProperties.getSecurityProtocolConfig());
props.put(SaslConfigs.SASL_MECHANISM, kafkaProperties.getSaslMechanism());
props.put(SaslConfigs.SASL_JAAS_CONFIG, PlainLoginModule.class.getName() + " required username=\"" + kafkaProperties.getUsername() + "\" password=\"" + kafkaProperties.getPassword() + "\";");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getAccountConsumerGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(kafkaProperties.getAccountsTopicName()));
Thread thread = new Thread(this);
thread.start();
}
@Override
public void run() {
if (!kafkaProperties.getEnabled())
return;
logger.debug("Started account stream consumer");
try {
//noinspection InfiniteLoopStatement
while (true) {
ConsumerRecords<AccountKey, Account> records = consumer.poll(Duration.ofSeconds(10L));
List<Account> accounts = new ArrayList<>();
records.iterator().forEachRemaining(record -> accounts.add(record.value()));
if (accounts.size() != 0)
accountService.store(accounts);
}
} catch (WakeupException e) {
logger.info("Account stream consumer woken up for exiting.");
} finally {
consumer.close();
}
}
@Override
public void destroy() {
if (consumer != null)
consumer.wakeup();
logger.info("Woke up account stream consumer, exiting.");
}
}