Если данные в разных темах одинаковы и у вас одинаковая логика обработки при использовании данных, вы можете использовать разные темы в одном потоке и выполнять агрегацию. Если логика обработки отличается для разных тем, задайте concurrentThreads как 4, а затем выполните агрегацию среди 4 потоков. Вы можете проверить документацию spark структурированной потоковой передачи на предмет использования нескольких тем.
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
<--- your aggregation logic here --->