Как определить схему для Pyspark createDataFrame (rdd, schema)? - PullRequest
1 голос
/ 13 июля 2020

Я посмотрел spark-rdd в фрейм данных .

Я прочитал свой gziped json в rdd

rdd1 =sc.textFile('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')

Я хочу преобразовать его в искровой фрейм данных . Первый метод из связанного вопроса SO не работает. Это первая строка из файла.

{"code_event": "1092406", "code_event_system": "LOTTO", "company_id": "2", "date_event": "2020-05-27 12:00:00.000", "date_event_real": "0001-01-01 00:00:00.000", "ecode_class": "", "ecode_event": "183", "eperiod_event": "", "etl_date": "2020-05-27", "event_no": 1, "group_no": 0, "name_event": "Ungaria Putto - 8/20", "name_event_short": "Ungaria Putto - 8/20", "odd_coefficient": 1, "odd_coefficient_entry": 1, "odd_coefficient_user": 1, "odd_ekey": "11", "odd_name": "11", "odd_status": "", "odd_type": "11", "odd_voidfactor": 0, "odd_win_types": "", "special_bet_value": "", "ticket_id": "899M-E2X93P", "id_update": 8000001036823656, "topic_group": "cwg5", "kafka_key": "899M-E2X93P", "kafka_epoch": 1590580609424, "kafka_partition": 0, "kafka_topic": "tickets-calculated_2"}

Как вывести схему?

В ответе говорится:

schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])

Почему range (32)?

Ответы [ 2 ]

3 голосов
/ 13 июля 2020

Чтобы ответить на ваш вопрос, диапазон (32) просто указывает количество столбцов, к которым класс StrucField может быть применен для требуемой схемы. В вашем случае 30 столбцов. На основе ваших данных мне удалось создать фрейм данных, используя ниже logi c:

from pyspark.sql.functions import *
from pyspark.sql.types import *

data_json = {"code_event": "1092406", "code_event_system": "LOTTO", "company_id": "2", "date_event": "2020-05-27 12:00:00.000",
          "date_event_real": "0001-01-01 00:00:00.000", "ecode_class": "", "ecode_event": "183", "eperiod_event": "",
          "etl_date": "2020-05-27", "event_no": 1, "group_no": 0, "name_event": "Ungaria Putto - 8/20", "name_event_short": "Ungaria Putto - 8/20",
          "odd_coefficient": 1, "odd_coefficient_entry": 1, "odd_coefficient_user": 1, "odd_ekey": "11", "odd_name": "11", "odd_status": "",
          "odd_type": "11", "odd_voidfactor": 0, "odd_win_types": "", "special_bet_value": "", "ticket_id": "899M-E2X93P", "id_update": 8000001036823656,
          "topic_group": "cwg5", "kafka_key": "899M-E2X93P", "kafka_epoch": 1590580609424, "kafka_partition": 0, "kafka_topic": "tickets-calculated_2"}
column_names = [x for x in data_json.keys()]
row_data = [([x for x in data_json.values()])]

input = []
for i in column_names:
  if str(type(data_json[i])).__contains__('str') :
    input.append(StructField(str(i), StringType(), True))
  elif str(type(data_json[i])).__contains__('int') and len(str(data_json[i])) <= 8:
         input.append(StructField(str(i), IntegerType(), True))
  else :
      input.append(StructField(str(i), LongType(), True))
  
schema = StructType(input)
data = spark.createDataFrame(row_data, schema)
data.show()

Вывод

# +----------+-----------------+----------+--------------------+--------------------+-----------+-----------+-------------+----------+--------+--------+--------------------+--------------------+---------------+---------------------+--------------------+--------+--------+----------+--------+--------------+-------------+-----------------+-----------+----------------+-----------+-----------+-------------+---------------+--------------------+
# |code_event|code_event_system|company_id|          date_event|     date_event_real|ecode_class|ecode_event|eperiod_event|  etl_date|event_no|group_no|          name_event|    name_event_short|odd_coefficient|odd_coefficient_entry|odd_coefficient_user|odd_ekey|odd_name|odd_status|odd_type|odd_voidfactor|odd_win_types|special_bet_value|  ticket_id|       id_update|topic_group|  kafka_key|  kafka_epoch|kafka_partition|         kafka_topic|
# +----------+-----------------+----------+--------------------+--------------------+-----------+-----------+-------------+----------+--------+--------+--------------------+--------------------+---------------+---------------------+--------------------+--------+--------+----------+--------+--------------+-------------+-----------------+-----------+----------------+-----------+-----------+-------------+---------------+--------------------+
# |   1092406|            LOTTO|         2|2020-05-27 12:00:...|0001-01-01 00:00:...|           |        183|             |2020-05-27|       1|       0|Ungaria Putto - 8/20|Ungaria Putto - 8/20|              1|                    1|                   1|      11|      11|          |      11|             0|             |                 |899M-E2X93P|8000001036823656|       cwg5|899M-E2X93P|1590580609424|              0|tickets-calculated_2|
# +----------+-----------------+----------+--------------------+--------------------+-----------+-----------+-------------+----------+--------+--------+--------------------+--------------------+---------------+---------------------+--------------------+--------+--------+----------+--------+--------------+-------------+-----------------+-----------+----------------+-----------+-----------+-------------+---------------+--------------------+
2 голосов
/ 13 июля 2020

range(32) в этом примере является просто примером - они генерируют схему с 32 столбцами, каждый из которых имеет номер в качестве имени. Если вы действительно хотите определить схему, вам необходимо явно определить каждый столбец:

from pyspark.sql.types import *
schema = StructType([
    StructField('code_event', IntegerType(), True),
    StructField('code_event_system', StringType(), True),
    ...
    ])

Но лучше было бы избегать использования RDD API и напрямую читать файл в фрейм данных с помощью следующего кода (см. документацию ):

>>> data = spark.read.json('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')
>>> data.printSchema()
root
 |-- code_event: string (nullable = true)
 |-- code_event_system: string (nullable = true)
 |-- company_id: string (nullable = true)
 |-- date_event: string (nullable = true)
 |-- date_event_real: string (nullable = true)
 |-- ecode_class: string (nullable = true)
 |-- ecode_event: string (nullable = true)
 |-- eperiod_event: string (nullable = true)
 |-- etl_date: string (nullable = true)
....
...