Я запускаю kafka на одном узле, я хочу увидеть поведение kafka Producer, когда я выключаю свой брокер kafka, затем я перезагружаю свой брокер за несколько секунд, поэтому я создаю проект весенней загрузки, куда я могу отправить 1000 объектов JSON клиента ивыведите смещение JSON-объекта, отправляемого каждый раз.Мое приложение работает нормально, но когда я закрываю свой брокер kafka и через несколько секунд перезагружаю моего брокера, мой производитель возвращается, чтобы нормально посылать объекты из последнего смещения.Проблема в том, что для моего примера, когда в консоли смещение = 983, я опускаю кафку, и когда я снова запускаю своего брокера, я вижу, что кафка начинает отправлять сообщения с смещения = 984 , но я нахожу, что 3 или4 сообщения теряются при этом сообщении об ошибке !!!
983
offset acked
2019-04-01 16:20:34.635 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:34.638 INFO 7656 --- [ntainer#0-0-C-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-2, groupId=jsa-group] Error sending fetch request (sessionId=1910675333, epoch=31) to node 0: org.apache.kafka.common.errors.DisconnectException.
2019-04-01 16:20:34.689 WARN 7656 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-2, groupId=jsa-group] Connection to node 0 could not be established. Broker may not be available.
2019-04-01 16:20:34.844 WARN 7656 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-2, groupId=jsa-group] Connection to node 0 could not be established. Broker may not be available.
2019-04-01 16:20:35.050 WARN 7656 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-2, groupId=jsa-group] Connection to node 0 could not be established. Broker may not be available.
2019-04-01 16:20:35.557 WARN 7656 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-2, groupId=jsa-group] Connection to node 0 could not be established. Broker may not be available.
.........
dali 36
2019-04-01 16:20:55.592 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:55.594 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:55.594 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:55.696 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:55.696 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
984
offset acked
985
offset acked
986
offset acked
987
offset acked
988
offset acked
989
offset acked
990
offset acked
991
offset acked
992
offset acked
993
offset acked
994
offset acked
995
offset acked
996
offset acked
997
offset acked
998
offset acked
999
offset acked
1000
offset acked
1001
offset acked
1002
offset acked
1003
offset acked
1004
offset acked
2019-04-01 16:20:55.799 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:55.803 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:55.803 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:55.904 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:55.904 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.006 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.010 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:56.011 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.111 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.111 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.213 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.215 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:56.215 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.317 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.317 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.419 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.422 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:56.422 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.523 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.523 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
dali 37
1005
offset acked
2019-04-01 16:20:56.625 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.628 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:56.628 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.729 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.729 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.831 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.834 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:56.834 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:56.936 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:56.936 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.037 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.040 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:57.040 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.140 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.141 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.245 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.252 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:57.252 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.358 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.358 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.460 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.466 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:57.467 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.567 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.568 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
dali 38
1006
offset acked
2019-04-01 16:20:57.671 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.681 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: This is not the correct coordinator.
2019-04-01 16:20:57.681 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.785 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.786 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-04-01 16:20:57.889 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Discovered group coordinator dali-X556UJ:9092 (id: 2147483647 rack: null)
2019-04-01 16:20:57.896 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: The coordinator is loading and hence can't process requests.
2019-04-01 16:20:58.007 ERROR 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Offset commit failed on partition jsa-kafka-topic-0 at offset 984: The coordinator is loading and hence can't process requests.
2019-04-01 16:20:58.125 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Revoking previously assigned partitions [jsa-kafka-topic-0]
2019-04-01 16:20:58.125 INFO 7656 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [jsa-kafka-topic-0]
2019-04-01 16:20:58.125 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] (Re-)joining group
2019-04-01 16:20:58.149 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Successfully joined group with generation 31
2019-04-01 16:20:58.149 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Setting newly assigned partitions [jsa-kafka-topic-0]
2019-04-01 16:20:58.154 INFO 7656 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [jsa-kafka-topic-0]
dali 39
1007
offset acked
dali 40
1008
offset acked
dali 41
1009
offset acked
dali 42
1010
offset acked
dali 43
1011
offset acked
//...continue to send normally
Почему мои сообщения с смещением = 1004 и смещением = 1005 и d смещением = 1006 и первое сообщение с offset = 983 (когда я отказался от моего брокера) не удалось отправить при повторном запуске моего брокера kafka.
// мой основной класс
public static void main(String[] args) {
SpringApplication.run(SpringKafkaSendConsumeJavaObjectApplication.class, args);
}
@Bean
ApplicationRunner run(CustomerRepository personRepository) {
return args -> {
List<Customer> list = new ArrayList<>(100);
for (int i = 0; i < 100; i++) {
list.add(new Customer("dali "+i, 25, "homme"));
Runnable runnable = new MyRunnable(list,producer);
runnable.run();
};
}
это мой метод запуска, когда я отправляю одного клиента каждую ОДНУ секунду
public class MyRunnable implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(MyRunnable.class);
private List<Customer> customers;
private KafkaProducer kafkaProducer;
public MyRunnable(List<Customer> customers,KafkaProducer kafkaProducer) {
this.customers = customers.stream().collect(Collectors.toList());
this.kafkaProducer = kafkaProducer;
}
@Override
public void run() {
customers.forEach(customer -> {
System.out.println(customer.getName());
kafkaProducer.send(customer);
try {
Thread.sleep(1000);//1second
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// это мой метод отправки
@Service
public class KafkaProducer {
private static final Logger LOGGER=LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, Customer> kafkaTemplate;
@Value("${jsa.kafka.topic}")
String kafkaTopic = "jsa-test";
@Async
public void send(Customer customer) {
//LOGGER.info("sending data= '{}' " , customer);
ListenableFuture<SendResult<String,Customer>> listenableFuture = kafkaTemplate.send(kafkaTopic ,customer);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String,Customer>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Override
public void onSuccess(final SendResult<String, Customer> message) {
System.out.println(message.getRecordMetadata().offset());
System.out.println("offset acked");
}
});
}
// это мой кафкаконфиг в application.properties
#Kafka Cluster
jsa.kafka.bootstrap-servers=localhost:9092
#consumer group id
jsa.kafka.consumer.group-id=jsa-group
#topic name
jsa.kafka.topic=jsa-kafka-topic
#server port
server.port=9000
#reconnect.backoff.ms=10000
#Integer.MAX_VALUE
retries=2147483647
retry.backoff.ms=1000
#5 minutes
request.timeout.ms=305000
#Integer.MAX_VALUE
max.block.ms=2147483647