У меня есть приложение Spark, которое выбирает подмножество и выполняет некоторые операции над подмножеством. Нет никакой зависимости и взаимодействия между каждым подмножеством и его работой, поэтому я попытался использовать многопоточность, чтобы позволить им работать параллельно для повышения производительности. Код выглядит следующим образом:
Dataset<Row> fullData = sparkSession.read().json("some_path");
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Runnable> tasks = Lists.newArrayList();
for (int i = 1; i <= 50; i++) {
final int x = i;
tasks.add(() -> {
Dataset<Row> subset_1 = fullData.filter(length(col("name")).equalTo(x));
Dataset<Row> subset_2 = fullData.filter(length(col("name")).equalTo(x));
Dataset<Row> result = subset_1.join(subset_2, ...);
log.info("Res size is " + result.count()); // force Spark do the join operation
});
}
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, executor))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
executor.shutdown();
В пользовательском интерфейсе управления заданиями Spark я заметил, что эти 50 задач отправляются параллельно, но обработка по-прежнему блокируется, одна задача запускается до завершения другой. Как сделать так, чтобы несколько задач выполнялись параллельно, а не одна за другой?