Как отобразить массив Spark, не взорвав его? - PullRequest
0 голосов
/ 25 марта 2020

МОЙ случай - у меня есть столбец массива, который я хотел бы отфильтровать. Рассмотрим следующее:

+------------------------------------------------------+
|                                                column|
+------------------------------------------------------+
|[prefix1-whatever, prefix2-whatever, prefix4-whatever]|
|[prefix1-whatever, prefix2-whatever, prefix3-whatever]|
|[prefix1-whatever, prefix2-whatever, prefix5-whatever]|
|[prefix1-whatever, prefix2-whatever, prefix3-whatever]|
+------------------------------------------------------+

Я бы хотел отфильтровать только столбцы, содержащие префикс-4, префикс-5, префикс-6, префикс-7, [...]. Таким образом, использование оператора «или» здесь не масштабируется.

Конечно, я могу просто:

val prefixesList = List("prefix-4", "prefix-5", "prefix-6", "prefix-7")

df
.withColumn("prefix", explode($"column"))
.withColumn("prefix", split($"prefix", "\\-").getItem(0))
.withColumn("filterColumn", $"prefix".inInCollection(prefixesList))

Но это включает взрыв, которого я хочу избежать. Мой план сейчас состоит в том, чтобы определить столбец массива из prefixesList, а затем использовать array_intersect для его фильтрации - чтобы это работало, хотя мне нужно избавиться от части -whatever (которая, очевидно, отличается за каждую запись). Если бы это был массив Scala, я мог бы легко сделать карту поверх него. Но, будучи Spark Array, я не знаю, возможно ли это.


TL; DR У меня есть кадр данных, содержащий столбец массива. Я пытаюсь манипулировать им и фильтровать его без взрыва (потому что, если я действительно взорвусь, мне придется позже манипулировать им, чтобы обратить вспять взрыв, и я бы хотел этого избежать).

Может Я достигну этого без взрыва? Если да, то как?

Ответы [ 3 ]

1 голос
/ 25 марта 2020

Не уверен, правильно ли я понял ваш вопрос: вы хотите сохранить все строки, которые не содержат префиксов, в prefixesList?

Если это так, вы можете написать свою собственную функцию фильтра:

def filterPrefixes (row: Row) : Boolean = {
  for( s <- row.getSeq[String](0)) {
    for( p <- Seq("prefix4", "prefix5", "prefix6", "prefix7")) {
      if( s.startsWith(p) ) {
        return false
      }
    }
  }
  return true
}

и затем использовать его в качестве аргумента для вызова фильтра:

df.filter(filterPrefixes _)
  .show(false)

print

+------------------------------------------------------+
|column                                                |
+------------------------------------------------------+
|[prefix1-whatever, prefix2-whatever, prefix3-whatever]|
|[prefix1-whatever, prefix2-whatever, prefix3-whatever]|
+------------------------------------------------------+
1 голос
/ 25 марта 2020

Вы можете достичь этого, используя SQL API. Если вы хотите сохранить только строки, содержащие любое из значений prefix-4, prefix-5, prefix-6, prefix-7, вы можете использовать функцию arrays_overlap. В противном случае, если вы хотите сохранить строки, содержащие все ваши значения, вы можете попробовать array_intersect, а затем проверить, равен ли его размер количеству ваших значений.

 val df = Seq(
  Seq("prefix1-a", "prefix2-b", "prefix3-c", "prefix4-d"),
  Seq("prefix4-e", "prefix5-f", "prefix6-g", "prefix7-h", "prefix8-i"),
  Seq("prefix6-a", "prefix7-b", "prefix8-c", "prefix9-d"),
  Seq("prefix8-d", "prefix9-e", "prefix10-c", "prefix12-a")
).toDF("arr")


val schema = StructType(Seq(
  StructField("arr", ArrayType.apply(StringType)),
  StructField("arr2", ArrayType.apply(StringType))
))
val encoder = RowEncoder(schema)

val df2 = df.map(s =>
  (s.getSeq[String](0).toArray, s.getSeq[String](0).map(s => s.substring(0, s.indexOf("-"))).toArray)
).map(s => RowFactory.create(s._1, s._2))(encoder)


val prefixesList = Array("prefix4", "prefix5", "prefix6", "prefix7")
val prefixesListSize = prefixesList.size
val prefixesListCol = lit(prefixesList)

df2.select('arr,'arr2,
  arrays_overlap('arr2,prefixesListCol).as("OR"),
  (size(array_intersect('arr2,prefixesListCol)) === prefixesListSize).as("AND")
).show(false)

это даст вам:

+-------------------------------------------------------+---------------------------------------------+-----+-----+
|arr                                                    |arr2                                         |OR   |AND  |
+-------------------------------------------------------+---------------------------------------------+-----+-----+
|[prefix1-a, prefix2-b, prefix3-c, prefix4-d]           |[prefix1, prefix2, prefix3, prefix4]         |true |false|
|[prefix4-e, prefix5-f, prefix6-g, prefix7-h, prefix8-i]|[prefix4, prefix5, prefix6, prefix7, prefix8]|true |true |
|[prefix6-a, prefix7-b, prefix8-c, prefix9-d]           |[prefix6, prefix7, prefix8, prefix9]         |true |false|
|[prefix8-d, prefix9-e, prefix10-c, prefix12-a]         |[prefix8, prefix9, prefix10, prefix12]       |false|false|
+-------------------------------------------------------+---------------------------------------------+-----+-----+

, так что, наконец, вы можете использовать:

df2.filter(size(array_intersect('arr2,prefixesListCol)) === prefixesListSize).show(false)

, и вы получите результат ниже:

+-------------------------------------------------------+---------------------------------------------+
|arr                                                    |arr2                                         |
+-------------------------------------------------------+---------------------------------------------+
|[prefix4-e, prefix5-f, prefix6-g, prefix7-h, prefix8-i]|[prefix4, prefix5, prefix6, prefix7, prefix8]|
+-------------------------------------------------------+---------------------------------------------+
1 голос
/ 25 марта 2020

Преобразовать Dataframe в Dataset[Array[String]] относительно просто и отобразить эти массивы как целые элементы. Основная идея c заключается в том, что вы можете легко перебирать свой список массивов без необходимости выравнивать весь набор данных.

val df = Seq(Seq("prefix1-whatever", "prefix2-whatever", "prefix4-whatever"),
             Seq("prefix1-whatever", "prefix2-whatever", "prefix3-whatever"),
             Seq("prefix1-whatever", "prefix2-whatever", "prefix5-whatever"),
             Seq("prefix1-whatever", "prefix2-whatever", "prefix3-whatever")
).toDF("column")

val pl = List("prefix4", "prefix5", "prefix6", "prefix7")

val df2 = df.as[Array[String]].map(a => {
    a.flatMap(s => {
        val start = s.split("-")(0)
        if(pl.contains(start)) {
            Some(s)
        } else {
            None
        }
    })
}).toDF("column")

df2.show(false)

Код, приведенный выше, приводит к:

+------------------+
|column            |
+------------------+
|[prefix4-whatever]|
|[]                |
|[prefix5-whatever]|
|[]                |
+------------------+

Я не совсем уверен, как это будет сопоставлять производительность с фактическим выравниванием и объединением набора данных. Это не позволяет оптимизировать катализатор, но позволяет избежать ненужной перестановки данных.

PS Я исправил небольшую проблему в вашем списке префиксов, поскольку "prefix-N" не соответствовал шаблону данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...