Что-нибудь в Spark SQL похоже на CONDITIONAL_CHANGE_EVENT? - PullRequest
0 голосов
/ 14 мая 2018

Vertica имеет аналитическую функцию CONDITIONAL_CHANGE_EVENT, которая выполняет что-то вроде

rank("column-name") OVER (PARTITION BY category ORDER BY revenue DESC) as rank

https://my.vertica.com/docs/8.1.x/HTML/index.htm#Authoring/SQLReferenceManual/Functions/Analytic/CONDITIONAL_CHANGE_EVENTAnalytic.htm%3FTocPath%3DSQL%2520Reference%2520Manual%7CSQL%2520Functions%7CAnalytic%2520Functions%7C_____9

Я хочу знать, есть ли простой способ подражать этой функции в Spark. Смотрите простой пример ниже

RAW Data

Session_ID,Device_ID,Channel_Time,Channel
1,1,4/9/2018 15:00:00,A
1,1,4/9/2018 15:01:00,A
1,1,4/9/2018 15:02:00,B
1,1,4/9/2018 15:03:00,B
1,1,4/9/2018 15:04:00,B
1,1,4/9/2018 15:05:00,C
1,1,4/9/2018 15:06:00,C
1,1,4/9/2018 15:07:00,A
1,1,4/9/2018 15:08:00,A
1,1,4/9/2018 15:09:00,B
1,1,4/9/2018 15:10:00,B

Я хочу сгенерировать идентификатор группы для указанного выше ввода, применив что-то вроде rank (channel) over (разбиение по Session_ID, порядок Device_ID по Channel_Time) , который недоступен в SPARK.

Ожидаемый результат

Session_ID,Device_ID,Channel_Time,Channel,Group-ID
1,1,4/9/2018 15:00:00,A,1
1,1,4/9/2018 15:01:00,A,1
1,1,4/9/2018 15:02:00,B,2
1,1,4/9/2018 15:03:00,B,2
1,1,4/9/2018 15:04:00,B,2
1,1,4/9/2018 15:05:00,C,3
1,1,4/9/2018 15:06:00,C,3
1,1,4/9/2018 15:07:00,A,4
1,1,4/9/2018 15:08:00,A,4
1,1,4/9/2018 15:09:00,B,5
1,1,4/9/2018 15:10:00,B,5

Чтобы достичь этого в искре, мне нужно выполнить 4 преобразования, как показано ниже, есть лучший / простой способ сделать это

public class ConditionalTrueEvent {

    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder()
                .appName(ConditionalTrueEvent.class.getName())
                .master("local[*]").getOrCreate();

        Dataset<Row> eventsDataSet = sparkSession.read()
                .option("header", "true")
                .csv("D:\\dev\\workspace\\java\\simple-kafka\\data\\test.csv");
        eventsDataSet.createOrReplaceTempView("rawView");
        sparkSession.sqlContext().sql("select * from rawView").show();

        Dataset<Row> channel_changed = sparkSession.sqlContext().sql("select * , " +
                " row_number() over group_1 as row_number_by_session_device , " +
                " (case when (lag(Channel,1,'XXX') over group_1 != Channel) then 1 else 0 end ) as channel_changed " +
                " from rawView " +
                "window group_1 as (partition by Session_ID , Device_ID order by Channel_Time )");
        channel_changed.createOrReplaceTempView("channel_changed");

        Dataset<Row> channel_changed_filled = sparkSession.sqlContext().sql("select * , " +
                "  ( case when channel_changed = 1 then row_number_by_session_device else 0 end ) as channel_changed_filled_row_num " +
                " from channel_changed " +
                "window group_1 as (partition by Session_ID , Device_ID order by Channel_Time )");
        channel_changed_filled.createOrReplaceTempView("channel_changed_filled");

        Dataset<Row> channel_changed_final = sparkSession.sqlContext().sql("select * , " +
                " ( case when channel_changed_filled_row_num = 0 then max(channel_changed_filled_row_num) over group_1 else channel_changed_filled_row_num end ) as Group_ID " +
                " from channel_changed_filled " +
                "window group_1 as (partition by Session_ID , Device_ID order by Channel_Time )");
        channel_changed_final.createOrReplaceTempView("channel_changed_final");

        sparkSession.close();
    }
}

Выход Spark

+----------+---------+-----------------+-------+----------------------------+---------------+------------------------------+--------+
|Session_ID|Device_ID|     Channel_Time|Channel|row_number_by_session_device|channel_changed|channel_changed_filled_row_num|Group_ID|
+----------+---------+-----------------+-------+----------------------------+---------------+------------------------------+--------+
|         1|        1|4/9/2018 15:00:00|      A|                           1|              1|                             1|       1|
|         1|        1|4/9/2018 15:01:00|      A|                           2|              0|                             0|       1|
|         1|        1|4/9/2018 15:02:00|      B|                           3|              1|                             3|       3|
|         1|        1|4/9/2018 15:03:00|      B|                           4|              0|                             0|       3|
|         1|        1|4/9/2018 15:04:00|      B|                           5|              0|                             0|       3|
|         1|        1|4/9/2018 15:05:00|      C|                           6|              1|                             6|       6|
|         1|        1|4/9/2018 15:06:00|      C|                           7|              0|                             0|       6|
|         1|        1|4/9/2018 15:07:00|      A|                           8|              1|                             8|       8|
|         1|        1|4/9/2018 15:08:00|      A|                           9|              0|                             0|       8|
|         1|        1|4/9/2018 15:09:00|      B|                          10|              1|                            10|      10|
|         1|        1|4/9/2018 15:10:00|      B|                          11|              0|                             0|      10|
+----------+---------+-----------------+-------+----------------------------+---------------+------------------------------+--------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...