Как объединить два ряда набора данных в один ряд в искре, используя Java - PullRequest
0 голосов
/ 08 марта 2019

Я читаю транзакции из темы Кафки в формате json. затем я применил некоторые преобразования, чтобы получить агрегаты на основе txn_status. Ниже приведена схема.

root | - window: struct (nullable = true) | | - начало: отметка времени (nullable = true) | | - конец: отметка времени (nullable = true) | - txn_status: string (nullable = true) | - count: long (nullable = false)

Мой пакетный вывод, как показано ниже, после применения группировки для заданного окно. [! [введите описание изображения здесь] [1]] [1]

но я хочу вывод, как показано ниже в формате json.

{
       “start_end_time”: “28/12/2018 11:32:00.000”,
       “count_Total” : 6
       “count_RCVD” : 5,
       “count_FAILED”: 1
  }


> how to combine two rows in a spark dataset.
> 
> 
>   [1]: https://i.stack.imgur.com/sCJuX.jpg

1 Ответ

0 голосов
/ 08 марта 2019

В соответствии с изображением, которое вы показали, я создал фрейм данных или временную таблицу и предоставил решение для вашего вопроса.

Scala Code:

case class txn_rec(txn_status: String, count: Int, start_end_time: String)

var txDf=sc.parallelize(Array(new txn_rec("FAIL",9,"2019-03-08 016:40:00, 2019-03-08 016:57:00"), 
    new txn_rec("RCVD",161,"2019-03-08 016:40:00, 2019-03-08 016:57:00"))).toDF

txDf.createOrReplaceTempView("temp")

var resDF=spark.sql("select start_end_time, (select sum(count) from temp) as total_count , (select count from temp where txn_status='RCVD') as rcvd_count,(select count from temp where txn_status='FAIL') as failed_count  from temp group by start_end_time")

resDF.show

resDF.toJSON.collectAsList.toString

Вы можете увидеть результат, как показано на снимке экрана.

Output-1

Output-2

...