Spring Autowired Shared Queue NullPointerException - PullRequest
       15

Spring Autowired Shared Queue NullPointerException

0 голосов
/ 17 сентября 2018

Я использую Spring впервые и пытаюсь реализовать общую очередь, в которой слушатель Kafka помещает сообщения в общую очередь, а ThreadManager, который в конечном итоге будет выполнять что-то многопоточное, с элементами, которые он удаляет из общей очереди.Вот моя текущая реализация:

Слушатель:

@Component
public class Listener {
    @Autowired
    private QueueConfig queueConfig;
    private ExecutorService executorService;
    private List<Future> futuresThread1 = new ArrayList<>();
    public Listener() {
        Properties appProps = new AppProperties().get();
        this.executorService = Executors.newFixedThreadPool(Integer.parseInt(appProps.getProperty("listenerThreads")));
    }
    //TODO: how can I pass an approp into this annotation?
    @KafkaListener(id = "id0", topics = "bose.cdp.ingest.marge.boseaccount.normalized")
    public void listener(ConsumerRecord<?, ?> record) throws InterruptedException, ExecutionException
        {
            futuresThread1.add(executorService.submit(new Runnable() {
                    @Override public void run() {
                        try{
                            queueConfig.blockingQueue().put(record);
//                            System.out.println(queueConfig.blockingQueue().take());
                        } catch (Exception e){
                            System.out.print(e.toString());
                        }

                    }
            }));
        }
}

Очередь:

@Configuration
public class QueueConfig {
    private Properties appProps = new AppProperties().get();

    @Bean
    public BlockingQueue<ConsumerRecord> blockingQueue() {
        return new ArrayBlockingQueue<>(
                Integer.parseInt(appProps.getProperty("blockingQueueSize"))
        );
    }
}

ThreadManager:

@Component
public class ThreadManager {
    @Autowired
    private QueueConfig queueConfig;
    private int threads;

    public ThreadManager() {
        Properties appProps = new AppProperties().get();
        this.threads = Integer.parseInt(appProps.getProperty("threadManagerThreads"));
    }


    public void run() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        try {
            while (true){
                queueConfig.blockingQueue().take();
            }
        } catch (Exception e){
            System.out.print(e.toString());
            executorService.shutdownNow();
            executorService.awaitTermination(1, TimeUnit.SECONDS);
        }
    }
}

Наконец,основной поток, из которого все начинается:

@SpringBootApplication
public class SourceAccountListenerApp {
    public static void main(String[] args) {
        SpringApplication.run(SourceAccountListenerApp.class, args);
        ThreadManager threadManager = new ThreadManager();
        try{
            threadManager.run();
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }
}

Проблема

Когда я запускаю это в отладчике, я могу сказать, что слушатель добавляет вещи в очередь,Когда ThreadManager снимает общую очередь, он сообщает, что очередь пуста, и я получаю NPE.Кажется, что автоматическое подключение не работает для подключения очереди, которую слушатель использует, к ThreadManager.Любая помощь приветствуется.

Ответы [ 2 ]

0 голосов
/ 18 сентября 2018

Это проблема:

ThreadManager threadManager = new ThreadManager();

Поскольку вы создаете экземпляр вручную, вы не можете использовать DI, предоставленный Spring.

Одно простое решение -реализовать CommandLineRunner, который будет выполняться после полной инициализации SourceAccountListenerApp:

@SpringBootApplication
public class SourceAccountListenerApp {
    public static void main(String[] args) {
        SpringApplication.run(SourceAccountListenerApp.class, args);            
    }

    // Create the CommandLineRunner Bean and inject ThreadManager 
    @Bean
    CommandLineRunner runner(ThreadManager manager){
        return args -> {
            manager.run();
        };
    }

}
0 голосов
/ 18 сентября 2018

Вы используете программный Spring, так называемый JavaConfig, способ настройки bean-компонентов Spring (классы, аннотированные @Configuration, с методами, аннотированными @Bean).Обычно при запуске приложения Spring вызывает эти @Bean методы изнутри и регистрирует их в контексте своего приложения (если область действия - singleton - по умолчанию - это произойдет только один раз!).Нет необходимости вызывать эти @Bean методы в любом месте вашего кода напрямую ... вы не должны, в противном случае вы получите отдельный, свежий экземпляр, который, возможно, не полностью настроен!

Вместо этого вам нужно ввестиBlockingQueue<ConsumerRecord>, который вы «настроили» в вашем методе QueueConfig.blockingQueue() на ваш ThreadManager.Поскольку очередь, по-видимому, является обязательной зависимостью для работы ThreadManager, я бы позволил Spring внедрить ее через конструктор:

@Component
public class ThreadManager {

    private int threads;

    // add instance var for queue...
    private BlockingQueue<ConsumerRecord> blockingQueue;

    // you could add @Autowired annotation to BlockingQueue param,
    // but I believe it's not mandatory... 
    public ThreadManager(BlockingQueue<ConsumerRecord> blockingQueue) {
        Properties appProps = new AppProperties().get();
        this.threads = Integer.parseInt(appProps.getProperty("threadManagerThreads"));
        this.blockingQueue = blockingQueue;
    }

    public void run() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        try {
            while (true){
                this.blockingQueue.take();
            }
        } catch (Exception e){
            System.out.print(e.toString());
            executorService.shutdownNow();
            executorService.awaitTermination(1, TimeUnit.SECONDS);
        }
    }
}

Просто чтобы прояснить еще одну вещь: по умолчанию имя метода aМетод @Bean используется Spring для присвоения этому бину уникального идентификатора (имя метода == идентификатор бина).Таким образом, ваш метод называется blockingQueue, это означает, что ваш экземпляр BlockingQueue<ConsumerRecord> также будет зарегистрирован с идентификатором blockingQueue в контексте приложения.Новый параметр конструктора также называется blockingQueue, а его тип соответствует BlockingQueue<ConsumerRecord>.Упрощенно, это один из способов, которым Spring ищет и вводит зависимости / провода.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...