к сожалению, не было ни тестовых данных, ни ожидаемых вами результатов, чтобы я мог поиграть, поэтому я не могу дать точный правильный ответ.
@ Комментарий астероида действителен, как мы видим количество заданий для каждого этапа равно 1. Обычно поток Kafka использует приемник для использования темы; и каждый получатель создает только одну задачу. Один из подходов состоит в том, чтобы использовать несколько приемников / разделить разделы / увеличить ресурсы (количество ядер) для увеличения параллелизма.
Если это все еще не работает, другой способ - использовать Kafka API для createDirectStream. В соответствии с документацией https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html этот создает входной поток, который напрямую получает сообщения от Kafka Brokers без использования какого-либо получателя.
Я предварительно подготовил пример кода для создания прямого потока ниже. Возможно, вы захотите узнать об этом, чтобы настроить свои собственные предпочтения.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "KAFKASERVER",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"startingOffsets" -> "earliest",
"endingOffsets" -> "latest"
)
val topics = Array(TOPIC1)
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val schema = StructType(StructField('data', StringType, True))
val df = spark.createDataFrame([], schema)
val dstream = stream.map(_.value())
dstream.forEachRDD(){rdd:RDD[String], time:Time} => {
val tdf = spark.read.schema(schema).json(rdd)
df = df.union(tdf)
df.createOrReplaceTempView("TABLE1")
}
Некоторые материалы по теме:
https://mapr.com/blog/monitoring-real-time-uber-data-using-spark-machine-learning-streaming-and-kafka-api-part-2/ (Прокрутите вниз до раздела Код потребителя Kafka . Другой раздел не имеет значения)
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html (Spark Do c для создания прямого потока)
Удачи!