Преобразование столбца динамической даты в другой формат в фрейме данных pyspark - PullRequest
1 голос
/ 17 июня 2019
  1. У меня есть датафрейм

df = spark.createDataFrame([(1,2,3,{'dt_created':'2018-06-29T11:43:57.530Z','rand_col1':'val1'}),(4,5,6,{'rand_col2':'val2','rand_col3':'val3'}),(7,8,9,{'dt_uploaded':'2018-06-19T11:43:57.530Z','rand_col1':'val2'})]

  1. Столбец json может иметь или не иметь столбец даты, а ключ даты является динамическим
  2. Я хотел бы проверить, соответствует ли какое-либо значение в json формату даты, и если оно совпадает, хотел бы преобразовать его в другой формат

1 Ответ

1 голос
/ 17 июня 2019

Это можно решить простым способом с помощью функции UDF

Метод 1

Этот код пытается найти дату в вашем JSON, а затем преобразовать ее в новую дату и время(в моем примере я поместил его в новый столбец)

import re
from datetime import datetime

import pyspark.sql.functions as f
from pyspark.shell import spark

@f.udf()
def parse(column: dict):
    for value in column.values():
        if re.match(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z', value):
            return datetime \
                .strptime(value, '%Y-%m-%dT%H:%M:%S.%fZ') \
                .strftime('%Y-%m-%d')

    return None

df = spark.createDataFrame([(1, 2, 3, {'dt_created': '2018-06-29T11:43:57.530Z', 'rand_col1': 'val1'}),
                            (4, 5, 6, {'rand_col2': 'val2', 'rand_col3': 'val3'}),
                            (7, 8, 9, {'dt_uploaded': '2018-06-19T11:43:57.530Z', 'rand_col1': 'val2'})],
                           ['A', 'B', 'C', 'D'])

df = df.withColumn('parse_dt', parse(f.col('D')))
df.show()

Вывод:

+---+---+---+--------------------+----------+
|  A|  B|  C|                   D|  parse_dt|
+---+---+---+--------------------+----------+
|  1|  2|  3|[dt_created -> 20...|2018-06-29|
|  4|  5|  6|[rand_col2 -> val...|      null|
|  7|  8|  9|[dt_uploaded -> 2...|2018-06-19|
+---+---+---+--------------------+----------+

Метод 2

Если вы хотите толькозаменить дату внутри JSON:

import re
from datetime import datetime

import pyspark.sql.functions as f
from pyspark.shell import spark
from pyspark.sql.types import MapType, StringType


@f.udf(returnType=MapType(StringType(), StringType()))
def parse(column: dict):
    for key, value in column.items():
        if re.match(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z', value):
            column[key] = datetime \
                .strptime(value, '%Y-%m-%dT%H:%M:%S.%fZ') \
                .strftime('%Y-%m-%d')

    return column


df = spark.createDataFrame([(1, 2, 3, {'dt_created': '2018-06-29T11:43:57.530Z', 'rand_col1': 'val1'}),
                            (4, 5, 6, {'rand_col2': 'val2', 'rand_col3': 'val3'}),
                            (7, 8, 9, {'dt_uploaded': '2018-06-19T11:43:57.530Z', 'rand_col1': 'val2'})],
                           ['A', 'B', 'C', 'D'])

df = df.withColumn('D', parse(f.col('D')))
df.show(truncate=False)

Вывод:

+---+---+---+----------------------------------------------+
|A  |B  |C  |D                                             |
+---+---+---+----------------------------------------------+
|1  |2  |3  |[dt_created -> 2018-06-29, rand_col1 -> val1] |
|4  |5  |6  |[rand_col2 -> val2, rand_col3 -> val3]        |
|7  |8  |9  |[dt_uploaded -> 2018-06-19, rand_col1 -> val2]|
+---+---+---+----------------------------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...