Сообщения Кафки теряются, когда потребитель падает - PullRequest
0 голосов
/ 18 февраля 2019

Здравствуйте, я пишу kafka-потребителя-производителя с использованием весеннего облачного потока.Внутри моего потребителя я сохраняю свои данные в базе данных. Если база данных выйдет из строя, я выйду из приложения вручную. После перезапуска приложения, если база данных все еще не работает, в результате приложение снова останавливается.Теперь, если я перезапускаю приложение в третий раз, сообщения, полученные в среднем интервале (два сбоя), теряются, потребитель kafka принимает последнее сообщение, а также пропускает сообщение, из которого я вышел из кода.

Интерфейс привязки входящего и исходящего каналов

public interface EventChannel {

String inputEvent = "inputChannel";
String outputEvent = "outputChannel";

@Input(inputChannel)
SubscribableChannel consumeEvent();

@Output(outputEvent)
SubscribableChannel produceEvent();
}

Класс обслуживания -

1) Служба производителя

@Service
@EnableBinding(EventChannel.class)

public class EventProducerService{

private final EventChannel eventChannel;

@Autowired  
public EventConsumerService(EventChannel eventChannel){
this.eventChannel = eventChannel;
}

public void postEvent(EventDTO event) {
    MessageChannel messageChannel = eventChannel.produceEvent();
    messageChannel.send(MessageBuilder
            .withPayload(event)
            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
            .setHeader("partitionKey",event.getId().toString())
            .build());     
    }
}

2) Служба потребителя

@Component
@EnableBinding(EventChannel.class)
public class EventConsumerService{ 

private final ApplicationContext applicationContext;
private final EventChannel eventChannel;

@Autowired  
public EventConsumerService(ApplicationContext applicationContext,EventChannel eventChannel){
this.applicationContext = applicationContext;
this.eventChannel = eventChannel;
}

@StreamListener(EventChannel.inputEvent)
public void saveUpdateCassandra(EventDTO event){
  Event newEvent = new Event(event);
  try{
     eventRepository.save(newEvent)
    } catch(Exceptione e){
     e.printStackTrace();
     SpringApplication.exit(applicationContext,()-> 0); 
  }
}

Файл свойств приложения

#Spring Cloud Streams Configuration
##Broker
spring.cloud.stream.kafka.binder.brokers=localhost:9092
##EventIngestion 
spring.cloud.stream.bindings.outputChannel.destination=Event
spring.cloud.stream.bindings.outputChannel.producer.partitionKeyExpression=headers.partitionKey
spring.cloud.stream.bindings.inputChannel.destination=Event
spring.cloud.stream.bindings.inputChannel.group=event-consumer-1
spring.cloud.stream.kafka.bindings.inputChannel.consumer.startOffset=earliest

Оба приложения работают независимо. Поэтому, если моя база данных выходит из строя, потребитель останавливается, при последовательных сбоях сообщения теряются

1 Ответ

0 голосов
/ 18 февраля 2019

Во-первых, я не уверен, чего вы ожидаете от SpringApplication.exit(applicationContext,()-> 0);, но вы, по сути, закрываете все приложение со всем, что там может быть запущено.Во-вторых, ваша потеря сообщения связана с тем, что Kafka Binder абсолютно не знает, что произошло исключение, и что он должен вернуть сообщение обратно в тему.Фактически, с точки зрения связующего и благодаря вашему коду каждое сообщение всегда успешно обрабатывается.Так.,.

Пожалуйста, удалите try/catch из вашего метода StreamListener и разрешите распространению исключения, тем самым сообщая связующему, что произошла ошибка.

...