Как использовать очередь блокировки в Spring Boot? - PullRequest
0 голосов
/ 07 мая 2020

Я пытаюсь использовать BlockingQueue внутри Spring Boot. Мой дизайн был таким: пользователь отправляет запрос через контроллер, а контроллер, в свою очередь, помещает некоторые объекты в очередь блокировки. После этого потребитель должен иметь возможность брать объекты и обрабатывать их дальше.

Я использовал Asny c, ThreadPool и EventListener. Однако с помощью приведенного ниже кода я обнаружил, что потребительский класс не использует объекты. Не могли бы вы указать, как улучшить?

Конфигурация очереди

@Bean
public BlockingQueue<MyObject> myQueue() {
    return new PriorityBlockingQueue<>();
}

@Bean
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(3);
    executor.setMaxPoolSize(3);
    executor.setQueueCapacity(10);
    executor.setThreadNamePrefix("Test-");
    executor.initialize();
    return executor;
}

Rest Controller

@Autowired
BlockingQueue<MyObject> myQueue;

@RequestMapping(path = "/api/produce")
public void produce() {
    /* Do something */
    MyObject myObject = new MyObject();
    myQueue.put(myObject);
}

Consumer Class

@Autowired
private BlockingQueue<MyObject> myQueue;

@EventListener
public void onApplicationEvent(ContextRefreshedEvent event) {
    consume();
}

@Async
public void consume() {
    while (true) {
        try {
            MyObject myObject = myQueue.take();
        }
        catch (Exception e) {
        }
    }
}

Ответы [ 2 ]

0 голосов
/ 11 мая 2020

В конце концов, я придумал это решение.

Rest Controller

@Autowired
BlockingQueue<MyObject> myQueue;

@RequestMapping(path = "/api/produce")
public void produce() {
    /* Do something */
    MyObject myObject = new MyObject();
    myQueue.put(myObject);
    Consumer.consume();
}

Это немного странно, потому что вам нужно сначала поставить объект в очередь самостоятельно, а затем использовать объект самостоятельно. Любые предложения по улучшению приветствуются.

0 голосов
/ 07 мая 2020

Ваша идея использует Queue для хранения сообщений, потребитель слушает spring events и потребляет. Я не видел, чтобы ваш код действительно опубликовал sh событие, просто сохраните их в queue. Если вы хотите использовать Spring Events, производителям может понравиться следующее:

@Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    public void doStuffAndPublishAnEvent(final String message) {
        System.out.println("Publishing custom event. ");
        CustomSpringEvent customSpringEvent = new CustomSpringEvent(this, message);
        applicationEventPublisher.publishEvent(customSpringEvent);
    }

проверьте это do c

Если вы все еще хотите использовать BlockingQueue, ваш потребитель должен быть запущенным потоком, постоянно ожидающим задач в очереди, например:

public class NumbersConsumer implements Runnable {
    private BlockingQueue<Integer> queue;
    private final int poisonPill;

    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }
    public void run() {
        try {
            while (true) {
                Integer number = queue.take(); // always waiting 
                if (number.equals(poisonPill)) {
                    return;
                }
                System.out.println(Thread.currentThread().getName() + " result: " + number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

может проверить этот пример кода

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...