Существует проблема с вашими данными, целое число 0300 не будет загружаться как нужный формат, для меня оно загружено как 192, поэтому сначала вам нужно загрузить его как строку, вам просто нужно назначить типы данных с использованием схемы при выполнении Загрузка. Обратитесь к документации . Например. для .csv:
from pyspark.sql import DataFrameReader
from pyspark.sql.types import *
schema = StructType([StructField("Year", StringType(), True), StructField("month", StringType(), True), StructField("date", StringType(), True), StructField("hhmm", StringType(), True)])
dd = DataFrameReader.csv(path='your/data/path', schema=schema)
После этого вам нужно исправить формат данных и преобразовать его в метку времени:
from pyspark.sql import functions as F
dd = spark.createDataFrame([('2019','2','13','1030'),('2018','2','14','1000'),('2029','12','13','300')],["Year","month","date","hhmm"])
dd = (dd.withColumn('month', F.when(F.length(F.col('month')) == 1, F.concat(F.lit('0'), F.col('month'))).otherwise(F.col('month')))
.withColumn('date', F.when(F.length(F.col('date')) == 1, F.concat(F.lit('0'), F.col('date'))).otherwise(F.col('date')))
.withColumn('hhmm', F.when(F.length(F.col('hhmm')) == 1, F.concat(F.lit('000'), F.col('hhmm')))
.when(F.length(F.col('hhmm')) == 2, F.concat(F.lit('00'), F.col('hhmm')))
.when(F.length(F.col('hhmm')) == 3, F.concat(F.lit('0'), F.col('hhmm')))
.otherwise(F.col('hhmm')))
.withColumn('time', F.to_timestamp(F.concat(*dd.columns), format='yyyyMMddHHmm'))
)
dd.show()
+----+-----+----+----+-------------------+
|Year|month|date|hhmm| time|
+----+-----+----+----+-------------------+
|2019| 02| 13|1030|2019-02-13 10:30:00|
|2018| 02| 14|1000|2018-02-14 10:00:00|
|2029| 12| 13|0300|2029-12-13 03:00:00|
+----+-----+----+----+-------------------+