Vertica has an analytic function, CONDITIONAL_CHANGE_EVENT, which does something along the lines of
rank("column-name") OVER (PARTITION BY category ORDER BY revenue DESC) as rank
https://my.vertica.com/docs/8.1.x/HTML/index.htm#Authoring/SQLReferenceManual/Functions/Analytic/CONDITIONAL_CHANGE_EVENTAnalytic.htm%3FTocPath%3DSQL%2520Reference%2520Manual%7CSQL%2520Functions%7CAnalytic%2520Functions%7C_____9
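For the sample data below, my reading of the linked docs is that the Vertica version would be roughly
CONDITIONAL_CHANGE_EVENT(Channel) OVER (PARTITION BY Session_ID, Device_ID ORDER BY Channel_Time)
i.e. a counter that increments each time Channel changes within the partition.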
I would like to know whether there is an easy way to mimic this function in Spark. See the simple example below.
RAW Data
Session_ID,Device_ID,Channel_Time,Channel
1,1,4/9/2018 15:00:00,A
1,1,4/9/2018 15:01:00,A
1,1,4/9/2018 15:02:00,B
1,1,4/9/2018 15:03:00,B
1,1,4/9/2018 15:04:00,B
1,1,4/9/2018 15:05:00,C
1,1,4/9/2018 15:06:00,C
1,1,4/9/2018 15:07:00,A
1,1,4/9/2018 15:08:00,A
1,1,4/9/2018 15:09:00,B
1,1,4/9/2018 15:10:00,B
I want to generate a group ID for the input above by applying something like rank(Channel) over (partition by Session_ID, Device_ID order by Channel_Time), which is not available in Spark.
Expected output
Session_ID,Device_ID,Channel_Time,Channel,Group-ID
1,1,4/9/2018 15:00:00,A,1
1,1,4/9/2018 15:01:00,A,1
1,1,4/9/2018 15:02:00,B,2
1,1,4/9/2018 15:03:00,B,2
1,1,4/9/2018 15:04:00,B,2
1,1,4/9/2018 15:05:00,C,3
1,1,4/9/2018 15:06:00,C,3
1,1,4/9/2018 15:07:00,A,4
1,1,4/9/2018 15:08:00,A,4
1,1,4/9/2018 15:09:00,B,5
1,1,4/9/2018 15:10:00,B,5
To achieve this in Spark I need to perform the 4 transformations shown below. Is there a better / simpler way to do this?
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ConditionalTrueEvent {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder()
                .appName(ConditionalTrueEvent.class.getName())
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> eventsDataSet = sparkSession.read()
                .option("header", "true")
                .csv("D:\\dev\\workspace\\java\\simple-kafka\\data\\test.csv");
        eventsDataSet.createOrReplaceTempView("rawView");
        sparkSession.sql("select * from rawView").show();

        // 1) Number the rows per (Session_ID, Device_ID) and flag the rows where
        //    Channel differs from the previous row ('XXX' is a sentinel default
        //    for the first row of each partition).
        Dataset<Row> channel_changed = sparkSession.sql("select * , " +
                " row_number() over group_1 as row_number_by_session_device , " +
                " (case when (lag(Channel,1,'XXX') over group_1 != Channel) then 1 else 0 end) as channel_changed " +
                " from rawView " +
                "window group_1 as (partition by Session_ID , Device_ID order by Channel_Time)");
        channel_changed.createOrReplaceTempView("channel_changed");

        // 2) Keep the row number only on the change rows, 0 everywhere else.
        Dataset<Row> channel_changed_filled = sparkSession.sql("select * , " +
                " (case when channel_changed = 1 then row_number_by_session_device else 0 end) as channel_changed_filled_row_num " +
                " from channel_changed");
        channel_changed_filled.createOrReplaceTempView("channel_changed_filled");

        // 3) Propagate the last change row number forward as the group ID.
        //    With an order by, the default frame is range between unbounded
        //    preceding and current row, so max() acts as a running max here.
        Dataset<Row> channel_changed_final = sparkSession.sql("select * , " +
                " (case when channel_changed_filled_row_num = 0 then max(channel_changed_filled_row_num) over group_1 else channel_changed_filled_row_num end) as Group_ID " +
                " from channel_changed_filled " +
                "window group_1 as (partition by Session_ID , Device_ID order by Channel_Time)");
        channel_changed_final.createOrReplaceTempView("channel_changed_final");
        channel_changed_final.show();

        sparkSession.close();
    }
}
Spark output
+----------+---------+-----------------+-------+----------------------------+---------------+------------------------------+--------+
|Session_ID|Device_ID| Channel_Time|Channel|row_number_by_session_device|channel_changed|channel_changed_filled_row_num|Group_ID|
+----------+---------+-----------------+-------+----------------------------+---------------+------------------------------+--------+
| 1| 1|4/9/2018 15:00:00| A| 1| 1| 1| 1|
| 1| 1|4/9/2018 15:01:00| A| 2| 0| 0| 1|
| 1| 1|4/9/2018 15:02:00| B| 3| 1| 3| 3|
| 1| 1|4/9/2018 15:03:00| B| 4| 0| 0| 3|
| 1| 1|4/9/2018 15:04:00| B| 5| 0| 0| 3|
| 1| 1|4/9/2018 15:05:00| C| 6| 1| 6| 6|
| 1| 1|4/9/2018 15:06:00| C| 7| 0| 0| 6|
| 1| 1|4/9/2018 15:07:00| A| 8| 1| 8| 8|
| 1| 1|4/9/2018 15:08:00| A| 9| 0| 0| 8|
| 1| 1|4/9/2018 15:09:00| B| 10| 1| 10| 10|
| 1| 1|4/9/2018 15:10:00| B| 11| 0| 0| 10|
+----------+---------+-----------------+-------+----------------------------+---------------+------------------------------+--------+
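Ideally I would like to collapse all of this into a single query. Below is a sketch of the kind of rewrite I have in mind: the usual running-sum-over-a-change-flag idiom, with a subquery because window functions cannot be nested inside one another. It reuses sparkSession and rawView from the code above. Is something like this the right way in Spark, or is there a built-in I am missing?

// Single-query sketch: flag each channel change with lag(), then turn the
// flags into group IDs with a running sum(). The outer sum() relies on the
// default frame (range between unbounded preceding and current row). 'XXX'
// is the same sentinel as above, assumed never to be a real channel value.
Dataset<Row> groups = sparkSession.sql(
        "select t.* , " +
        " sum(channel_changed) over (partition by Session_ID , Device_ID order by Channel_Time) as Group_ID " +
        "from ( " +
        " select * , " +
        "  (case when lag(Channel,1,'XXX') over (partition by Session_ID , Device_ID order by Channel_Time) != Channel then 1 else 0 end) as channel_changed " +
        " from rawView " +
        ") t");
groups.show(); // gives consecutive group IDs 1,2,3,... per partition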