Я пытаюсь создать приложение весенней загрузки с kafka, которое имеет несколько классов, одним из которых является отправка сообщения kafka topi c. Кафка и зоокейпер находятся на localhost. У меня есть класс Sender, который инициализирует мой kafkaTemplate.
@Service
public class Sender {
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private String topic = "Kafka_Example";
public void send(String message){
kafkaTemplate.send(topic, message);
}
}
У меня есть другой класс FileChangeListenerService, работа которого заключается в отправке сообщения kafka в topi c при изменении набора файлов в каталоге.
@Service
public class FileChangeListenerService implements FileChangeListener {
private Logger logger = LoggerFactory.getLogger(FileChangeListenerService.class);
@Autowired
private Sender sender;
public FileChangeListenerService() {
}
@Override
public void onChange(Set<ChangedFiles> changeSet) {
for (ChangedFiles cfiles : changeSet) {
for (ChangedFile cfile : cfiles.getFiles()) {
if ((cfile.getType().equals(Type.MODIFY) || cfile.getType().equals(Type.ADD)) && !isLocked(cfile.getFile().toPath())) {
logger.info("New File Found by file watcher: " + cfile.getFile().getName() + " . Path: "
+ cfile.getFile().getAbsolutePath());
try {
sender.send("this is test message1");
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
}
private boolean isLocked(Path path) {
try (FileChannel ch = FileChannel.open(path, StandardOpenOption.WRITE); FileLock lock = ch.tryLock()) {
return lock == null;
} catch (IOException e) {
return true;
}
}
}
Проблема заключается в том, что всякий раз, когда я пытаюсь отправить сообщение, поле отправителя, которое я объявил автоматически подключенным, имеет значение null, что дает исключение NullPointerException при попытке доступа к сообщению отправки шаблона kafka. Даже если я создаю kafkaTemplate в классе FileChangeListenerService и пытаюсь отправить оттуда сообщение, все равно возникает исключение нулевого указателя. Подводя итог, пружинный контейнер не создает объект kafkaTemplate.
У кого-нибудь есть идеи, что здесь может быть не так?