Я пытаюсь конвертировать HQL в Spark.
У меня следующий запрос (работает в Hue с редактором Hive):
select reflect('java.util.UUID', 'randomUUID') as id,
tt.employee,
cast( from_unixtime(unix_timestamp (date_format(current_date(),'dd/MM/yyyy HH:mm:ss'), 'dd/MM/yyyy HH:mm:ss')) as timestamp) as insert_date,
collect_set(tt.employee_detail) as employee_details,
collect_set( tt.emp_indication ) as employees_indications,
named_struct ('employee_info', collect_set(tt.emp_info),
'employee_mod_info', collect_set(tt.emp_mod_info),
'employee_comments', collect_set(tt.emp_comment) )
as emp_mod_details,
from (
select views_ctr.employee,
if ( views_ctr.employee_details.so is not null, views_ctr.employee_details, null ) employee_detail,
if ( views_ctr.employee_info.so is not null, views_ctr.employee_info, null ) emp_info,
if ( views_ctr.employee_comments.so is not null, views_ctr.employee_comments, null ) emp_comment,
if ( views_ctr.employee_mod_info.so is not null, views_ctr.employee_mod_info, null ) emp_mod_info,
if ( views_ctr.emp_indications.so is not null, views_ctr.emp_indications, null ) employees_indication,
from
( select * from views_sta where emp_partition=0 and employee is not null ) views_ctr
) tt
group by employee
distribute by employee
Во-первых, я пытаюсь написать это в spark.sql
следующим образом:
sparkSession.sql("select reflect('java.util.UUID', 'randomUUID') as id, tt.employee, cast( from_unixtime(unix_timestamp (date_format(current_date(),'dd/MM/yyyy HH:mm:ss'), 'dd/MM/yyyy HH:mm:ss')) as timestamp) as insert_date, collect_set(tt.employee_detail) as employee_details, collect_set( tt.emp_indication ) as employees_indications, named_struct ('employee_info', collect_set(tt.emp_info), 'employee_mod_info', collect_set(tt.emp_mod_info), 'employee_comments', collect_set(tt.emp_comment) ) as emp_mod_details, from ( select views_ctr.employee, if ( views_ctr.employee_details.so is not null, views_ctr.employee_details, null ) employee_detail, if ( views_ctr.employee_info.so is not null, views_ctr.employee_info, null ) emp_info, if ( views_ctr.employee_comments.so is not null, views_ctr.employee_comments, null ) emp_comment, if ( views_ctr.employee_mod_info.so is not null, views_ctr.employee_mod_info, null ) emp_mod_info, if ( views_ctr.emp_indications.so is not null, views_ctr.emp_indications, null ) employees_indication, from ( select * from views_sta where emp_partition=0 and employee is not null ) views_ctr ) tt group by employee distribute by employee")
Но я получил следующее исключение:
Исключение в потоке "main" org.apache.spark.SparkException: Job
прервано из-за сбоя этапа: задача не сериализуема:
java.io.NotSerializableException:
org.apache.spark.unsafe.types.UTF8String $ IntWrapper
-объект не сериализуем (класс: org.apache.spark.unsafe.types.UTF8String $ IntWrapper, значение:
org.apache.spark.unsafe.types.UTF8String$IntWrapper@30cfd641)
Если я пытаюсь выполнить свой запрос без функции collect_set
, это может сработать, это может привести к сбою, так как типы столбцов структуры в моей таблице?
Как мне написать свой HQL-запрос в Spark / исправить мое исключение?