Spark / PySpark collect_set с двоичным столбцом - PullRequest
2 голосов
/ 02 апреля 2019

Некоторые тестовые данные с двумя столбцами: первый двоичный файл (с использованием буквенно-цифровых байтов в этом примере), второй целое число:

from pyspark.sql.types import *
from pyspark.sql import functions as F

df = spark.createDataFrame([
    (bytearray(b'0001'), 1),
    (bytearray(b'0001'), 1),
    (bytearray(b'0001'), 2),
    (bytearray(b'0002'), 2)
],
schema=StructType([
    StructField("bin", BinaryType()),
    StructField("number", IntegerType())
]))

Использование collect_set для группировки по столбцу целых чисел и последующего удаления дубликатовне работает, потому что байтовые массивы не поддерживают хеширование.Следовательно:

(
    df
    .groupBy('number')
    .agg(F.collect_set("bin").alias('bin_array'))
    .show()
)

+------+------------+
|number|   bin_array|
+------+------------+
|     1|[0001, 0001]|
|     2|[0001, 0002]|
+------+------------+

Один хакерский вариант - встроить двоичный массив в структуру, а затем снова развернуть их все, но я подозреваю, что это приведет к огромному количеству выделений и будет очень дорогим (пока)хотя на самом деле это профилировано):

def unstruct_array(input):
    return [x.bin for x in input]

unstruct_array_udf = F.udf(unstruct_array, ArrayType(BinaryType()))

(
    df
    .withColumn("bin", F.struct("bin"))
    .groupBy('number')
    .agg(F.collect_set("bin").alias('bin_array'))
    .withColumn('bin_array', unstruct_array_udf('bin_array'))
    .show()
)

+------+------------+                                                           
|number|   bin_array|
+------+------------+
|     1|      [0001]|
|     2|[0001, 0002]|
+------+------------+

Если я попробую множество поисковых запросов Google вокруг двоичных типов и Spark, то есть разные ответы, в которых говорится, что вам нужно обернуть массивы, если вам нужно хеширование.Предложения включают в себя пользовательскую оболочку или вызывая toSeq Scala, который создает Scala WrappedArray.Например:

ReduceByKey с байтовым массивом в качестве ключа

Как использовать байтовый массив в качестве ключа в СДР?

Итак, параметры включают в себя:

  1. Отображение базового RDD, чтобы сделать двоичное поле WrappedArray.Не знаете, как это сделать в Python?
  2. Создание оболочки Python для массива, а затем каким-то образом хэшировать базовый массив Java в Python?Хотя я не уверен, что это имеет какое-либо преимущество перед использованием структуры?
  3. Я мог бы обернуть структуру, а затем никогда не развернуть, что было бы немного более эффективным с точки зрения обработки, но тогда, вероятно, сделало бы файлы партера больше и большедорого разбирать во всех последующих задачах

1 Ответ

1 голос
/ 02 апреля 2019

Вот хак, который , вероятно, будет более эффективным, чем упаковка и распаковка.Вы можете просто вызвать метод distinct заранее.

df.show()
+-------------+------+
|          bin|number|
+-------------+------+
|[30 30 30 31]|     1|
|[30 30 30 31]|     1|
|[30 30 30 31]|     2|
|[30 30 30 32]|     2|
+-------------+------+

df.distinct().show()
+-------------+------+
|          bin|number|
+-------------+------+
|[30 30 30 31]|     1|
|[30 30 30 31]|     2|
|[30 30 30 32]|     2|
+-------------+------+

Обратите внимание, что я, вероятно, не использую ту же версию Spark, что и ваша (у меня 2.2.1) для отображения двоичных массивов, похоже, отличается.

Тогда для collect_set, это просто сводится к:

df.distinct().groupBy("number").agg(F.collect_set("bin"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...