Проблема с производительностью Spark - запись разделов в S3 как отдельных файлов - PullRequest
1 голос
/ 01 августа 2020

Я выполняю искровое задание, задача которого - сканировать большой файл и разбивать его на файлы меньшего размера. Файл имеет формат Json Lines, и я пытаюсь разделить его по определенному столбцу ( id ) и сохранить каждый раздел как отдельный файл на S3. Размер файла составляет около 12 ГБ, но существует около 500000 различных значений id . Запрос занимает почти 15 часов. Что я могу сделать для повышения производительности? Spark - плохой выбор для такой задачи? Обратите внимание, что у меня есть возможность убедиться, что источник представляет собой фиксированное количество строк на id .

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *
from pyspark.sql.functions import udf, substring, instr, locate
from datetime import datetime, timedelta

    
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Get parameters that were passed to the job
args = getResolvedOptions(sys.argv, ['INPUT_FOLDER', 'OUTPUT_FOLDER', 'ID_TYPE', 'DATASET_DATE'])

id_type = args["ID_TYPE"]
output_folder = "{}/{}/{}".format(args["OUTPUT_FOLDER"], id_type, args["DATASET_DATE"])
input_folder = "{}/{}/{}".format(args["INPUT_FOLDER"], id_type, args["DATASET_DATE"])


INS_SCHEMA = StructType([
    StructField("camera_capture_timestamp", StringType(), True),
    StructField(id_type, StringType(), True),
    StructField("image_uri", StringType(), True)
])


data = spark.read.format("json").load(input_folder, schema=INS_SCHEMA)

data = data.withColumn("fnsku_1", F.col("fnsku"))

data.coalesce(1).write.partitionBy(["fnsku_1"]).mode('append').json(output_folder)   

Я также пробовал переразбивать вместо слияния.

Я использую AWS Клей

Ответы [ 2 ]

3 голосов
/ 03 августа 2020

Пожалуйста, рассмотрите следующие варианты как один из возможных. Было бы здорово увидеть, помогло ли это :)

Во-первых, если вы объедините, как сказал @Lamanus в комментариях, это означает, что вы уменьшите количество разделов, следовательно, также уменьшите количество задач записи , следовательно, перетасуйте все данные в 1 задачу. Это может быть первый фактор, который нужно улучшить.

Чтобы решить проблему, ie. записать файл для каждого раздела и сохранить уровень распараллеливания, вы можете изменить logi c на следующий:

object TestSoAnswer extends App {

  private val testSparkSession = SparkSession.builder()
    .appName("Demo groupBy and partitionBy").master("local[*]")
    .getOrCreate()
  import testSparkSession.implicits._

  // Input dataset with 5 partitions
  val dataset = testSparkSession.sparkContext.parallelize(Seq(
    TestData("a", 0), TestData("a", 1), TestData("b", 0), TestData("b", 1),
    TestData("c", 1), TestData("c", 2)
  ), 5).toDF("letter", "number")

  dataset.as[TestData].groupByKey(row => row.letter)
    .flatMapGroups {
      case (_, values) => values
    }.write.partitionBy("letter").mode("append").json("/tmp/test-parallel-write")

}

case class TestData(letter: String, number: Int)

Как это работает?

Сначала код выполняет перемешайте, чтобы собрать все строки, относящиеся к определенному ключу c ( то же, что и для разделения ) в одни и те же разделы. Таким образом, он будет выполнять запись сразу во все строки, принадлежащие ключу. Некоторое время go я написал сообщение в блоге о partitionBy методе . Грубо говоря, внутренне он будет сортировать записи в данном разделе, а затем записывать их одну за другой в файл.

Таким образом мы получаем план, подобный этому, где только 1 перемешивание, поэтому обработка требует больших затрат присутствует операция:

== Physical Plan ==
*(2) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, TestData, true])).letter, true, false) AS letter#22, knownnotnull(assertnotnull(input[0, TestData, true])).number AS number#23]
+- MapGroups TestSoAnswer$$$Lambda$1236/295519299@55c50f52, value#18.toString, newInstance(class TestData), [value#18], [letter#3, number#4], obj#21: TestData
   +- *(1) Sort [value#18 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#18, 200), true, [id=#15]
         +- AppendColumnsWithObject TestSoAnswer$$$Lambda$1234/1747367695@6df11e91, [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, TestData, true])).letter, true, false) AS letter#3, knownnotnull(assertnotnull(input[0, TestData, true])).number AS number#4], [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#18]
            +- Scan[obj#2]

Результат выполнения TestSoAnswer дважды выглядит следующим образом:

test-parallel-write % ls
_SUCCESS letter=a letter=b letter=c
test-parallel-write % ls letter=a
part-00170-68245d8b-b155-40ca-9b5c-d9fb746ac76c.c000.json part-00170-cd90d64f-43c6-4582-aae6-fe443b6617f4.c000.json

test-parallel-write % ls letter=b
part-00161-68245d8b-b155-40ca-9b5c-d9fb746ac76c.c000.json part-00161-cd90d64f-43c6-4582-aae6-fe443b6617f4.c000.json

test-parallel-write % ls letter=c
part-00122-68245d8b-b155-40ca-9b5c-d9fb746ac76c.c000.json part-00122-cd90d64f-43c6-4582-aae6-fe443b6617f4.c000.json

Вы также можете контролировать количество записей, записанных на файл с помощью эта конфигурация .

Изменить: не видел комментарий @mazaneicha, но вы действительно можете попробовать с repartition("partitioning column")! Это даже более ясно, чем выражение группировки.

Best,

Bartosz.

0 голосов
/ 03 августа 2020

Если вы не собираетесь использовать Spark ни ​​для чего другого, кроме как для разделения файла на меньшие версии самого себя, то я бы сказал, что Spark - плохой выбор. Вам было бы лучше сделать это в AWS, следуя подходу, подобному тому, который приведен в этом сообщении о переполнении стека

Предполагая, что у вас есть экземпляр EC2, вы бы запустили что-то вот так:

aws s3 cp s3://input_folder/12GB.json - | split -l 1000 - output.
aws s3 cp output.* s3://output_folder/

Если вы хотите выполнить дальнейшую обработку данных в Spark, вам нужно перераспределить данные на фрагменты от 128 МБ до 1 ГБ . При стандартном (мгновенном) сжатии вы обычно получаете 20% от исходного размера файла. Итак, в вашем случае: между (12/5) ~ 3 и (12/5/8) ~ 20 разделами, поэтому:

data = spark.read.format("json").load(input_folder, schema=INS_SCHEMA) 

dataPart = data.repartition(12)

На самом деле это не особенно большой набор данных для Spark и должен

Сохранение в виде паркета дает вам хорошую точку восстановления, а повторное считывание данных будет очень быстрым. Общий размер файла составит около 2,5 ГБ.

...