Здравствуйте, я пишу kafka-потребителя-производителя с использованием весеннего облачного потока.Внутри моего потребителя я сохраняю свои данные в базе данных. Если база данных выйдет из строя, я выйду из приложения вручную. После перезапуска приложения, если база данных все еще не работает, в результате приложение снова останавливается.Теперь, если я перезапускаю приложение в третий раз, сообщения, полученные в среднем интервале (два сбоя), теряются, потребитель kafka принимает последнее сообщение, а также пропускает сообщение, из которого я вышел из кода.
Интерфейс привязки входящего и исходящего каналов
public interface EventChannel {
String inputEvent = "inputChannel";
String outputEvent = "outputChannel";
@Input(inputChannel)
SubscribableChannel consumeEvent();
@Output(outputEvent)
SubscribableChannel produceEvent();
}
Класс обслуживания -
1) Служба производителя
@Service
@EnableBinding(EventChannel.class)
public class EventProducerService{
private final EventChannel eventChannel;
@Autowired
public EventConsumerService(EventChannel eventChannel){
this.eventChannel = eventChannel;
}
public void postEvent(EventDTO event) {
MessageChannel messageChannel = eventChannel.produceEvent();
messageChannel.send(MessageBuilder
.withPayload(event)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.setHeader("partitionKey",event.getId().toString())
.build());
}
}
2) Служба потребителя
@Component
@EnableBinding(EventChannel.class)
public class EventConsumerService{
private final ApplicationContext applicationContext;
private final EventChannel eventChannel;
@Autowired
public EventConsumerService(ApplicationContext applicationContext,EventChannel eventChannel){
this.applicationContext = applicationContext;
this.eventChannel = eventChannel;
}
@StreamListener(EventChannel.inputEvent)
public void saveUpdateCassandra(EventDTO event){
Event newEvent = new Event(event);
try{
eventRepository.save(newEvent)
} catch(Exceptione e){
e.printStackTrace();
SpringApplication.exit(applicationContext,()-> 0);
}
}
Файл свойств приложения
#Spring Cloud Streams Configuration
##Broker
spring.cloud.stream.kafka.binder.brokers=localhost:9092
##EventIngestion
spring.cloud.stream.bindings.outputChannel.destination=Event
spring.cloud.stream.bindings.outputChannel.producer.partitionKeyExpression=headers.partitionKey
spring.cloud.stream.bindings.inputChannel.destination=Event
spring.cloud.stream.bindings.inputChannel.group=event-consumer-1
spring.cloud.stream.kafka.bindings.inputChannel.consumer.startOffset=earliest
Оба приложения работают независимо. Поэтому, если моя база данных выходит из строя, потребитель останавливается, при последовательных сбоях сообщения теряются