Spark dataframe: сравнение строк с рядами - PullRequest
0 голосов
/ 16 января 2019

Здесь я хочу отсортировать данные по каждому идентификатору на основе даты. В этих отсортированных данных начинайте со второй строки и сравнивайте каждую строку со строкой над ней и сообщайте, в какой столбец было внесено изменение. Сохраните эту информацию в новом столбце.

Сравнение должно происходить только в том случае, если идентификатор совпадает, иначе результат будет пустой ячейкой.

Что я пробовал: 1) разделить данные на основе идентификатора. 2) вызвать MapPartition. 3) внутри mappartition преобразовать раздел в список и отсортировать по id и дате 4) создайте скользящее windoe в этом отсортированном списке и сравните 0-й и 1-й ряд в каждом окне. создать объект класса case со строкой результата.

Поскольку первая строка каждого раздела не будет сравниваться ни с чем, добавьте объект класса case для этой строки с пустой строкой resutl_string.

val df_after_comparison=df_from_source.repartition(df_from_source("id")).mapPartitions(partition =>{
    if(partition != null ) {
      val iter=partition.toList.sortBy(Row => (Row.getAs[String]("id"), Row1.getAs[String]("date")))
      if (iter.length == 1) {
        iter.map(x => new newdata(x.getAs[String]("id"), x.getAs[String]("attribute1"),x.getAs[String]("date"), x.getAs[String]("attribute2"), "")).toList.iterator
      }
      else {
        val par = iter.sliding(2, 1).toList.map(x => {
          var result_string = ""
          if (x(0).getAs[String]("id") != x(1).getAs[String]("id")) {
               new newdata(x(1).getAs[String]("id"), x(1).getAs[String]("attribute1"), x(1).getAs[String]("date"), x(1).getAs[String]("attribute2"), "")
          }
          else {
            if (x(0).getAs[String]("attribute1") != x(1).getAs[String]("attribute1")) {
                result_string = result_string + " attribute1 "
            }
            if (x(0).getAs[String]("date") != x(1).getAs[String]("date")) {
                result_string = result_string + " date "
            }
            if (x(0).getAs[String]("attribute2") != x(1).getAs[String]("attribute2")) {
                result_string = result_string + " attribute2 "
            }
                new newdata(x(1).getAs[String]("id"), x(1).getAs[String]("attribute1"), x(1).getAs[String]("panel_date1"), x(1).getAs[String]("attribute2"), result_string)
            }
        }
        )
            (new newdata(iter.head.getAs[String]("id"), iter.head.getAs[String]("attribute1"), iter.head.getAs[String]("panel_date1"), iter.head.getAs[String]("attribute2"), "") +: par).iterator
        }
    }
    else{
        Iterator.empty
    }

})

Это нормально работает в локальном режиме, но не работает в кластере. Любая помощь приветствуется. Пример ожидаемого выхода: + ----------------- + ---------- + ------------------- + --------------- + -------------------- +

| Идентификатор | attribute1 | дата | attribute2 | result_string |

+ ----------------- + ---------- + ----------------- - + --------------- + -------------------- +

| 10000010000000001 | 1 | 2016-01-01 00: 00: 00 | ITEM_3 | | // первая строка для этого идентификатора, поэтому нет сравнения

| 10000010000000001 | 3 | 2016-01-02 00: 00: 00 | ITEM_4 | атрибут1 дата ... | // изменение в 3 столбцах

| 10000010000000001 | 3 | 2016-02-24 00: 00: 00 | ITEM_5 | атрибут даты2 |

| 10000010000000001 | 3 | 2016-02-31 00: 00: 00 | ITEM_6 | атрибут даты2 |

| 10000010000000001 | 3 | 2016-03-02 00: 00: 00 | ITEM_7 | атрибут даты2 |

| 10000010000000001 | 3 | 2016-03-03 00: 00: 00 | ITEM_8 | атрибут даты2 |

| 10000010000000001 | 3 | 2016-03-08 00: 00: 00 | ITEM_9 | атрибут даты2 |

| 10000010000000001 | 3 | 2016-03-18 00: 00: 00 | ITEM_10 | атрибут даты2 |

| 10000010000000004 | 1 | 2016-04-03 00: 00: 00 | ITEM_1 | | // новый идентификатор, поэтому строка-результата пуста

| 10000010000000006 | 1 | 2016-01-01 00: 00: 00 | ITEM_2 | |

| 10000010000000006 | 2 | 2016-04-05 00: 00: 00 | ITEM_1 | атрибут1 дата ... |

| 10000010000000030 | 2 | 2016-04-04 00: 00: 00 | ITEM_1 | |

+ ----------------- + ---------- + ----------------- - + --------------- + -------------------- +

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...