Передать блокировку Queue Spring KafkaListener - PullRequest
0 голосов
/ 17 сентября 2018

Я новичок в Java, Spring и Kafka в целом. Вот ситуация:

Я использовал аннотации @KafkaListener для создания Потребителя Kafka, который выглядит следующим образом:

public class Listener {

private ExecutorService executorService;
private List<Future> futuresThread1 = new ArrayList<>();
public Listener() {
    Properties appProps = new AppProperties().get();
    this.executorService = Executors.newFixedThreadPool(Integer.parseInt(appProps.getProperty("listenerThreads")));
}
//TODO: how can I pass an approp into this annotation?
@KafkaListener(id = "id0", topics = "bose.cdp.ingest.marge.boseaccount.normalized")
public void listener(ConsumerRecord<?, ?> record, ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue) throws InterruptedException, ExecutionException
    {
        futuresThread1.add(executorService.submit(new Runnable() {
                @Override public void run() {
                    System.out.println(record);
                    arrayBlockingQueue.add(record);
                }
        }));
    }

}

Я добавил параметр ArrayBlockingQueue для слушателя, к которому я хотел бы добавить сообщения от Кафки.

Проблема, с которой я столкнулся, заключается в том, что я не могу понять, как я на самом деле передаю ArrayBlockingQueue в слушатель, потому что Spring обрабатывает создание и запуск слушателя за кулисами.

Мне нужна эта очередь блокировки, чтобы другой объект за пределами слушателя мог получить доступ к сообщениям и поработать с ним. Например, в моем основном:

@SpringBootApplication
public class SourceAccountListenerApp {
    public static void main(String[] args) {
        Properties appProps = new AppProperties().get();
        ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue = new ArrayBlockingQueue<>(
           Integer.parseInt(appProps.getProperty("blockingQueueSize"))
        );
        //TODO: This starts my listener. How do I pass the queue to it?
        SpringApplication.run(SourceAccountListenerApp.class, args);
    }
}

1 Ответ

0 голосов
/ 17 сентября 2018

Существует множество способов объявить очередь блокировки как bean-компонент.

Один пример, main:

@SpringBootApplication
public class SourceAccountListenerApp {
    public static void main(String[] args) {
        SpringApplication.run(SourceAccountListenerApp.class, args);
    }

    @Bean
    public ArrayBlockingQueue arrayBlockingQueue() {
        Properties appProps = new AppProperties().get();
        ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue = new ArrayBlockingQueue<>(
           Integer.parseInt(appProps.getProperty("blockingQueueSize"))
        );
        return arrayBlockingQueue;
    }
}

Слушатель:

public class Listener {

    @Autowired
    ArrayBlockingQueue arrayBlockingQueue;
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...