У меня есть несколько серверов, с которых будут создаваться сообщения, и мне нужен брокер и потребитель на одном сервере.
Если у меня и производитель, и потребитель работают на одном и том же сервере, то он работает нормально, но не уверен, какие изменения необходимо внести, чтобы разделить производителей. Я не хочу никакой зависимости серверов zookeeper и kafka от серверов производителей, поскольку их много, и они будут увеличиваться.
Я пытался изменить сервер начальной загрузки на сервер брокера / потребителя, например 192.168.0.1:9092, при настройке KafkaProducer, но все еще не смог генерировать сообщения. Не уверен, что мне не хватает, пожалуйста, помогите мне здесь.
producer.props
bootstrap.servers=192.168.0.1:9092
acks=all
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full=true
Код производителя
public class Producer {
public static void main(String[] args) throws IOException {
// set up the producer
KafkaProducer<String, String> producer;
System.out.println("1");
try (InputStream props = Resources.getResource("producer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
producer = new KafkaProducer<>(properties);
}
try {
for (int i = 0; i < 1000000; i++) {
// send lots of messages
System.out.println("bedore send");
producer.send(new ProducerRecord<String, String>(
"fast-messages",
String.format("{\"type\":\"marker\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
producer.send(new ProducerRecord<String, String>(
"summary-markers",
String.format("{\"type\":\"other\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
producer.flush();
System.out.println("Sent msg number " + i);
}
System.out.println("fdone");
} catch (Throwable throwable) {
throwable.printStackTrace();
System.out.printf("%s", throwable.getStackTrace());
} finally {
producer.close();
}
}
}