Замените столбец вычисляемым столбцом с тем же именем в Spark SQL - PullRequest
0 голосов
/ 10 декабря 2018

Я читаю файлы из своего набора данных и загружаю их в фрейм данных. Загруженные данные имеют некоторые поля, которые не совпадают с типом в исходной базе данных, из-за проблемы приведения в kafka ( здесь )

Итак, я загружаю данные из S3 с неправильным типом данных (bunary) и преобразую каждый столбец в другой с помощью функции UDF

Затем я переименовываю новые столбцы, чтобы заменить старые, чтобы сохранитьта же структура в моих исходных и целевых базах данных

Шаги:

До:

myTable
|
+-- myField1 (binary)
+-- myField2 (binary)
+-- myField3 (binary)

Промежуточное состояние 1 (приведение с функцией UDF):

myTable
|
+-- myField1 (binary)
+-- myField1_new (numeric)
+-- myField2 (binary)
+-- myField2_new (numeric)
+-- myField3 (binary)
+-- myField3_new (numeric)

Промежуточное состояние 2 (отбросить старый столбец):

myTable
|
+-- myField1_new (numeric)
+-- myField2_new (numeric)
+-- myField3_new (numeric)

Конечное состояние (переименовать вычисляемый столбец):

myTable
|
+-- myField1 (numeric)
+-- myField1 (numeric)
+-- myField1 (numeric)

Вот синтаксис, который я использую:

spark.sql('select *,
            MyUDF(myfield1) myfield1_new,
            MyUDF(myfield2) myfield2_new,
            MyUDF(myfield3) myfield3_new
            from my_table')
.drop('myfield1').withColumnRenamed('myfield1_new', 'myfield1')
.drop('myfield2').withColumnRenamed('myfield2_new', 'myfield2')
.drop('myfield3').withColumnRenamed('myfield3_new', 'myfield3')
.show(1, False)

Моя проблема в том, что процесс действительно очень медленный из-за 439 полей для вычисления в реальной рабочей таблице (439 !!!)

Есть ли способ сделать это быстрее?На лету переименовываете или как?

Спасибо за помощь

1 Ответ

0 голосов
/ 10 декабря 2018

Я видел предыдущую ветку на этот вопрос.

Расширение этого как, предположим, у вас есть df как

+--------------------+
|             myfield|
+--------------------+
|[00, 8F, 2B, 9C, 80]|
|    [52, F4, 92, 80]|
+--------------------+

РЕДАКТИРОВАТЬ: Поскольку формат столбца myfield равен bytearray(b'\x00'), способ преобразования следующий (как указано @Ftagn).В противном случае, если это список строк, используйте commented return.

def func(val):
    return int.from_bytes(val, byteorder='big', signed=False) / 1000000
    # return int("".join(val), 16)/1000000
func_udf = udf(lambda x: func(x), FloatType())

. Для создания вывода используйте

df = df.withColumn("myfield1", func_udf("myfield"))

. Это дает

+--------------------+--------+
|             myfield|myfield1|
+--------------------+--------+
|[00, 8F, 2B, 9C, 80]|  2402.0|
|    [52, F4, 92, 80]| 1391.76|
+--------------------+--------+

Вместо этого, если вы используете,

df = df.withColumn("myfield", func_udf("myfield"))

вы получите,

+-------+
|myfield|
+-------+
| 2402.0|
|1391.76|
+-------+
...