Здесь я хочу отсортировать данные по каждому идентификатору на основе даты. В этих отсортированных данных начинайте со второй строки и сравнивайте каждую строку со строкой над ней и сообщайте, в какой столбец было внесено изменение. Сохраните эту информацию в новом столбце.
Сравнение должно происходить только в том случае, если идентификатор совпадает, иначе результат будет пустой ячейкой.
Что я пробовал:
1) разделить данные на основе идентификатора.
2) вызвать MapPartition.
3) внутри mappartition преобразовать раздел в список и отсортировать по id и дате
4) создайте скользящее windoe в этом отсортированном списке и сравните 0-й и 1-й ряд в каждом окне. создать объект класса case со строкой результата.
Поскольку первая строка каждого раздела не будет сравниваться ни с чем, добавьте объект класса case для этой строки с пустой строкой resutl_string.
val df_after_comparison=df_from_source.repartition(df_from_source("id")).mapPartitions(partition =>{
if(partition != null ) {
val iter=partition.toList.sortBy(Row => (Row.getAs[String]("id"), Row1.getAs[String]("date")))
if (iter.length == 1) {
iter.map(x => new newdata(x.getAs[String]("id"), x.getAs[String]("attribute1"),x.getAs[String]("date"), x.getAs[String]("attribute2"), "")).toList.iterator
}
else {
val par = iter.sliding(2, 1).toList.map(x => {
var result_string = ""
if (x(0).getAs[String]("id") != x(1).getAs[String]("id")) {
new newdata(x(1).getAs[String]("id"), x(1).getAs[String]("attribute1"), x(1).getAs[String]("date"), x(1).getAs[String]("attribute2"), "")
}
else {
if (x(0).getAs[String]("attribute1") != x(1).getAs[String]("attribute1")) {
result_string = result_string + " attribute1 "
}
if (x(0).getAs[String]("date") != x(1).getAs[String]("date")) {
result_string = result_string + " date "
}
if (x(0).getAs[String]("attribute2") != x(1).getAs[String]("attribute2")) {
result_string = result_string + " attribute2 "
}
new newdata(x(1).getAs[String]("id"), x(1).getAs[String]("attribute1"), x(1).getAs[String]("panel_date1"), x(1).getAs[String]("attribute2"), result_string)
}
}
)
(new newdata(iter.head.getAs[String]("id"), iter.head.getAs[String]("attribute1"), iter.head.getAs[String]("panel_date1"), iter.head.getAs[String]("attribute2"), "") +: par).iterator
}
}
else{
Iterator.empty
}
})
Это нормально работает в локальном режиме, но не работает в кластере. Любая помощь приветствуется.
Пример ожидаемого выхода:
+ ----------------- + ---------- + ------------------- + --------------- + -------------------- +
| Идентификатор | attribute1 | дата | attribute2 | result_string |
+ ----------------- + ---------- + ----------------- - + --------------- + -------------------- +
| 10000010000000001 | 1 | 2016-01-01 00: 00: 00 | ITEM_3 | | // первая строка для этого идентификатора, поэтому нет сравнения
| 10000010000000001 | 3 | 2016-01-02 00: 00: 00 | ITEM_4 | атрибут1 дата ... | // изменение в 3 столбцах
| 10000010000000001 | 3 | 2016-02-24 00: 00: 00 | ITEM_5 | атрибут даты2 |
| 10000010000000001 | 3 | 2016-02-31 00: 00: 00 | ITEM_6 | атрибут даты2 |
| 10000010000000001 | 3 | 2016-03-02 00: 00: 00 | ITEM_7 | атрибут даты2 |
| 10000010000000001 | 3 | 2016-03-03 00: 00: 00 | ITEM_8 | атрибут даты2 |
| 10000010000000001 | 3 | 2016-03-08 00: 00: 00 | ITEM_9 | атрибут даты2 |
| 10000010000000001 | 3 | 2016-03-18 00: 00: 00 | ITEM_10 | атрибут даты2 |
| 10000010000000004 | 1 | 2016-04-03 00: 00: 00 | ITEM_1 | | // новый идентификатор, поэтому строка-результата пуста
| 10000010000000006 | 1 | 2016-01-01 00: 00: 00 | ITEM_2 | |
| 10000010000000006 | 2 | 2016-04-05 00: 00: 00 | ITEM_1 | атрибут1 дата ... |
| 10000010000000030 | 2 | 2016-04-04 00: 00: 00 | ITEM_1 | |
+ ----------------- + ---------- + ----------------- - + --------------- + -------------------- +