Spark Streaming - соединение с несколькими потоками kafka выполняется медленно - PullRequest
1 голос
/ 04 марта 2020

У меня есть 3 потока kafka, каждый из которых имеет по 600 тыс. Записей, для обработки простых соединений между потоками требуется более 10 минут.

Конфигурация Spark Cluster:

Spark Master UI

Вот так я читаю потоки кафки в tempviews в spark (scala)

spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "KAFKASERVER")
.option("subscribe", TOPIC1)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest").load()
.selectExpr("CAST(value AS STRING) as json")
.select( from_json($"json", schema=SCHEMA1).as("data"))
.select($"COL1", $"COL2")
.createOrReplaceTempView("TABLE1")

Я присоединяюсь к 3 ТАБЛИЦАМ, используя искровую искру sql

select COL1, COL2 from TABLE1   
JOIN TABLE2 ON TABLE1.PK = TABLE2.PK
JOIN TABLE3 ON TABLE2.PK = TABLE3.PK

Выполнение задания:

Job UI

Не упускаю ли я какую-то конфигурацию на свече, на которую я должен обратить внимание?

Ответы [ 2 ]

0 голосов
/ 16 марта 2020

к сожалению, не было ни тестовых данных, ни ожидаемых вами результатов, чтобы я мог поиграть, поэтому я не могу дать точный правильный ответ.

@ Комментарий астероида действителен, как мы видим количество заданий для каждого этапа равно 1. Обычно поток Kafka использует приемник для использования темы; и каждый получатель создает только одну задачу. Один из подходов состоит в том, чтобы использовать несколько приемников / разделить разделы / увеличить ресурсы (количество ядер) для увеличения параллелизма.

Если это все еще не работает, другой способ - использовать Kafka API для createDirectStream. В соответствии с документацией https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html этот создает входной поток, который напрямую получает сообщения от Kafka Brokers без использования какого-либо получателя.

Я предварительно подготовил пример кода для создания прямого потока ниже. Возможно, вы захотите узнать об этом, чтобы настроить свои собственные предпочтения.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "KAFKASERVER",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "startingOffsets" -> "earliest",
  "endingOffsets" -> "latest"
)

val topics = Array(TOPIC1)
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
val schema = StructType(StructField('data', StringType, True))
val df = spark.createDataFrame([], schema)
val dstream = stream.map(_.value())
dstream.forEachRDD(){rdd:RDD[String], time:Time} => {
    val tdf = spark.read.schema(schema).json(rdd)
    df = df.union(tdf)
    df.createOrReplaceTempView("TABLE1")
}

Некоторые материалы по теме:

https://mapr.com/blog/monitoring-real-time-uber-data-using-spark-machine-learning-streaming-and-kafka-api-part-2/ (Прокрутите вниз до раздела Код потребителя Kafka . Другой раздел не имеет значения)

https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html (Spark Do c для создания прямого потока)

Удачи!

0 голосов
/ 10 марта 2020

Я нахожу ту же проблему. И я обнаружил, что соединение между потоком и потоком требует больше памяти, как я представляю И проблема исчезнет, ​​когда я увеличу количество ядер на исполнителя.

...