Pyspark код для выбора данных из таблицы HIVE и записи в HDFS в виде формата паркета - PullRequest
0 голосов
/ 12 октября 2018

Я пытаюсь выбрать данные из разделенной таблицы HIVE (разделенной на столбец - label_yyyy_mm_dd) для выбранного диапазона дат и записать добавление в HDFS в виде файла паркета.Однако получаю ошибку.Ниже приведен код и ошибка.

from pyspark.sql.functions 
import current_date, date_format, date_sub from datetime import datetime, timedelta import datetime 
  q  = """select label_yyyy_mm_dd
        ,label_yyyy_mm
        ,q_media_name
        ,a_accepted
        ,a_end_ts
        ,a_media_name
        ,a_resource_name
        ,a_start_ts
        ,k_callpurpose
        ,k_srf
        ,q_entry_ordinal
        ,q_interaction_id
        ,q_interaction_type
        ,q_ixn_resource_id
        ,q_resource_name
        ,a_consult_rcv_warm_engage_time
        ,a_consult_rcv_warm_hold_time
        ,a_consult_rcv_warm_wrap_time
        ,a_customer_handle_count
        ,a_customer_talk_duration
        ,a_interaction_resource_id
        ,a_interaction_id
        ,a_wrap_time
        a_technical_result
        ,k_ixn_type
        ,k_ixn_type_source
        ,k_transfer_count
        ,k_language
        ,k_agentauth
        ,k_auth,k_rg
        ,k_channel
        ,k_gms_result
        ,k_connid
        ,k_rbcprimaryid
        ,k_agent_id
        ,a_interaction_resource_ordinal 
    from prod_T0V0_cct0.cct0_gim_measures_gold A 
    inner join prod_T0V0_cct0.yle0_gim_date_time B on A.a_start_date_time_key = B.date_time_key     where label_yyyy_mm_dd
>='2017/03/07'      AND label_yyyy_mm_dd <='2017/03/31'"""   spark.sql(q).write.mode('append').parquet('hdfs:/prod/11323/app/H9A0/data/T0V0/DIG/test.parquet/label_yyyy_mm_dd=%s' %label_yyyy_mm_dd)

Сообщение об ошибке: -

NameError                                 Traceback (most recent call last)
<ipython-input-4-e695e7530d80> in <module>()
     42         where label_yyyy_mm_dd >='2017/03/07'
     43         AND label_yyyy_mm_dd <='2017/03/31'"""
---> 44 spark.sql(q).write.mode('append').parquet('hdfs:/prod/11323/app/H9A0/data/T0V0/DIG/test.parquet/label_yyyy_mm_dd=%s'%label_yyyy_mm_dd)

NameError: name 'label_yyyy_mm_dd' is not defined

Ответы [ 2 ]

0 голосов
/ 16 октября 2018

Вы загружаете полный фрейм данных в q фрейм данных.Таким образом, когда вы передаете

%label_yyyy_mm_dd 

, он не смог прочитать этот столбец, попробуйте следующее:

label_yyyy_mm_dd = q.select(“label_yyyy_mm_dd”) \
                  .rdd.map(lambda x:str(x[“label_yyyy_mm_dd”])).collect()

(выполните сбор, если в этом столбце есть несколько значений, в противном случае, если вам нужно только первое значениепросто замените .first ()

, но собирать не рекомендуется, так как это подразумевает загрузку.

0 голосов
/ 12 октября 2018

Сначала у вас есть

q = """select label_yyyy_mm_dd
    ,label_yyyy_mm
    ,q_media_name and so on'''

Создать DataFrame, в котором есть столбцы из 'q'.

df = spark.sql(q)

Затем выберите столбец 'label_yyyy_mm_dd' из DataFrame 'df'

label_yyyy_mm_dd = df.select('label_yyyy_mm_dd')

Преобразовать их в строку, приняв первое значение

label_yyyy_mm_dd_coll = ",".join(str("{0}".format(value.label_yyyy_mm_dd )) for 
value in label_yyyy_mm_dd.take(1))

Pass, переменная для ее записи.

df.write.mode('append').parquet('hdfs:/prod/11323/app/H9A0/data/T0V0/DIG/test.parquet/label_yyyy_mm_dd=%s' % label_yyyy_mm_dd_coll)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...