Я запускаю Kafka в контейнере докера, используя образ докера. Я посылаю сообщения из небольшого Java-приложения. Но когда я пытаюсь потреблять сообщения из терминала, я не вижу никаких сообщений, потребляемых. Я использую команду kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
для получения сообщений в контейнере док-станции kafka. Я также пытался использовать localhost:9094
для просмотра потреблять сообщения. Однако я могу видеть сообщения в журналах тем в контейнере Kafka в /kafka/kafka-logs-kafka/test-topic-0
, поэтому я знаю, что они делают это для kafka. Может кто-нибудь сказать, почему сообщения находятся в журналах, но не потребляются и что за исправление? Я смог производить и принимать сообщения из контейнера Docker.
Это мой docker-compose.yml
для Kafka
kafka:
hostname: kafka
image: wurstmeister/kafka:0.10.2.1
environment:
KAFKA_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://kafka:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: zk:2181
KAFKA_CREATE_TOPICS: test-topic:1:1
ports:
- "22181:2181"
- "9092:9092"
- "9094:9094"
Java-приложение, в котором я создаю сообщения
public static void main( String[] args )
{
final String TOPIC = "test-topic";
final Producer<Long, String> producer = MessageProducer.createProducer();
long time = System.currentTimeMillis();
try {
for (long index = 0; index < 10; index++) {
final ProducerRecord<Long, String> record =
new ProducerRecord<>(TOPIC, index, "TEST!!!!! " + index);
RecordMetadata metadata = producer.send(record).get();
long elapsedTime = System.currentTimeMillis() - time;
System.out.printf("sent record(key=%s value=%s) " + "meta(partition=%d, offset=%d) time=%d\n",
record.key(), record.value(), metadata.partition(), metadata.offset(), elapsedTime);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
producer.flush();
producer.close();
}
}
class MessageProducer {
private final static String BOOTSTRAP_SERVERS = "localhost:9094";
static Producer<Long, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
}