Цикл Pyspark DataFrame - PullRequest
0 голосов
/ 16 апреля 2020

Я новичок в Python и DataFrame. Здесь я пишу Python код для запуска задания ETL в AWS Glue. Пожалуйста, найдите тот же фрагмент кода ниже.

test_DyF = glueContext.create_dynamic_frame.from_catalog(database="teststoragedb", table_name="testtestfile_csv")
test_dataframe = test_DyF.select_fields(['empid','name']).toDF()

теперь вышеупомянутый test_dataframe имеет тип pyspark.sql.dataframe.DataFrame

Теперь мне нужно l oop через вышеупомянутый test_dataframe . Насколько я вижу, я мог видеть только collect или toLocalIterator. Пожалуйста, найдите приведенный ниже пример кода

for row_val in test_dataframe.collect():

Но оба эти метода очень медленные и неэффективные. Я не могу использовать pandas, поскольку он не поддерживается AWS Glue.

Пожалуйста, найдите шаги, которые я делаю

информация об источнике:

productid|matchval|similar product|similar product matchval
product A|100|product X|100
product A|101|product Y|101
product B|100|product X|100
product C|102|product Z|102

ожидаемый результат :

product |similar products
product A|product X, product Y
product B|product X
product C|product Z

Это код, который я пишу

  1. Я получаю отдельный кадр данных источника с productID
  2. L oop через этот отдельный набор фреймов данных

    а) получить список соответствий для продукта из источника

    б) определить аналогичный продукт на основе фильтров соответствия

    c ) l oop через, чтобы получить объединенную строку ---> Этот l oop с использованием rdd.collect влияет на производительность

Можете ли вы поделиться каким-либо лучшим предложением о том, что можно сделать?

1 Ответ

0 голосов
/ 17 апреля 2020

пожалуйста, опишите, какие логики c вы хотите попробовать. Цикл DF может быть выполнен с помощью подхода SQL, или вы также можете следовать нижеприведенному подходу RDD

def my_function(each_record):
#my_logic

#loop through for each command. 
df.rdd.foreach(my_function)

Добавлен следующий код, основанный на вашем вводе

df = spark.read.csv("/mylocation/61250775.csv", header=True, inferSchema=True, sep="|")
seq = ['product X','product Y','product Z']
df2 = df.groupBy("productid").pivot("similar_product",seq).count()

+---------+---------+---------+---------+
|productid|product X|product Y|product Z|
+---------+---------+---------+---------+
|product B|        1|     null|     null|
|product A|        1|        1|     null|
|product C|     null|     null|        1|
+---------+---------+---------+---------+

Окончательный подход, который соответствует вашему требование

df = spark.read.csv ("/ mylocation / 61250775.csv", header = True, inferSchema = True, sep = "|") df.printSchema ()

>>> df.printSchema()
root
 |-- id: string (nullable = true)
 |-- matchval1: integer (nullable = true)
 |-- similar: string (nullable = true)
 |-- matchval3: integer (nullable = true)


from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import collect_list
dfx = df.groupBy("id").agg(concat_ws(",", collect_list("similar")).alias("Similar_Items")).select(col("id"), col("Similar_Items"))
dfx.show()

+---------+-------------------+
|       id|      Similar_Items|
+---------+-------------------+
|product B|          product X|
|product A|product X,product Y|
|product C|          product Z|
+---------+-------------------+
...