Spark Job Tuning / Улучшения - PullRequest
0 голосов
/ 12 мая 2019

Играю со Spark уже около 5 месяцев, так что наверняка еще новичок.

У меня есть работа, на которую я хотел бы обратиться за помощью в выявлении узких мест и способов их устранения.

6 узлов, 30 гигабайт оперативной памяти, 8 VCPU каждый. У нас также работают Hive, Imapala и некоторые другие вещи, так что есть немало накладных расходов.

Версия Spark 2.2.1

По сути, задание берет плоский файл .CSV и присоединяет его к сохраненному файлу .AVRO, а затем выполняет некоторые вычисления, ищет «совпадения», разбивает их на части и записывает их в SQL Server. Отклонение данных - это очень очень реальная проблема, и чтобы обойти это, я разделюсь, чтобы предотвратить гибель исполнителей. Работа работает, и довольно хорошо, но я стремлюсь сделать ее лучше, быстрее и способна масштабироваться еще дальше.

Данные из таблицы A организованы следующим образом, таблица B идентична по структуре. Таблица A состоит из 26 миллионов строк, и если считать, то таблица B может содержать 50 000 строк - 1 миллион.

Таблица A - файл AVRO, размером около 3 ГБ, все строки.

+---------+--------+------+    
|nameA    |zipcodeA| IDA  |
+---------+--------+------+
|ABC Paint|10001   |      |
|ABC Cards|10001   |      |
|Mikes Tow|10001   |140000|
|Bobs Stuf|07436   |      |
|NOT KNOWN|19428   |      |
|NOT KNOWN|08852   |160000|
|Sub SHOP |90001   |      |
|BURGER RA|90001   |140000|
+---------+--------+------+

TableB - плоский CSV, размер варьируется, но никогда не превышает концерт (пока) и все строки

+---------+--------+------+
|nameB    |zipcodeB|  IDB |
+---------+--------+------+
|ABC Paint|10001   |100000|
|ABC Card |10001   |120000|
|Mikes Tow|10001   |140000|
|BOS STUFF|07436   |160000|
|XYZ CORP |19428   |100000|
|92122211 |08852   |160000|
|Sub SHOP |90001   |120000|
|BURGER RA|90001   |140000|
+---------+--------+------+

    appName = "FINDTHEMATCHES"
    conf = SparkConf().setAppName(appName)
    sc = SparkContext(conf=conf)
    spark = (sql.SparkSession.builder \
                  .appName(appName)
                  .getOrCreate())

    tableB = spark.read.option("header", "true") \
          .option("delimiter", "|") \
          .option("inferSchema", "false") \
          .csv("s3a://bucket/file.txt")

    tableA= spark.read.format("com.databricks.spark.avro") \
           .option("inferSchema", "false") \
            .load("s3a://bucket2/part-m-00000.avro")

    tableAFinal= tableA.select('companyA', 'zipcodeA', 'IDA')

    ta = tableAFinal.alias('cfa')
    tb = tableB .alias('cfb')
    # Here we do a one to many join in order to get all the records sorted lined up.  
    zipcodematches = ta.join(tb, ta.zipcodeA== tb.zipcodeB, how='inner')
    zipematchpart = zipcodematches.repartition(500, 'IDB')

    jarow = udf(jaro, FloatType()) ## This is a massive function which I left out since I don't want a 10 page SO question but will put in if requested..
    # The function calculates Jaro distance between name strings between the two dataframes, and then gives us our new column
    matchcompanies = zipmatchpart.withColumn('MATCHBUS', jarow('companyB', 'companyA')) 

    matches = matchcompanies.withColumn('MATCHES', f.when(((matchcompanies .MATCHBUS >= 0.91)), 1).otherwise(0))


    matches = matches1.where(col('MATCHES') == 1)

Теперь нам нужен способ отфильтровать совпадения и несоответствия, чтобы мы собирали отдельные записи IDA.

    IDsA = [x.IDA for x in matches.select('IDA').distinct().collect()]    

Удалите дубликаты, если они существуют, и затем запишите в таблицу, которая позже обновит наши другие таблицы.

   matchesunique = matches.dropDuplicates(['IDA'])

   matchesunique.write.jdbc(
            url=sqlurl,
            table='dbo.existingrecords',
            mode='overwrite',
            properties=sqlproperties)

Соберите все не совпадающие матчи, попавшие под счет Jaro 0,91. Там будет много дубликатов записей

    nonmatches = matches.where(col('MATCHES') == 0)

Отфильтруйте их, чтобы, если идентификатор существует в кадре данных "попадания", он также не может быть НОВОЙ записью.

    nonmatchesfiltered =  nonmatches.filter(~nonmatches.IDB.isin(IDsA))

Теперь удалите дубликаты, так как мы вставляем одну новую запись, а не несколько неудачных попыток.

    nonmatchesunique = nonmatchesfiltered.dropDuplicates(['IDB'])

    nonmatchesunique.write.jdbc(
            url=sqlurl,
            table='dbo.newrecordstoinsert',
            mode='overwrite',
            properties=sqlproperties)

    spark.stop()

Что я ищу, так это то, могу ли я делать вещи по-другому и более эффективно, и видит ли кто-нибудь чистое «нет» в работе Spark? Я всегда задавался вопросом, был ли мой метод устранения дубликатов попаданий и не попаданий слишком дорогостоящей операцией, и есть ли способы очистить его или что-то еще. Одно из предложений коллег было извлекать данные из Hive, а не использовать плоские файлы, однако это на самом деле значительно ухудшило производительность , что было удивительно, поскольку оно сохранялось как AVRO в S3.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...