Оптимизировать доступ к строкам и преобразование в pyspark - PullRequest
1 голос
/ 30 мая 2020

У меня есть большой набор данных (5 ГБ) в виде Джейсона в ведре S3. Мне нужно преобразовать схему данных и записать преобразованные данные обратно в S3 с помощью сценария ETL.

Поэтому я использую краулер для обнаружения схемы и загрузки данных в фрейм данных pyspark, а также изменения схемы . Теперь я перебираю каждую строку в фрейме данных и конвертирую ее в словарь. Удалите пустые столбцы, затем преобразуйте словарь в строку и запишите обратно в S3. Ниже приведен код:

#df is the pyspark dataframe
columns = df.columns
print(columns)
s3 = boto3.resource('s3')
cnt = 1

for row in df.rdd.toLocalIterator():
    data = row.asDict(True)

    for col_name in columns:
        if data[col_name] is None:
            del data[col_name]

    content = json.dumps(data)
    object = s3.Object('write-test-transaction-transformed', str(cnt)).put(Body=content)
    cnt = cnt+1
print(cnt)

Я использовал toLocalIterator. Выполняется ли приведенный выше код последовательно? если да, то как его оптимизировать? Есть ли лучший подход для выполнения вышеуказанного logi c?

Ответы [ 3 ]

1 голос
/ 30 мая 2020

предполагается, что каждая строка в наборе данных имеет формат json строковый формат

import pyspark.sql.functions as F

def drop_null_cols(data):
    import json
    content = json.loads(data)
    for key, value in list(content.items()):
        if value is None:
            del content[key]

    return json.dumps(content)

drop_null_cols_udf = F.udf(drop_null_cols, F.StringType())

df = spark.createDataFrame(
    ["{\"name\":\"Ranga\", \"age\":25, \"city\":\"Hyderabad\"}",
     "{\"name\":\"John\", \"age\":null, \"city\":\"New York\"}",
     "{\"name\":null, \"age\":31, \"city\":\"London\"}"],
    "string"
).toDF("data")

df.select(
    drop_null_cols_udf("data").alias("data")
).show(10,False)

Если входной фрейм данных имеет столбцы, а выходные данные должны быть не пустыми столбцами json

df = spark.createDataFrame(
        [('Ranga', 25, 'Hyderabad'),
         ('John', None, 'New York'),
         (None, 31, 'London'),
        ],
        ['name', 'age', 'city']
    )

df.withColumn(
    "data", F.to_json(F.struct([x for x in df.columns]))
).select(
    drop_null_cols_udf("data").alias("data")
).show(10, False)

#df.write.format("csv").save("s3://path/to/file/) -- save to s3

, что дает

+-------------------------------------------------+
|data                                             |
+-------------------------------------------------+
|{"name": "Ranga", "age": 25, "city": "Hyderabad"}|
|{"name": "John", "city": "New York"}             |
|{"age": 31, "city": "London"}                    |
+-------------------------------------------------+
1 голос
/ 30 мая 2020

Я буду следовать приведенному ниже подходу (написанному на scala, но его можно реализовать на python с минимальными изменениями) -

  1. Найдите количество наборов данных и назовите его totalCount
val totalcount = inputDF.count()

Найдите count(col) для всех столбцов фрейма данных и получите карту полей для их количества

  • Здесь для всех столбцов входного фрейма данных подсчет вычисляется
  • Обратите внимание, что count(anycol) возвращает количество строк, для которых все указанные столбцы не равны нулю. Например - если столбец имеет значение 10 строк, и если, скажем, 5 значений равны null, то счетчик (столбец) становится 5
  • Извлечь первую строку как Map[colName, count(colName)], обозначенную как fieldToCount
val cols = inputDF.columns.map { inputCol =>
      functions.count(col(inputCol)).as(inputCol)
    }
// Returns the number of rows for which the supplied column are all non-null.
    // count(null) returns 0
    val row = dataset.select(cols: _*).head()
    val fieldToCount = row.getValuesMap[Long]($(inputCols))

Получите удаляемые столбцы

  • Используйте карту, созданную на шаге 2, и отметьте столбец, имеющий количество меньше totalCount, в качестве удаляемого столбца
  • выберите все столбцы, которые имеют count == totalCount из входного кадра данных и сохраните обработанный выходной кадр данных в любом месте в любом формате в соответствии с требованиями.
  • Обратите внимание, что this approach will remove all the column having at least one null value
val fieldToBool = fieldToCount.mapValues(_ < totalcount)
val processedDF = inputDF.select(fieldToBool.filterNot(_._2).map(_.1) :_*)
// save this processedDF anywhere in any format as per requirement

Я считаю, что этот подход будет работать лучше, чем подход, который у вас есть сейчас

0 голосов
/ 30 мая 2020

Я решил вышеуказанную проблему. Мы можем просто запросить фрейм данных для нулевых значений. df = df.filter (df.column.isNotNull ()), тем самым удаляя все строки, в которых присутствует null. Итак, если есть n столбцов, нам нужно 2 ^ n запросов, чтобы отфильтровать все возможные комбинации. В моем случае было 10 столбцов, поэтому всего 1024 запроса, что приемлемо, поскольку sql запросов распараллеливаются.

...