У меня есть приложение для регистрации транзакций в таблицах hbase. Я использую сообщения транзакций от kafka, но мой слушатель kafka регистрирует следующие строки. И не мог упорствовать в таблице. Я использую для потребителя Spring-Kafka версии 2.1.7. Как я могу решить эту проблему? Моя потребительская реализация кафки вот так;
@KafkaListener(topics = "${kafka.consumer.topic}")
public void receive(ConsumerRecord<String, String> consumerRecord) throws IOException {
if (StringUtils.hasText(consumerRecord.value())) {
//Some business logic
}
}
Kafka Listener Config
@Configuration
@EnableKafka
public class KafkaListenerConfig {
@Autowired
private KafkaListenerProperties kafkaListenerProperties;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public DefaultKafkaConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
}
@Bean
public Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerProperties.getBootstrap());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerProperties.getGroup());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
return props;
}
@Bean
public Deserializer stringKeyDeserializer() {
return new StringDeserializer();
}
@Bean
public Deserializer workUnitJsonValueDeserializer() {
return new StringDeserializer();
}
}
Регистрация Kafka Listener-Hbase, подобная следующей;
INFO org.apache.hadoop.hbase.client.AsyncProcess [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] #106, waiting for some tasks to finish. Expected max=0, tasksInProgress=2 hasError=false, tableName=FraudRequest
INFO org.apache.hadoop.hbase.client.AsyncProcess [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] Left over 2 task(s) are processed on server(s): [XXXXX.kfs.local,60020,1552673278405]
INFO org.apache.hadoop.hbase.client.AsyncProcess [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] Regions against which left over task(s) are processed: [FraudRequest,15543323,1554891506303.d3bb6fef4ab349e93729d14d13f730bc.]
INFO org.apache.hadoop.hbase.client.AsyncProcess [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] #107, waiting for some tasks to finish. Expected max=0, tasksInProgress=2 hasError=false, tableName=FraudRequest
INFO org.apache.hadoop.hbase.client.AsyncProcess [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] Left over 2 task(s) are processed on server(s): [XXXXX.kfs.local,60020,1552673278405]
INFO org.apache.hadoop.hbase.client.AsyncProcess [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] Regions against which left over task(s) are processed: [FraudRequest,15543323,1554891506303.d3bb6fef4ab349e93729d14d13f730bc.]