Создайте функцию Spark udf для перебора массива байтов и преобразования ее в числовой - PullRequest
0 голосов
/ 27 ноября 2018

У меня есть Dataframe с массивом байтов в spark (python)

DF.select(DF.myfield).show(1, False)
+----------------+                                                              
|myfield         |
+----------------+
|[00 8F 2B 9C 80]|
+----------------+

Я пытаюсь преобразовать этот массив в строку

'008F2B9C80'

, затем в числовойзначение

int('008F2B9C80',16)/1000000
> 2402.0

Я нашел образец udf, поэтому я уже могу извлечь часть массива следующим образом:

u = f.udf(lambda a: format(a[1],'x'))
DF.select(u(DF['myfield'])).show()
+------------------+                                                            
|<lambda>(myfield) |
+------------------+
|                8f|
+------------------+

Теперь, как выполнить итерацию по всему массиву?Можно ли выполнить все операции, которые мне нужно, чтобы закодировать в функции udf?

Может быть, есть лучший способ выполнить приведение ???

Спасибо за вашу помощь

Ответы [ 3 ]

0 голосов
/ 05 декабря 2018

Я также нашел решение для Python

from pyspark.sql.functions import udf
spark.udf.register('ByteArrayToDouble', lambda x: int.from_bytes(x, byteorder='big', signed=False) / 10e5)
spark.sql('select myfield, ByteArrayToDouble(myfield) myfield_python, convert_binary(hex(myfield))/1000000 myfield_scala from my_table').show(1, False)
+-------------+-----------------+----------------+
|myfield      |myfield_python   |myfield_scala   |
+-------------+-----------------+----------------+
|[52 F4 92 80]|1391.76          |1391.76         |
+-------------+-----------------+----------------+
only showing top 1 row

Теперь я могу сравнить два решения

Спасибо за вашу драгоценную помощь

0 голосов
/ 10 декабря 2018

Я сталкивался с этим вопросом, отвечая на ваш самый новый.

Предположим, у вас есть df как

+--------------------+
|             myfield|
+--------------------+
|[00, 8F, 2B, 9C, 80]|
|    [52, F4, 92, 80]|
+--------------------+

Теперь вы можете использовать следующую лямбда-функцию

def func(val):
    return int("".join(val), 16)/1000000
func_udf = udf(lambda x: func(x), FloatType())

И для создания вывода используйте

df = df.withColumn("myfield1", func_udf("myfield"))

Это дает,

+--------------------+--------+
|             myfield|myfield1|
+--------------------+--------+
|[00, 8F, 2B, 9C, 80]|  2402.0|
|    [52, F4, 92, 80]| 1391.76|
+--------------------+--------+
0 голосов
/ 28 ноября 2018

Вот решение scala df.Вам необходимо импортировать scala.math.BigInteger

scala> val df = Seq((Array("00","8F","2B","9C","80"))).toDF("id")
df: org.apache.spark.sql.DataFrame = [id: array<string>]

scala> df.withColumn("idstr",concat_ws("",'id)).show
+--------------------+----------+
|                  id|     idstr|
+--------------------+----------+
|[00, 8F, 2B, 9C, 80]|008F2B9C80|
+--------------------+----------+


scala> import scala.math.BigInt
import scala.math.BigInt

scala> def convertBig(x:String):String = BigInt(x.sliding(2,2).map( x=> Integer.parseInt(x,16)).map(_.toByte).toArray).toString
convertBig: (x: String)String

scala> val udf_convertBig =  udf( convertBig(_:String):String )
udf_convertBig: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> df.withColumn("idstr",concat_ws("",'id)).withColumn("idBig",udf_convertBig('idstr)).show(false)
+--------------------+----------+----------+
|id                  |idstr     |idBig     |
+--------------------+----------+----------+
|[00, 8F, 2B, 9C, 80]|008F2B9C80|2402000000|
+--------------------+----------+----------+


scala>

Не существует искрового эквивалента для BigInteger в scala, поэтому я преобразовываю результат udf () в строку.

...