Clickhouse не может получать сообщения от Kafka с форматированием TabSeparated - PullRequest
1 голос
/ 07 августа 2020

Я отправляю сообщения в Kafka из своего приложения Spring Boot

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("uniqTopic123", "testKey", "Test\tTest");
future.addCallback(
        (v) -> System.out.println("SUCCESS: " + v),
        (v) -> System.out.println("FAIL: " + v)
);
kafkaTemplate.flush();

application.properties

spring.kafka.consumer.group-id=app.1
kafka.server=<kafka_host>:9092
kafka.producer.id=kafkaProducerId

Configuration

@Value("${kafka.server}")
private String kafkaServer;

@Value("${kafka.producer.id}")
private String kafkaProducerId;

@Value("${spring.kafka.consumer.group-id}")
private String kafkaGroupId;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerId);
    return props;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<String, String>(
            new DefaultKafkaProducerFactory<>(producerConfigs()));
}

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);

    return new DefaultKafkaConsumerFactory<>(props);
}

В журналах я вижу такие сообщения:

УСПЕХ: SendResult [producerRecord = ProducerRecord (topic = uniqTopic123, partition = null, headers = RecordHeaders (headers = [], isReadOnly = true), key = testKey, value = Test Test, timestamp = null), recordMetadata=uniqTopic123-0@1]

Но мой слушатель не улавливает никаких сообщений

@KafkaListener(topics="uniqTopic123")
public void msgListener(ConsumerRecord<String, String> record){
    System.out.println("test ======> " + record.value());
}

И таблица в ClickHouse пуста. Мои таблицы ClickHouse

CREATE TABLE IF NOT EXISTS test (key String, message String)
ENGINE = Kafka('<kafka_host>:9092', 'uniqTopic123', 'app.1', 'TabSeparated');

CREATE TABLE IF NOT EXISTS test_table (key String, message String)
ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO test_table AS
SELECT key, message FROM test;

Что не так в моем коде?

UPD .: Инструмент Kafka показывает, что сообщения находятся в Kafka введите описание изображения здесь

UPD : Ошибка заключалась в отсутствии перевода строки в конце сообщения «Test \ tTest \ n»

1 Ответ

1 голос
/ 07 августа 2020

Ошибка заключалась в отсутствии перевода строки в конце сообщения «Test \ tTest \ n»

...