Вы были в значительной степени там, просто when().psf.when()
не работает, если вы используете напрямую, тогда он работает.
from pyspark.sql import functions as psf
from pyspark.sql.functions import when
df = sqlContext.createDataFrame(
[
["2019-01-12"],
["20190112"],
["12/01/2019 11:22:11"],
["12-Jan-19"]
], ["my_ts"])
target_df = df \
.withColumn(
'my_new_ts',
when(
psf.to_timestamp(psf.col("my_ts"), "dd/MM/yyyy HH:mm:ss").isNotNull(),
psf.to_timestamp("my_ts", "dd/MM/yyyy HH:mm:ss")
) \
.when(
psf.to_timestamp(psf.col("my_ts"), "dd-MMM-yy").isNotNull(),
psf.to_timestamp("my_ts", "dd-MMM-yy")
) \
.when(
psf.to_timestamp(psf.col("my_ts"), "yyyyMMdd").isNotNull(),
psf.to_timestamp("my_ts", "yyyyMMdd")
) \
.otherwise(None)
)
df.show()
target_df.show()
Вывод:
+-------------------+
| my_ts|
+-------------------+
| 2019-01-12|
| 20190112|
|12/01/2019 11:22:11|
| 12-Jan-19|
+-------------------+
+-------------------+-------------------+
| my_ts| my_new_ts|
+-------------------+-------------------+
| 2019-01-12| null|
| 20190112|2019-01-12 00:00:00|
|12/01/2019 11:22:11|2019-01-12 11:22:11|
| 12-Jan-19|2019-01-12 00:00:00|
+-------------------+-------------------+
Кроме того, если вы хотите более краткую версию, вы можете использовать psf.coalesce
:
from pyspark.sql import functions as psf
target_df = df.select("*",
psf.coalesce(
psf.to_timestamp("my_ts", "dd/MM/yyyy HH:mm:ss"),
psf.to_timestamp("my_ts", "dd-MMM-yy"),
psf.to_timestamp("my_ts", "yyyyMMdd")
).alias("my_new_ts"))