Как создать новый столбец меток времени в файле CSV с помощью Spark - PullRequest
0 голосов
/ 24 апреля 2020

У меня есть образец CSV-файла со столбцами, как показано ниже.

col1,col2
1,57.5
2,24.0
3,56.7
4,12.5
5,75.5

Я хочу новый столбец Timestamp в формате HH:mm:ss, и отметка времени должна увеличиваться на секунды, как показано ниже.

col1,col2,ts
1,57.5,00:00:00
2,24.0,00:00:01
3,56.7,00:00:02
4,12.5,00:00:03
5,75.5,00:00:04

Заранее благодарим за помощь.

1 Ответ

1 голос
/ 24 апреля 2020

Я могу предложить решение на основе pyspark. Реализация scala должна быть почти прозрачной.

Моя идея состоит в том, чтобы создать столбец, заполненный уникальными временными метками (здесь 1980, например, но это не имеет значения) и добавить секунды на основе вашего первого столбца (номер строки). ). Затем вы просто переформатируете отметку времени, чтобы увидеть только часы

import pyspark.sql.functions as psf
df = (df
 .withColumn("ts", psf.unix_timestamp(timestamp=psf.lit('1980-01-01 00:00:00'), format='YYYY-MM-dd HH:mm:ss'))
 .withColumn("ts", psf.col("ts") + psf.col("i") - 1)
 .withColumn("ts", psf.from_unixtime("ts", format='HH:mm:ss'))
)
df.show(2)
+---+----+---------+
|  i|   x|       ts|
+---+----+---------+
|  1|57.5| 00:00:00|
|  2|24.0| 00:00:01|
+---+----+---------+
only showing top 2 rows

Генерация данных

df = spark.createDataFrame([(1,57.5),
(2,24.0),
(3,56.7),
(4,12.5),
(5,75.5)], ['i','x'])
df.show(2)
+---+----+
|  i|   x|
+---+----+
|  1|57.5|
|  2|24.0|
+---+----+
only showing top 2 rows

Обновление: если у вас нет номера строки в вашем CSV (из вашего комментария)

В этом случае вам потребуется функция row_number.

Это не просто для числовых строк в Spark, поскольку данные распределены по независимым разделам и расположениям. Порядок, наблюдаемый в csv, не будет соблюдаться spark при отображении строк файла в разделы. Я думаю, что было бы лучше не использовать Spark для нумерации ваших строк в CSV, если важен порядок в файле. Шаг предварительной обработки, основанный на pandas со всеми oop над всеми вашими файлами, по одному за раз, может заставить его работать.

В любом случае, я могу предложить вам решение , если вы не возражаете против того, чтобы порядок строк отличался от того, который указан в вашем csv на диске .

import pyspark.sql.window as psw
w = psw.Window.partitionBy().orderBy("x")
(df
 .drop("i")
  .withColumn("i", psf.row_number().over(w))
  .withColumn("Timestamp", psf.unix_timestamp(timestamp=psf.lit('1980-01-01 00:00:00'), format='YYYY-MM-dd HH:mm:ss'))
 .withColumn("Timestamp", psf.col("Timestamp") + psf.col("i") - 1)
 .withColumn("Timestamp", psf.from_unixtime("Timestamp", format='HH:mm:ss'))
 .show(2)
     )
+----+---+---------+
|   x|  i|Timestamp|
+----+---+---------+
|12.5|  1| 00:00:00|
|24.0|  2| 00:00:01|
+----+---+---------+
only showing top 2 rows

In с точки зрения эффективности это плохо (это все равно что собирать все данные в master), потому что вы не используете partitionBy. На этом шаге использование Spark является излишним.

Вы также можете использовать временный столбец и использовать этот для заказа. В этом конкретном примере он выдаст ожидаемый результат, но не уверен, что в целом он отлично работает

w2 = psw.Window.partitionBy().orderBy("temp")
(df
 .drop("i")
 .withColumn("temp", psf.lit(1))
  .withColumn("i", psf.row_number().over(w2))
  .withColumn("Timestamp", psf.unix_timestamp(timestamp=psf.lit('1980-01-01 00:00:00'), format='YYYY-MM-dd HH:mm:ss'))
 .withColumn("Timestamp", psf.col("Timestamp") + psf.col("i") - 1)
 .withColumn("Timestamp", psf.from_unixtime("Timestamp", format='HH:mm:ss'))
 .show(2)
     )
+----+----+---+---------+
|   x|temp|  i|Timestamp|
+----+----+---+---------+
|57.5|   1|  1| 00:00:00|
|24.0|   1|  2| 00:00:01|
+----+----+---+---------+
only showing top 2 rows
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...