Spark Streaming Job работает очень медленно - PullRequest
0 голосов
/ 17 апреля 2019

enter image description hereenter image description here Я выполняю задание потоковой передачи искры в моем регионе, и для одной партии требуется приблизительно 4-5 минут. Может кто-нибудь подсказать, в чем может быть проблема с приведенным ниже кодом?

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType
from pyspark.sql.functions import avg, window, from_json, from_unixtime, unix_timestamp
import uuid

schema = StructType([
    StructField("source", StringType(), True),
    StructField("temperature", FloatType(), True),
    StructField("time", StringType(), True)
])

spark = SparkSession \
    .builder.master("local[8]") \
    .appName("poc-app") \
    .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", 5)    

df1 = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "poc") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

df2 = df1.select(from_json("value", schema).alias(
    "sensors")).select("sensors.*")

df3=df2.select(df2.source,df2.temperature,from_unixtime(unix_timestamp(df2.time, 'yyyy-MM-dd HH:mm:ss')).alias('time'))
df4 = df3.groupBy(window(df3.time, "2 minutes","1 minutes"), df3.source).count()

query1 = df4.writeStream \
.outputMode("complete") \
.format("console") \
.option("checkpointLocation", "/tmp/temporary-" + str(uuid.uuid4())) \
.start() 

query1.awaitTermination()

1 Ответ

1 голос
/ 17 апреля 2019

с помощью мини-пакетной потоковой передачи вы обычно хотите уменьшить количество выходных разделов ... так как вы выполняете некоторую агрегацию (широкое преобразование) каждый раз, когда вы сохраняете это, по умолчанию будет 200 разделов на диск из-за

spark.conf.get("spark.sql.shuffle.partitions")

попробуйте уменьшить этот конфиг до меньшего выходного раздела и поместить его в начало вашего кода, чтобы при выполнении агрегации вывести 5 разделов на диск

spark.conf.set("spark.sql.shuffle.partitions", 5)

, вы также можете почувствоватьпросматривая количество файлов в каталоге выходного потока записи, а также определяя количество разделов в агрегированном df

df3.rdd.getNumPartitions()

btw, так как вы используете локальный режим для тестирования, попробуйте установить local [8]вместо локального [4], поэтому он увеличивает параллелизм на ваших ядрах процессора (я предполагаю, что у вас есть 4)

...