Создайте массив типов данных struct из объединения двух таблиц Hive. - PullRequest
0 голосов
/ 30 октября 2018

У меня есть две таблицы в Улей -

 emp(empid int,empname string,deptid string)
 dept(deptid string, deptname string)

Пример данных

Таблица emp в Hive имеет схему empid int, строку empname, строку deptid

 1,Monami Sen,D01
 2,Tarun Sen,D02
 3,Shovik Sen,D03
 4, Rita Roy,D02
 5,Farhan,D01

В таблице Dept в Hive есть строка депонирования схемы, строка deptname

 D01,Finance
 D02,IT
 D03,Accounts
 D04,Admin

Мне нужно создать другую таблицу улья, которая должна иметь следующую схему -

dept id string, dept name string, emp_details array<struct<emp_id:string,emp_name string>>

Массив атрибута struct должен содержать все данные о сотрудниках - empid и empname, принадлежащие определенному отделу, а окончательный кадр данных должен быть преобразован в формат JSON.

Желаемый вывод:

{"deptid":"D01","deptname":"IT","empdetails":[{"empid":1,"empname":"Monami Sen"}]}
{"deptid":"D02","deptname":"Accounts","empdetails":[{"empid":2,"empname":"Rita Roy"}, 
{"empid":5,"empname":"Rijul Shah"}]}
{"deptid":"D03","deptname":"Finance","empdetails":[{"empid":3,"empname":"Shovik Sen"},{"empid":4,"empname":"Arghya Ghosh"}]}
{"deptid":"D04","deptname":"Adminstration","empdetails":[]}

Мне нужно использовать Spark версии 1.6 и Scala 2.10 для кодирования. Наборы данных огромны, поэтому для лучшей производительности потребуется эффективная обработка кода.

Не могли бы вы помочь мне с предложениями по коду?

1 Ответ

0 голосов
/ 30 октября 2018

Я бы предложил выполнить объединение left_outer с последующим агрегированием groupBy/collect_list и преобразованием toJSON, как показано ниже:

val empDF = Seq(
  (1, "Monami Sen", "D01"),
  (2, "Tarun Sen", "D02"),
  (3, "Shovik Sen", "D03"),
  (4, "Rita Roy", "D02"),
  (5, "Farhan", "D01")
).toDF("empid", "empname", "deptid")

val deptDF = Seq(
  ("D01", "Finance"),
  ("D02", "IT"),
  ("D03", "Accounts"),
  ("D04", "Admin")
).toDF("deptid", "deptname")

deptDF.join(empDF, Seq("deptid"), "left_outer").
  groupBy("deptid", "deptname").
  agg(collect_list(struct($"empid", $"empname")).as("empdetails")).
  toJSON.
  show(false)
// +----------------------------------------------------------------------------------------------------------------------+
// |value                                                                                                                 |
// +----------------------------------------------------------------------------------------------------------------------+
// |{"deptid":"D03","deptname":"Accounts","empdetails":[{"empid":3,"empname":"Shovik Sen"}]}                              |
// |{"deptid":"D02","deptname":"IT","empdetails":[{"empid":4,"empname":"Rita Roy"},{"empid":2,"empname":"Tarun Sen"}]}    |
// |{"deptid":"D01","deptname":"Finance","empdetails":[{"empid":5,"empname":"Farhan"},{"empid":1,"empname":"Monami Sen"}]}|
// |{"deptid":"D04","deptname":"Admin","empdetails":[{}]}                                                                 |
// +----------------------------------------------------------------------------------------------------------------------+

Для Spark 1.6 рассмотрите возможность агрегирования с помощью Spark SQL (поскольку collect_list не поддерживает непримитивные типы полей в Spark DataFrame API):

deptDF.join(empDF, Seq("deptid"), "left_outer").
  createOrReplaceTempView("joined_table")

val resultDF = sqlContext.sql("""
  select deptid, deptname, collect_list(struct(empid, empname)) as empdetails
  from joined_table
  group by deptid, deptname
""")

resultDF.toJSON.
  show(false)
...