Сообщения Kafka в журналах тем, но не потребляются Kafka Consumer - PullRequest
0 голосов
/ 04 октября 2019

Я запускаю Kafka в контейнере докера, используя образ докера. Я посылаю сообщения из небольшого Java-приложения. Но когда я пытаюсь потреблять сообщения из терминала, я не вижу никаких сообщений, потребляемых. Я использую команду kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning для получения сообщений в контейнере док-станции kafka. Я также пытался использовать localhost:9094 для просмотра потреблять сообщения. Однако я могу видеть сообщения в журналах тем в контейнере Kafka в /kafka/kafka-logs-kafka/test-topic-0, поэтому я знаю, что они делают это для kafka. Может кто-нибудь сказать, почему сообщения находятся в журналах, но не потребляются и что за исправление? Я смог производить и принимать сообщения из контейнера Docker.

Это мой docker-compose.yml для Kafka

kafka:
    hostname: kafka
    image: wurstmeister/kafka:0.10.2.1
    environment:
      KAFKA_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://kafka:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zk:2181
      KAFKA_CREATE_TOPICS: test-topic:1:1
    ports:
      - "22181:2181"
      - "9092:9092"
      - "9094:9094"

Java-приложение, в котором я создаю сообщения

    public static void main( String[] args )
    {
        final String TOPIC = "test-topic";

        final Producer<Long, String> producer = MessageProducer.createProducer();
        long time = System.currentTimeMillis();

        try {
            for (long index = 0; index < 10; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index, "TEST!!!!! " + index);
                RecordMetadata metadata = producer.send(record).get();
                long elapsedTime = System.currentTimeMillis() - time;
                System.out.printf("sent record(key=%s value=%s) " + "meta(partition=%d, offset=%d) time=%d\n",
                        record.key(), record.value(), metadata.partition(), metadata.offset(), elapsedTime);

            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            producer.flush();
            producer.close();
        }
    }
class MessageProducer {

    private final static String BOOTSTRAP_SERVERS = "localhost:9094";

    static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}
...