Я интегрировал Spark Structured Streaming с Kafka, в котором я слушаю 2 темы
def main(args: Array[String]): Unit = {
val schema = StructType(
List(
StructField("gatewayId", StringType, true),
StructField("userId", StringType, true)
)
)
val spark = SparkSession
.builder
.master("local[4]")
.appName("DeviceAutomation")
.getOrCreate()
val dfStatus = spark.readStream.
format("kafka").
option("subscribe", "utility-status").
option("kafka.bootstrap.servers", "localhost:9092").
option("startingOffsets", "earliest")
.load()
val dfCritical = spark.readStream.
format("kafka").
option("subscribe", "utility-critical").
option("kafka.bootstrap.servers", "localhost:9092").
option("startingOffsets", "earliest")
.load()
}
Поскольку у меня есть еще несколько тем для перечисления и выполнения различных операций, я хотел бы переместить каждую тему в отдельный класс длялучшая ясностьВозможно ли и как их инициировать из основного класса?