Замена пула потоков с помощью forkjoinpool Java 8 - PullRequest
0 голосов
/ 04 октября 2018

Я изучал пул java fork / join и узнал, что пул fork-join - это более эффективный способ одновременного выполнения задачи, так как он использует work-stealing algorithm.На данный момент мы использовали ThreadPool Executor, используя TaskExecutor, предоставленный Spring Boot.Теперь я хотел бы использовать пул Fork-Join вместо TaskExecutor.Проблема в том, что задача в Fork-Join должна быть рекурсивной.

public class DedupeConsumerService {

    final Logger logger = LoggerFactory.getLogger(DedupeConsumerService.class);

    @Autowired
    private TaskExecutor taskExecutor;

    @Autowired
    private PropertyConfig config;

    @Autowired
    private ApplicationContext applicationContext;

    public void consume() {

        String topic = config.getDedupServiceConsumerTopic();
        String consGroup = config.getDedupServiceConsGroup();

        Properties props = new Properties();
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "20000");
        props.put("max.poll.records", "10000");

        KafkaConsumer<String, AvroSyslogMessage> consumer = new GenericConsumer<String, AvroSyslogMessage>().initialize(topic, consGroup, STREAMSERDE.STRINGDESER, STREAMSERDE.AVRODESER, props);

        logger.info("Dedupe Kafka Consumer Initialized......");

        try {
            while (true) {
                ConsumerRecords<String, AvroSyslogMessage> records = consumer.poll(100);
                if (records.count() > 0) {
                    logger.debug(">>records count = " + records.count());
                    Date startTime = Calendar.getInstance()
                        .getTime();
                    for (ConsumerRecord<String, AvroSyslogMessage> record : records) {
                        logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key() + " : record.partition() = " + record.partition() + " : record.topic() = " + record.topic() + " : record.timestamp() = " + record.timestamp());

                        AvroSyslogMessage avroMessage = record.value();
                        logger.debug("avro Message = " + avroMessage);

                        DedupeFilterProcessThread dedupeProcessThread = applicationContext.getBean(DedupeFilterProcessThread.class);
                        dedupeProcessThread.setMessage(avroMessage);
                        taskExecutor.execute(dedupeProcessThread);
                        consumer.commitSync();
                    }

                    Date endTime = Calendar.getInstance()
                        .getTime();
                    long durationInMilliSec = endTime.getTime() - startTime.getTime();
                    logger.info("Number of Records:: " + records.count() + " Time took to process poll :: " + durationInMilliSec);

                }
            }

        } catch (Throwable e) {
            logger.error("Error occured while processing message", e);
            e.printStackTrace();
        } finally {
            logger.debug("dedupe kafka consume is closing");
            consumer.close();
        }

    }

}

Может ли кто-нибудь помочь мне в решении этой проблемы.

...