Некоторые тестовые данные с двумя столбцами: первый двоичный файл (с использованием буквенно-цифровых байтов в этом примере), второй целое число:
from pyspark.sql.types import *
from pyspark.sql import functions as F
df = spark.createDataFrame([
(bytearray(b'0001'), 1),
(bytearray(b'0001'), 1),
(bytearray(b'0001'), 2),
(bytearray(b'0002'), 2)
],
schema=StructType([
StructField("bin", BinaryType()),
StructField("number", IntegerType())
]))
Использование collect_set для группировки по столбцу целых чисел и последующего удаления дубликатовне работает, потому что байтовые массивы не поддерживают хеширование.Следовательно:
(
df
.groupBy('number')
.agg(F.collect_set("bin").alias('bin_array'))
.show()
)
+------+------------+
|number| bin_array|
+------+------------+
| 1|[0001, 0001]|
| 2|[0001, 0002]|
+------+------------+
Один хакерский вариант - встроить двоичный массив в структуру, а затем снова развернуть их все, но я подозреваю, что это приведет к огромному количеству выделений и будет очень дорогим (пока)хотя на самом деле это профилировано):
def unstruct_array(input):
return [x.bin for x in input]
unstruct_array_udf = F.udf(unstruct_array, ArrayType(BinaryType()))
(
df
.withColumn("bin", F.struct("bin"))
.groupBy('number')
.agg(F.collect_set("bin").alias('bin_array'))
.withColumn('bin_array', unstruct_array_udf('bin_array'))
.show()
)
+------+------------+
|number| bin_array|
+------+------------+
| 1| [0001]|
| 2|[0001, 0002]|
+------+------------+
Если я попробую множество поисковых запросов Google вокруг двоичных типов и Spark, то есть разные ответы, в которых говорится, что вам нужно обернуть массивы, если вам нужно хеширование.Предложения включают в себя пользовательскую оболочку или вызывая toSeq Scala, который создает Scala WrappedArray.Например:
ReduceByKey с байтовым массивом в качестве ключа
Как использовать байтовый массив в качестве ключа в СДР?
Итак, параметры включают в себя:
- Отображение базового RDD, чтобы сделать двоичное поле WrappedArray.Не знаете, как это сделать в Python?
- Создание оболочки Python для массива, а затем каким-то образом хэшировать базовый массив Java в Python?Хотя я не уверен, что это имеет какое-либо преимущество перед использованием структуры?
- Я мог бы обернуть структуру, а затем никогда не развернуть, что было бы немного более эффективным с точки зрения обработки, но тогда, вероятно, сделало бы файлы партера больше и большедорого разбирать во всех последующих задачах