Я использую стартер весенней загрузки 2.2.0.RELEASE для моего приложения. У меня есть один потребитель кафки. Теперь я хочу внедрить мои пружинные сущности в моего потребителя kafka (потребитель kafka не управляется контейнером spring).
Я попробовал ApplicationContextAware, но это не помогло мне. Я получаю applicationContext как ноль в моем потребителе кафки, и, следовательно, я не могу получить бин из контейнера Spring. Первый раз applicationContext устанавливается правильно, но когда контекст загружается во второй раз, он устанавливается равным нулю. Ниже приведена краткая информация о моем приложении
@SpringBootApplication
@ComponentScan({"com.xyz.config_assign.service"})
public class ConfigAssignServer {
private static Logger log = LoggerFactory.getLogger(ConfigAssignServer.class);
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(ConfigAssignServer.class, args);
log.info("Started ConfigAssign Server!!! AppName= {} ", applicationContext.getApplicationName());
QkafkaClient.loadConfiguration();
}
}
Все мои классы приложений представлены в com.xyz.config_assign.service, поэтому проблем со сканированием компонентов не возникает. Это работало хорошо, прежде чем я добавил Kafka customer
My ApplicationContextProvider, который использует известный ApplicationContextAware
@Component
public class ApplicationContextProvider implements ApplicationContextAware{
public static ApplicationContext applicationContext;
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
ApplicationContextProvider.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
}
А теперь мой kafkaconsumer
public class ConfigAssignmentAssetTagChangeKafkaTopicProcessor implements BatchTopicProcessor<String, String> {
private Logger log = LoggerFactory.getLogger(ConfigAssignmentAssetTagChangeKafkaTopicProcessor.class);
private AssignConfigServiceImpl assignConfigServiceImpl;
public ConfigAssignmentAssetTagChangeKafkaTopicProcessor(){
ApplicationContext applicationContext = ApplicationContextProvider.getApplicationContext();
assignConfigServiceImpl = applicationContext.getBean(AssignConfigServiceImpl.class);
}
@Override
public void handleError(ConsumerRecords<String, String> arg0, Exception arg1) {
// TODO Auto-generated method stub
}
@Override
public void process(ConsumerRecords<String, String> records, long arg1) {}
}
И сервис, который я хочу внедрить в kafka - это
@Service
public class AssignConfigServiceImpl implements AssignConfigService {
private Logger log = LoggerFactory.getLogger(AssignConfigServiceImpl.class);
@Autowired
private ConfigProfileDBService dbService;
public boolean assignConfigToAgents(List<UUID> agentList, long customerId) {
List<AgentConfigurationProfile> configProfiles =
dbService.getConfigurationProfilesForCustomer(customerId);
boolean isAssignSuccessful = assignConfigToAgents(agentList, customerId, configProfiles);
log.info("Config Assignment status ={}", isAssignSuccessful);
return isAssignSuccessful;
}
Другая услуга, которую я использовал,
@Service
public class ConfigProfileDBService implements DBService {
private static Logger log = LoggerFactory.getLogger(ConfigProfileDBService.class);
@Autowired
private JdbcTemplate jdbcTemplate;
public List<AgentConfigurationProfile> getConfigurationProfilesForCustomer(Long customerId) {
List<AgentConfigurationProfile> agentConfigList;
}
}
Может кто-нибудь, пожалуйста, дайте мне знать, что пошло не так, я пробовал несколько онлайн-решений но у меня не получилось. Заранее спасибо. Примечание. Я не инициализировал ни один класс с помощью оператора new.