Я изучал пул java fork / join и узнал, что пул fork-join
- это более эффективный способ одновременного выполнения задачи, так как он использует work-stealing algorithm
.На данный момент мы использовали ThreadPool
Executor, используя TaskExecutor
, предоставленный Spring Boot
.Теперь я хотел бы использовать пул Fork-Join вместо TaskExecutor.Проблема в том, что задача в Fork-Join должна быть рекурсивной.
public class DedupeConsumerService {
final Logger logger = LoggerFactory.getLogger(DedupeConsumerService.class);
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private PropertyConfig config;
@Autowired
private ApplicationContext applicationContext;
public void consume() {
String topic = config.getDedupServiceConsumerTopic();
String consGroup = config.getDedupServiceConsGroup();
Properties props = new Properties();
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "20000");
props.put("max.poll.records", "10000");
KafkaConsumer<String, AvroSyslogMessage> consumer = new GenericConsumer<String, AvroSyslogMessage>().initialize(topic, consGroup, STREAMSERDE.STRINGDESER, STREAMSERDE.AVRODESER, props);
logger.info("Dedupe Kafka Consumer Initialized......");
try {
while (true) {
ConsumerRecords<String, AvroSyslogMessage> records = consumer.poll(100);
if (records.count() > 0) {
logger.debug(">>records count = " + records.count());
Date startTime = Calendar.getInstance()
.getTime();
for (ConsumerRecord<String, AvroSyslogMessage> record : records) {
logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key() + " : record.partition() = " + record.partition() + " : record.topic() = " + record.topic() + " : record.timestamp() = " + record.timestamp());
AvroSyslogMessage avroMessage = record.value();
logger.debug("avro Message = " + avroMessage);
DedupeFilterProcessThread dedupeProcessThread = applicationContext.getBean(DedupeFilterProcessThread.class);
dedupeProcessThread.setMessage(avroMessage);
taskExecutor.execute(dedupeProcessThread);
consumer.commitSync();
}
Date endTime = Calendar.getInstance()
.getTime();
long durationInMilliSec = endTime.getTime() - startTime.getTime();
logger.info("Number of Records:: " + records.count() + " Time took to process poll :: " + durationInMilliSec);
}
}
} catch (Throwable e) {
logger.error("Error occured while processing message", e);
e.printStackTrace();
} finally {
logger.debug("dedupe kafka consume is closing");
consumer.close();
}
}
}
Может ли кто-нибудь помочь мне в решении этой проблемы.