JpaRepository не подключается автоматически в пользовательской RichSinkFunction - PullRequest
0 голосов
/ 11 октября 2018

Я создал пользовательскую функцию Flink RichSinkFunction и попытался автоматически связать JpaRepository в этом пользовательском классе, но я постоянно получаю NullPointerException.Если я автоматически подключу его в конструкторе, я увижу, что JpaRepo был найден, но когда вызывается метод invoke, я получаю NullPointerException.

public interface MessageRepo extends JpaRepository<Message, Long> {
}


@Component
public class MessageSink extends RichSinkFunction<Message> {

    private final transient MessageRepo messageRepo; //if i don't make this transient, i get the error message "The implementation of the RichSinkFunction is not serializable"

    @Autowired
    public MessageSink(MessageRepo messageRepo){
        this.messageRepo = messageRepo;
        messageRepo.save(new Message()); //no issues when i do this
    }

    @Override
    public void invoke(Message message, Context context) {
         // the message is not null
         messageRepo.save(message); // NPE
    }

Кто-нибудь сталкивался с такой проблемой раньше?Похоже, что метод вызова MessageSink вызывается в отдельном потоке, поэтому messageRepo всегда равен null?Другие части моего кода могут использовать MessageRepo, кроме случаев, когда у меня есть свой собственный приемник.

Ответы [ 2 ]

0 голосов
/ 06 марта 2019

Я думаю, что проблема здесь в том, что flink необходимо сериализовать пользовательскую функцию приемника, прежде чем распространять ее своим работникам.

Отметив транзит MessageRepo, это означает, что поле будет пустым, когда рабочий узел прекратит использование этой функции.Обычно вы инициализируете транзитную зависимость в открытой функции, которая будет вызываться после десериализации объекта.

0 голосов
/ 11 октября 2018

Я не совсем уверен в причине, но я думаю, что весенняя загрузка отдает приоритет вашим классам обслуживания, когда дело доходит до инъекции bean-компонентов.Я столкнулся с подобной проблемой, когда пытался написать слушателя для моего класса Entity.Вот как я это решил.Создайте класс компонента, который реализует интерфейс ApplicationContextAware и переопределите метод setApplicationContext.В вашем классе есть статический метод с именем getBean, который будет автоматически подключаться при первом запросе.Пример кода ---

@Component
public class SpringBeansUtil implements ApplicationContextAware {
private static ApplicationContext context;

    @SuppressWarnings("static-access")
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) 
    throws BeansException {
    this.context = applicationContext;
}

    public static <T> T getBean(Class<T> beanClass) {
        return context.getBean(beanClass);
    }
}

А затем просто получите бин в своем коде ------- >> ClassName referenceName = (ClassName) SpringBeansUtil.getBean (ClassName.class);

...