Моя цель состоит в том, чтобы запустить несколько моделей регрессионных мер искры мл (1000 раз) для одного набора данных, и я хочу сделать это, используя zio вместо будущего, потому что он работает слишком медленно. Ниже приведен рабочий пример использования Future. Отдельный список ключей используется для фильтрации многораздельного набора данных по ключу и запуска модели. Я настроил пул потоков с 8 исполнителями для управления им, но он быстро снижает производительность.
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.duration._
import org.apache.spark.sql.SaveMode
val pool = Executors.newFixedThreadPool(8)
implicit val xc: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(pool)
case class Result(key: String, coeffs: String)
try {
import spark.implicits._
val tasks = {
for (x <- keys)
yield Future {
Seq(
Result(
x.group,
runModel(input.filter(col("group")===x)).mkString(",")
)
).toDS()
.write.mode(SaveMode.Overwrite).option("header", false).csv(
s"hdfs://namenode:8020/results/$x.csv"
)
}
}.toSeq
Await.result(Future.sequence(tasks), Duration.Inf)
}
finally {
pool.shutdown()
pool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)
}
Я пытался реализовать это в zio, но я не знаю, как реализовать очереди и установить лимит исполнителей, как в фьючерсах.
Ниже моя неудачная попытка до сих пор ...
import zio._
import zio.console._
import zio.stm._
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col
//example data/signatures
case class ModelResult(key: String, coeffs: String)
case class Data(key: String, sales: Double)
val keys: Array[String] = Array("100_1", "100_2")
def runModel[T](ds: Dataset[T]): Vector[Double]
object MyApp1 extends App {
val spark = SparkSession
.builder()
.getOrCreate()
import spark.implicits._
val input: Dataset[Data] = Seq(Data("100_1", 1d), Data("100_2", 2d)).toDS
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
for {
queue <- Queue.bounded[Int](8)
_ <- ZIO.foreach(1 to 8) (i => queue.offer(i)).fork
_ <- ZIO.foreach(keys) { k => queue.take.flatMap(_ => readWrite(k, input, queue)) }
} yield 0
}
def writecsv(k: String, v: String) = {
Seq(ModelResult(k, v))
.toDS
.write
.mode(SaveMode.Overwrite).option("header", value = false)
.csv(s"hdfs://namenode:8020/results/$k.csv")
}
def readWrite[T](key: String, ds: Dataset[T], queue: Queue[Int]): ZIO[ZEnv, Nothing, Int] = {
(for {
result <- runModel(ds.filter(col("key")===key)).mkString(",")
_ <- writecsv(key, result)
_ <- queue.offer(1)
_ <- putStrLn(s"successfully wrote output for $key")
} yield 0)
}
}
//to run
MyApp1.run(List[String]())
Каков наилучший способ решить эту проблему в zio?