У меня есть служба весенней загрузки, которая использует тему кафки. Когда я потребляю, я выполняю определенные задачи в сообщении kafka. Прежде чем я смогу выполнить эти операции, мне нужно подождать, пока служба загрузит некоторые данные в кеши, которые я настроил. Моя проблема в том, что если я установил для kafka автомата автозапуск, он начинает потреблять до загрузки кеша и выдает ошибку.
Я пытаюсь явно запустить потребителя после загрузки кеша, однако я получаю исключения нулевого указателя.
@Configuration
public class KafkaConfig {
@Value("${kafka.server}")
String server;
@Value("${kafka.port}")
String port;
@Value("${kafka.group.id}")
String groupid;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server+":"+port);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// config.put("security.protocol","SASL_PLAINTEXT");
// config.put("sasl.kerberos.service.name","kafka");
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setAutoStartup(false);
return factory;
}
}
KafkaListener
@Service
public class KafkaConsumer {
@Autowired
AggregationService aggregationService;
@Autowired
private KafkaListenerEndpointRegistry registry;
private final CounterService counterService;
public KafkaConsumer(CounterService counterService) {
this.counterService = counterService;
}
@KafkaListener(topics = "gliTransactionTopic", group = "gliDecoupling", id = "gliKafkaListener")
public boolean consume(String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
@Header(KafkaHeaders.OFFSET) Long offset) throws ParseException {
System.out.println("Inside kafka listener :" + message+" partition :"+partition.toString()+" offset :"+offset.toString());
aggregationService.run();
return true;
}
}
service Для запуска остановка
@Service
public class DecouplingController {
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
public void stop() {
MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry
.getListenerContainer("gliKafkaListener");
listenerContainer.stop();
}
public void start() {
MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry
.getListenerContainer("gliKafkaListener");
listenerContainer.start();
}
}
основной метод
@SpringBootApplication
public class DecouplingApplication {
Ignite ignite;
static IgniteCache<Long, MappingsEntity> mappingsCache;
public static void main(String[] args) {
SpringApplication.run(DecouplingApplication.class, args);
Ignition.setClientMode(true);
Ignite ignite = Ignition.ignite("ignite");
loadCaches(ignite);
}
public static boolean loadCaches(Ignite ignite) {
mappingsCache = ignite.getOrCreateCache("MappingsCache");
mappingsCache.loadCache(null);
System.out.println("Data Loaded");
DecouplingController dc=new DecouplingController();
dc.start();
return true;
}
}
Ниже приведено исключение
Data Loaded
Exception in thread "main" java.lang.NullPointerException
at com.ignite.spring.decoupling.controller.DecouplingController.start(DecouplingController.java:126)
at com.ignite.spring.decoupling.DecouplingApplication.loadCaches(DecouplingApplication.java:64)
at com.ignite.spring.decoupling.DecouplingApplication.main(DecouplingApplication.java:37)