Слушатель Кафки не может получить сообщение и сохранить его в таблице hbase - PullRequest
0 голосов
/ 18 апреля 2019

У меня есть приложение для регистрации транзакций в таблицах hbase. Я использую сообщения транзакций от kafka, но мой слушатель kafka регистрирует следующие строки. И не мог упорствовать в таблице. Я использую для потребителя Spring-Kafka версии 2.1.7. Как я могу решить эту проблему? Моя потребительская реализация кафки вот так;

@KafkaListener(topics = "${kafka.consumer.topic}")
public void receive(ConsumerRecord<String, String> consumerRecord) throws IOException {
    if (StringUtils.hasText(consumerRecord.value())) {
     //Some business logic
    }
}

Kafka Listener Config

@Configuration
@EnableKafka
public class KafkaListenerConfig {

    @Autowired
    private KafkaListenerProperties kafkaListenerProperties;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =  new ConcurrentKafkaListenerContainerFactory<>();
//        factory.setConcurrency(1);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public DefaultKafkaConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
    }

    @Bean
    public Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerProperties.getBootstrap());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerProperties.getGroup());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        return props;
    }

    @Bean
    public Deserializer stringKeyDeserializer() {
        return new StringDeserializer();
    }

    @Bean
    public Deserializer workUnitJsonValueDeserializer() {
        return new StringDeserializer();
    }

}

Регистрация Kafka Listener-Hbase, подобная следующей;

INFO org.apache.hadoop.hbase.client.AsyncProcess [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] #106, waiting for some tasks to finish. Expected max=0, tasksInProgress=2 hasError=false, tableName=FraudRequest

INFO org.apache.hadoop.hbase.client.AsyncProcess [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] Left over 2 task(s) are processed on server(s): [XXXXX.kfs.local,60020,1552673278405]

INFO org.apache.hadoop.hbase.client.AsyncProcess [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] Regions against which left over task(s) are processed: [FraudRequest,15543323,1554891506303.d3bb6fef4ab349e93729d14d13f730bc.]

INFO org.apache.hadoop.hbase.client.AsyncProcess [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] #107, waiting for some tasks to finish. Expected max=0, tasksInProgress=2 hasError=false, tableName=FraudRequest

INFO org.apache.hadoop.hbase.client.AsyncProcess [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] Left over 2 task(s) are processed on server(s): [XXXXX.kfs.local,60020,1552673278405]

INFO org.apache.hadoop.hbase.client.AsyncProcess [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] Regions against which left over task(s) are processed: [FraudRequest,15543323,1554891506303.d3bb6fef4ab349e93729d14d13f730bc.]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...