Моя проблема в том, что у меня есть несколько запланированных пулов FAIR (всего 3), которые состоят из более чем 100 заданий потоковой передачи с искровой структурой в каждом пуле, в общей сложности более 300 заданий потоковой передачи с искровой структурой. Эти задания динамически создаются приложением spark и автоматически назначаются одному из 3 пулов в зависимости от конфигурации. Пулы появляются правильно, и я вижу, что каждое задание также правильно назначается в соответствующий пул. Проблема заключается в том, что время запуска всех заданий структурированной потоковой передачи занимает довольно много времени для планирования (буквально часы для всех запланированных заданий структурированной потоковой передачи). Когда они запускаются, они корректно работают в запланированном режиме FAIR, но время запуска является причиной проблемы. Мне было интересно, если моя реализация неверна, или возможно ли запустить несколько заданий структурированной потоковой передачи asyn c без больших затрат на запуск.
Прямо сейчас я oop через все конфигурации (син c) и запустите шаблон c для l oop, чтобы запустить все мои задания структурированной потоковой передачи на основе определенной конфигурации для всех планов. Каждый план структурированной потоковой передачи запускается часами (в конце концов, по расписанию) из SPARKUI после того, как они хотя бы один раз запускаются. Мне интересно, могу ли я asyn c вызвать job.run () для того же сеанса запуска и заставить его немедленно планировать задания, даже если задания ставятся в очередь.
Сейчас у меня в кластере более 3000 ядер и около 700 заданий потоковой передачи с искровым структурированием, каждое из которых имеет аналогичную реализацию, подобную приведенной ниже.
for(Plan plan : plans) {
// This line sets each Plan to the appropriate scheduler pool (one of the three above)
sparkSession.sparkContext().setLocalProperty("spark.scheduler.pool", plan.name());
for(OptimizationPlan op : plan.createOptimization()){
for(BaseStreamingJob job : baseStreamingJobs){
// This executes the code below
job.run(op);
}
}
}
sparkSession.streams().awaitAnyTermination();
@Override
public void run(OptimizationPlan op) {
Dataset<Row> ds = sparkSession.readStream()
.option("maxFilesPerTrigger", ac.getMaxFilesPerTrigger())
.option("latestFirst", "false")
.option("header", "true")
.option("sep", ",")
.option("ignoreLeadingWhiteSpace", "true")
.option("ignoreTrailingWhiteSpace", "true")
.option("inferSchema", "false")
.schema(op.getSchema())
.csv(ac.getSourceDirPath().concat(op.getPath()))
.withColumn("filename", callUDF("getFileName", input_file_name()))
.withColumn("spark_processed_time", current_timestamp())
.coalesce(op.getNumberOfPartitions());
ds
.writeStream()
.queryName(op.getQueryname())
.format("com.sample.libs.PulsarSinkProvider")
.outputMode(OutputMode.Append())
.option("topic", op.getTopic())
.option("broker", pc.getPulsar_service())
.option("tlscert", pc.getPulsar_tlscert())
.option("tlskey", pc.getPulsar_tlskey())
.option("tlscacert", pc.getPulsar_cacert())
.option("checkpointLocation", ac.getCheckpointDirPath().concat("/" + op.getQueryname()))
.option("CryptoKeyReader_public", pc.getPulsar_pubkey())
.option("CryptoKeyReader_private", pc.getPulsar_privkey())
.start();
}
Конфигурация пула
<?xml version="1.0"?>
<allocations>
<pool name="Pool1">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
</pool>
<pool name="Pool2">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
</pool>
<pool name="Pool3">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
</pool>
</allocations>