Играю со Spark уже около 5 месяцев, так что наверняка еще новичок.
У меня есть работа, на которую я хотел бы обратиться за помощью в выявлении узких мест и способов их устранения.
6 узлов, 30 гигабайт оперативной памяти, 8 VCPU каждый. У нас также работают Hive, Imapala и некоторые другие вещи, так что есть немало накладных расходов.
Версия Spark 2.2.1
По сути, задание берет плоский файл .CSV и присоединяет его к сохраненному файлу .AVRO, а затем выполняет некоторые вычисления, ищет «совпадения», разбивает их на части и записывает их в SQL Server. Отклонение данных - это очень очень реальная проблема, и чтобы обойти это, я разделюсь, чтобы предотвратить гибель исполнителей. Работа работает, и довольно хорошо, но я стремлюсь сделать ее лучше, быстрее и способна масштабироваться еще дальше.
Данные из таблицы A организованы следующим образом, таблица B идентична по структуре. Таблица A состоит из 26 миллионов строк, и если считать, то таблица B может содержать 50 000 строк - 1 миллион.
Таблица A - файл AVRO, размером около 3 ГБ, все строки.
+---------+--------+------+
|nameA |zipcodeA| IDA |
+---------+--------+------+
|ABC Paint|10001 | |
|ABC Cards|10001 | |
|Mikes Tow|10001 |140000|
|Bobs Stuf|07436 | |
|NOT KNOWN|19428 | |
|NOT KNOWN|08852 |160000|
|Sub SHOP |90001 | |
|BURGER RA|90001 |140000|
+---------+--------+------+
TableB - плоский CSV, размер варьируется, но никогда не превышает концерт (пока) и все строки
+---------+--------+------+
|nameB |zipcodeB| IDB |
+---------+--------+------+
|ABC Paint|10001 |100000|
|ABC Card |10001 |120000|
|Mikes Tow|10001 |140000|
|BOS STUFF|07436 |160000|
|XYZ CORP |19428 |100000|
|92122211 |08852 |160000|
|Sub SHOP |90001 |120000|
|BURGER RA|90001 |140000|
+---------+--------+------+
appName = "FINDTHEMATCHES"
conf = SparkConf().setAppName(appName)
sc = SparkContext(conf=conf)
spark = (sql.SparkSession.builder \
.appName(appName)
.getOrCreate())
tableB = spark.read.option("header", "true") \
.option("delimiter", "|") \
.option("inferSchema", "false") \
.csv("s3a://bucket/file.txt")
tableA= spark.read.format("com.databricks.spark.avro") \
.option("inferSchema", "false") \
.load("s3a://bucket2/part-m-00000.avro")
tableAFinal= tableA.select('companyA', 'zipcodeA', 'IDA')
ta = tableAFinal.alias('cfa')
tb = tableB .alias('cfb')
# Here we do a one to many join in order to get all the records sorted lined up.
zipcodematches = ta.join(tb, ta.zipcodeA== tb.zipcodeB, how='inner')
zipematchpart = zipcodematches.repartition(500, 'IDB')
jarow = udf(jaro, FloatType()) ## This is a massive function which I left out since I don't want a 10 page SO question but will put in if requested..
# The function calculates Jaro distance between name strings between the two dataframes, and then gives us our new column
matchcompanies = zipmatchpart.withColumn('MATCHBUS', jarow('companyB', 'companyA'))
matches = matchcompanies.withColumn('MATCHES', f.when(((matchcompanies .MATCHBUS >= 0.91)), 1).otherwise(0))
matches = matches1.where(col('MATCHES') == 1)
Теперь нам нужен способ отфильтровать совпадения и несоответствия, чтобы мы собирали отдельные записи IDA.
IDsA = [x.IDA for x in matches.select('IDA').distinct().collect()]
Удалите дубликаты, если они существуют, и затем запишите в таблицу, которая позже обновит наши другие таблицы.
matchesunique = matches.dropDuplicates(['IDA'])
matchesunique.write.jdbc(
url=sqlurl,
table='dbo.existingrecords',
mode='overwrite',
properties=sqlproperties)
Соберите все не совпадающие матчи, попавшие под счет Jaro 0,91. Там будет много дубликатов записей
nonmatches = matches.where(col('MATCHES') == 0)
Отфильтруйте их, чтобы, если идентификатор существует в кадре данных "попадания", он также не может быть НОВОЙ записью.
nonmatchesfiltered = nonmatches.filter(~nonmatches.IDB.isin(IDsA))
Теперь удалите дубликаты, так как мы вставляем одну новую запись, а не несколько неудачных попыток.
nonmatchesunique = nonmatchesfiltered.dropDuplicates(['IDB'])
nonmatchesunique.write.jdbc(
url=sqlurl,
table='dbo.newrecordstoinsert',
mode='overwrite',
properties=sqlproperties)
spark.stop()
Что я ищу, так это то, могу ли я делать вещи по-другому и более эффективно, и видит ли кто-нибудь чистое «нет» в работе Spark? Я всегда задавался вопросом, был ли мой метод устранения дубликатов попаданий и не попаданий слишком дорогостоящей операцией, и есть ли способы очистить его или что-то еще. Одно из предложений коллег было извлекать данные из Hive, а не использовать плоские файлы, однако это на самом деле значительно ухудшило производительность , что было удивительно, поскольку оно сохранялось как AVRO в S3.