Как получить пул из 3 потоков, используя несколько источников данных? - PullRequest
2 голосов
/ 27 июня 2019

Помогите пожалуйста.У меня есть класс, который отвечает за настройку базы данных:

@Getter
@Setter
@ConfigurationProperties(value = "spring")
public class DatabaseConfig {

    @NotEmpty
    private Map<String, Database> databases;

    @Getter
    @Setter
    public static class Database {
    @NotEmpty
    private String url;

    @NotEmpty
    private String username;

    private String password;

    @NotEmpty
    private String driverClassName;
    }
}

В моем файле application.yml есть 3 базы данных:

spring:
    H21:
      url: jdbc:h2:tcp://localhost/~/test
      username: sa
      password:
      driver-class-name: org.h2.Driver
    H22:
      url: jdbc:h2:tcp://localhost/~/test2
      username: sa
      password:
      driver-class-name: org.h2.Driver
    H23:
      url: jdbc:h2:tcp://localhost/~/test3
      username: sa
      password:
      driver-class-name: org.h2.Driver

У меня естьКарта, на которой при инициализации приложения (для этого я использую метод @ PostConstruct ) включаются все источники данных из файла application.yml.Хорошо.Все в порядке.Но у меня есть проблема.

В методе метода doCheck я создаю пул потоков = номер источника данных на карте (3 элемента) и передаю метод Map и вызываю метод для баз данных.

Строка template.query (query, ResultSet :: getRow) в executeQuery Метод должен вызываться параллельно для всех баз данных, а не последовательно.Для меня это очень важно.

Если в базах 3 файла application.yml не работает (приложение запускается и когда я отправляю запрос через Postman, программа долго ждет и ничего не происходит).Если в файле application.yml 2 базы данных, то все работает.Я читал в интернете, что parallelStream () создает только 2 пула потоков.Поэтому это работает для меня, когда в файле application.yml есть только 2 базы данных, и они работают параллельно.Но когда в файле application.yml есть 3 базы, он у меня не работает.

@Service
@Log
@RequiredArgsConstructor
public class DBServiceImpl implements DBService {

    private final DatabaseConfig databaseConfig;

    private Map<String, JdbcTemplate> templates;

    @PostConstruct
    public void init() {
    templates = databaseConfig.getDatabases()
            .entrySet()
            .stream()
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> createJdbcTemplate(entry.getValue())));
    }

    private JdbcTemplate createJdbcTemplate(DatabaseConfig.Database database) {
    return new JdbcTemplate(DataSourceBuilder
            .create()
            .url(database.getUrl())
            .username(database.getUsername())
            .password(database.getPassword())
            .driverClassName(database.getDriverClassName())
            .build());
    }

    public synchronized PerformanceCheckResult doCheck(String query) throws InterruptedException, ExecutionException {
    PerformanceCheckResult checkResult = PerformanceCheckResult
            .builder()
            .queryExecutionResults(new CopyOnWriteArrayList<>())
            .checkStartedAt(System.currentTimeMillis())
            .build();

    ExecutorService executor = Executors.newFixedThreadPool(templates.size());

    List<QueryExecutionResult> queryExecutionResults = executor.submit(() -> templates
            .entrySet()
            .parallelStream()
            .map(entry -> performQuery(entry.getKey(), entry.getValue(), query))
            .collect(Collectors.toList())).get();

    checkResult.setQueryExecutionResults(queryExecutionResults);
    checkResult.setCheckEndedAt(System.currentTimeMillis());
    checkResult.setTotalCheckDurationTime(getDurationMillis(checkResult.getCheckStartedAt(), checkResult.getCheckEndedAt()));
    return checkResult;
    }

    private QueryExecutionResult performQuery(String databaseName, JdbcTemplate template, String query) {
    QueryExecutionResult executionResult = QueryExecutionResult
            .builder()
            .databaseName(databaseName)
            .queryExecutionStartedAt(System.currentTimeMillis())
            .build();
    template.query(query, ResultSet::getRow);
    executionResult.setQueryExecutionEndedAt(System.currentTimeMillis());
    executionResult.setQueryExecutionDurationTime(
            getDurationMillis(executionResult.getQueryExecutionStartedAt(), executionResult.getQueryExecutionEndedAt()));
    return executionResult;
    }

    private Long getDurationMillis(Long start, Long end) {
    return end - start;
    }

}   

Мне очень нужна ваша помощь.Спасибо.

1 Ответ

0 голосов
/ 27 июня 2019

Вы должны использовать invokeAll на исполнителе, а не submit. Этот код просто передает одну задачу исполнителю, который затем вызывает parallelStream:

List<QueryExecutionResult> queryExecutionResults = executor.submit(() -> templates
        .entrySet()
        .parallelStream()
        .map(entry -> performQuery(entry.getKey(), entry.getValue(), query))
        .collect(Collectors.toList())).get();

Вам не нужно parallelStream; просто создайте коллекцию задач и используйте метод исполнителя invokeAll, который попытается распараллелить как можно больше.

Вот набросок того, что я имею в виду:

class Query implements Callable<String> {
    public String call() {
        String s = ""; // do some database query that returns a string
        return s;
    }
}

public static void main(String[] args) {
    var queries = List.of(new Query(...), ...);
    var exec = Executors.newFixedThreadPool(queries.size());
    var futures = exec.invokeAll(queries);

    for (f : futures) {
        f.get();
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...