Когда вы используете parallelStream
, вы используете потоки из собственного пула потоков Java. Вам необходимо использовать @Async
Spring для активации MODE_INHERITABLETHREADLOCAL
.
Чтобы использовать @Async
, сначала необходимо настроить пул потоков.
@Configuration
@EnableAsync
public class AsyncExecutionConfiguration extends AsyncConfigurerSupport {
@PostConstruct
protected void init() {
SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
}
@Override
@Primary
@Bean
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Copy the current RequestContext to each and every new @Async task
executor.setCorePoolSize(standardPoolConfigs.getCorePoolSize());
executor.setMaxPoolSize(standardPoolConfigs.getMaxPoolSize());
executor.setQueueCapacity(standardPoolConfigs.getQueueCapacity());
executor.setThreadPriority(standardPoolConfigs.getThreadPriority());
executor.setThreadNamePrefix("async-thread-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
Затем вы можете использовать CompletableFuture
для захвата результата.
@Async
public CompletableFuture<Color> getColor(String colorId) {
return CompletableFuture.completedFuture(this.colorRepository.findById(colorId));
}
Затем вы можете выполнять потоковую передачу таким образом.
@Service
public class ColorService {
@Resource
private ColorService self;
public Collection<Color> getCarColors(String carId) {
Queue<Color> colors = new ConcurrentLinkedQueue<>();
this.getCar(carId)
.getColors()
.stream()
.map(colorId -> self.getColor(colorId)
.thenAccept(colors::add))
.collect(CompletableFutures.joinList());
return colors;
}
}
Добавьте это к своему pom.xml
.
<dependency>
<groupId>com.spotify</groupId>
<artifactId>completable-futures</artifactId>
<version>${completable-futures.version}</version>
</dependency>
Вместо того, чтобы делать public CompletableFuture<Color> getColor(String colorId)
, вы на самом деле можете по-прежнему использовать public Color getColor(String colorId)
даже без @Async
поверх него. Однако вам нужно изменить вызывающего абонента, чтобы использовать CompletableFuture.supplyAsync
вместо. Затем вы можете передать пул потоков, созданный в @Configuration
, на supplyAsync
. Я позволю вам исследовать этот путь;)