Условное разбиение в Spark Структурированные потоковые / Spark SQL - PullRequest
0 голосов
/ 13 июня 2018

Я пытаюсь сделать условный взрыв в Spark Structured Streaming.

Например, мой потоковый фрейм данных выглядит следующим образом (полностью собирая данные здесь).Я хочу разбить массив сотрудников на отдельные строки массивов, когда contingent = 1.Когда contingent = 0, мне нужно, чтобы массив был как есть.

|----------------|---------------------|------------------|
|     Dept ID    |     Employees       |    Contingent    |
|----------------|---------------------|------------------|
|          1     | ["John", "Jane"]    |       1          |
|----------------|---------------------|------------------|
|          4     | ["Amy", "James"]    |       0          |
|----------------|---------------------|------------------|
|          2     | ["David"]           |       1          |
|----------------|---------------------|------------------|

Итак, мой вывод должен выглядеть так (мне не нужно отображать столбец contingent:

|----------------|---------------------|
|     Dept ID    |     Employees       |
|----------------|---------------------|
|          1     | ["John"]            |
|----------------|---------------------|
|          1     | ["Jane"]            |
|----------------|---------------------|
|          4     | ["Amy", "James"]    |
|----------------|---------------------|
|          2     | ["David"]           |
|----------------|---------------------|

Есть несколько проблем, с которыми я сейчас сталкиваюсь:

  1. Условное взрывание массивов
  2. взрыв массивов в массивы (в данном случае вместо строк)

В Hive была концепция UDTF (определяемые пользователем табличные функции), которая позволяла бы мне это делать. Интересно, есть ли что-нибудь сопоставимое с этим?

Ответы [ 2 ]

0 голосов
/ 13 июня 2018

Используйте flatMap, чтобы взорваться и указать любое условие, которое вы хотите.

case class Department (Dept_ID: String, Employees: Array[String], Contingent: Int)
case class DepartmentExp (Dept_ID: String, Employees: Array[String])

val ds = df.as[Department]

ds.flatMap(dept => {
  if (dept.Contingent == 1) {
    dept.Employees.map(emp => DepartmentExp(dept.Dept_ID, Array(emp)))
  } else {
    Array(DepartmentExp(dept.Dept_ID, dept.Employees))
  }
}).as[DepartmentExp]
0 голосов
/ 13 июня 2018

Вы можете использовать when / case класс с explode для взрыва массива.

df
  .withColumn(
    "Employees",
     when(col("contingent") === 1, explode(col("Employees")).otherwise(col("Employees"))))
  .drop("contingent")

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...