Convert HQL to SparkSQL
1 vote / April 22, 2019

I am trying to convert an HQL query to Spark.

I have the following query (it works in Hue with the Hive editor):

select reflect('java.util.UUID', 'randomUUID') as id,
    tt.employee,
    cast( from_unixtime(unix_timestamp(date_format(current_date(),'dd/MM/yyyy HH:mm:ss'), 'dd/MM/yyyy HH:mm:ss')) as timestamp) as insert_date,
    collect_set(tt.employee_detail) as employee_details,
    collect_set(tt.emp_indication) as employees_indications,
    named_struct('employee_info', collect_set(tt.emp_info),
        'employee_mod_info', collect_set(tt.emp_mod_info),
        'employee_comments', collect_set(tt.emp_comment))
        as emp_mod_details
    from (
        select views_ctr.employee,
        if ( views_ctr.employee_details.so is not null, views_ctr.employee_details, null ) employee_detail,
        if ( views_ctr.employee_info.so is not null, views_ctr.employee_info, null ) emp_info,
        if ( views_ctr.employee_comments.so is not null, views_ctr.employee_comments, null ) emp_comment,
        if ( views_ctr.employee_mod_info.so is not null, views_ctr.employee_mod_info, null ) emp_mod_info,
        if ( views_ctr.emp_indications.so is not null, views_ctr.emp_indications, null ) emp_indication
        from
        ( select * from views_sta where emp_partition=0 and employee is not null ) views_ctr
        ) tt
        group by employee
        distribute by employee

First, I tried to run it through spark.sql like this:

sparkSession.sql("""
  select reflect('java.util.UUID', 'randomUUID') as id,
      tt.employee,
      cast( from_unixtime(unix_timestamp(date_format(current_date(),'dd/MM/yyyy HH:mm:ss'), 'dd/MM/yyyy HH:mm:ss')) as timestamp) as insert_date,
      collect_set(tt.employee_detail) as employee_details,
      collect_set(tt.emp_indication) as employees_indications,
      named_struct('employee_info', collect_set(tt.emp_info),
          'employee_mod_info', collect_set(tt.emp_mod_info),
          'employee_comments', collect_set(tt.emp_comment)) as emp_mod_details
  from (
      select views_ctr.employee,
      if ( views_ctr.employee_details.so is not null, views_ctr.employee_details, null ) employee_detail,
      if ( views_ctr.employee_info.so is not null, views_ctr.employee_info, null ) emp_info,
      if ( views_ctr.employee_comments.so is not null, views_ctr.employee_comments, null ) emp_comment,
      if ( views_ctr.employee_mod_info.so is not null, views_ctr.employee_mod_info, null ) emp_mod_info,
      if ( views_ctr.emp_indications.so is not null, views_ctr.emp_indications, null ) emp_indication
      from ( select * from views_sta where emp_partition=0 and employee is not null ) views_ctr
      ) tt
  group by employee
  distribute by employee
""")

But I got the following exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.unsafe.types.UTF8String$IntWrapper - object not serializable (class: org.apache.spark.unsafe.types.UTF8String$IntWrapper, value: org.apache.spark.unsafe.types.UTF8String$IntWrapper@30cfd641)
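This NotSerializableException around UTF8String$IntWrapper appears to match a known bug in older Spark 2.x releases (SPARK-21445, if I recall the ticket correctly): IntWrapper is an internal helper used when generated code parses strings into integral values, and it was only made Serializable in later 2.2.x/2.3.0 builds. Upgrading Spark is the clean fix; short of that, avoiding the string round-trip may sidestep it. A hedged sketch, assuming insert_date is only meant to be midnight of the current date, which is exactly what formatting current_date() with 'dd/MM/yyyy HH:mm:ss' and parsing it back produces:

    // same insert_date value without the date_format/unix_timestamp/from_unixtime
    // string round-trip that exercises string-to-int parsing
    val withInsertDate = sparkSession.sql(
      "select cast(current_date() as timestamp) as insert_date")
    withInsertDate.show(false)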

If I run the query without the collect_set calls, it works. Could it be failing because collect_set is applied to struct-typed columns in my table?
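In recent Spark 2.x releases collect_set does work on struct columns (map-typed columns are the usual unsupported case), so the struct types by themselves are unlikely to be the cause; an isolated reproduction can confirm that. A minimal sketch with hypothetical data:

    import org.apache.spark.sql.functions.collect_set
    import sparkSession.implicits._

    // two rows sharing an employee, with a struct column mimicking employee_details
    val demo = Seq(
      ("e1", ("so1", "a")),
      ("e1", ("so2", "b"))
    ).toDF("employee", "employee_details")

    demo.groupBy("employee")
      .agg(collect_set($"employee_details").alias("details"))
      .show(false)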

How can I write my HQL query in Spark / fix this exception?
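As an alternative to one large SQL string, the same shape can be built with the DataFrame API, which tends to localize errors like this to a single step. A sketch under stated assumptions: views_sta and the column names come from the query above, current_timestamp() stands in for the original date round-trip, and repartition plays the role of distribute by:

    import org.apache.spark.sql.functions._

    // filtered source, as in the views_ctr subquery
    val viewsCtr = sparkSession.table("views_sta")
      .where(col("emp_partition") === 0 && col("employee").isNotNull)

    // null-out structs whose 'so' field is null, like the if(...) expressions
    val tt = viewsCtr.select(
      col("employee"),
      when(col("employee_details.so").isNotNull, col("employee_details")).as("employee_detail"),
      when(col("employee_info.so").isNotNull, col("employee_info")).as("emp_info"),
      when(col("employee_comments.so").isNotNull, col("employee_comments")).as("emp_comment"),
      when(col("employee_mod_info.so").isNotNull, col("employee_mod_info")).as("emp_mod_info"),
      when(col("emp_indications.so").isNotNull, col("emp_indications")).as("emp_indication"))

    // aggregate per employee, mirroring the outer select
    val result = tt
      .groupBy(col("employee"))
      .agg(
        collect_set(col("employee_detail")).as("employee_details"),
        collect_set(col("emp_indication")).as("employees_indications"),
        struct(
          collect_set(col("emp_info")).as("employee_info"),
          collect_set(col("emp_mod_info")).as("employee_mod_info"),
          collect_set(col("emp_comment")).as("employee_comments")).as("emp_mod_details"))
      .withColumn("id", expr("reflect('java.util.UUID', 'randomUUID')"))
      .withColumn("insert_date", current_timestamp())
      .repartition(col("employee"))    // stands in for distribute by employee

The id and insert_date columns are added after the aggregation so that the non-deterministic reflect(...) call lives in a plain projection rather than inside the groupBy.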

...