Я могу предложить решение на основе pyspark
. Реализация scala
должна быть почти прозрачной.
Моя идея состоит в том, чтобы создать столбец, заполненный уникальными временными метками (здесь 1980, например, но это не имеет значения) и добавить секунды на основе вашего первого столбца (номер строки). ). Затем вы просто переформатируете отметку времени, чтобы увидеть только часы
import pyspark.sql.functions as psf
df = (df
.withColumn("ts", psf.unix_timestamp(timestamp=psf.lit('1980-01-01 00:00:00'), format='YYYY-MM-dd HH:mm:ss'))
.withColumn("ts", psf.col("ts") + psf.col("i") - 1)
.withColumn("ts", psf.from_unixtime("ts", format='HH:mm:ss'))
)
df.show(2)
+---+----+---------+
| i| x| ts|
+---+----+---------+
| 1|57.5| 00:00:00|
| 2|24.0| 00:00:01|
+---+----+---------+
only showing top 2 rows
Генерация данных
df = spark.createDataFrame([(1,57.5),
(2,24.0),
(3,56.7),
(4,12.5),
(5,75.5)], ['i','x'])
df.show(2)
+---+----+
| i| x|
+---+----+
| 1|57.5|
| 2|24.0|
+---+----+
only showing top 2 rows
Обновление: если у вас нет номера строки в вашем CSV (из вашего комментария)
В этом случае вам потребуется функция row_number
.
Это не просто для числовых строк в Spark, поскольку данные распределены по независимым разделам и расположениям. Порядок, наблюдаемый в csv, не будет соблюдаться spark
при отображении строк файла в разделы. Я думаю, что было бы лучше не использовать Spark
для нумерации ваших строк в CSV, если важен порядок в файле. Шаг предварительной обработки, основанный на pandas
со всеми oop над всеми вашими файлами, по одному за раз, может заставить его работать.
В любом случае, я могу предложить вам решение , если вы не возражаете против того, чтобы порядок строк отличался от того, который указан в вашем csv на диске .
import pyspark.sql.window as psw
w = psw.Window.partitionBy().orderBy("x")
(df
.drop("i")
.withColumn("i", psf.row_number().over(w))
.withColumn("Timestamp", psf.unix_timestamp(timestamp=psf.lit('1980-01-01 00:00:00'), format='YYYY-MM-dd HH:mm:ss'))
.withColumn("Timestamp", psf.col("Timestamp") + psf.col("i") - 1)
.withColumn("Timestamp", psf.from_unixtime("Timestamp", format='HH:mm:ss'))
.show(2)
)
+----+---+---------+
| x| i|Timestamp|
+----+---+---------+
|12.5| 1| 00:00:00|
|24.0| 2| 00:00:01|
+----+---+---------+
only showing top 2 rows
In с точки зрения эффективности это плохо (это все равно что собирать все данные в master), потому что вы не используете partitionBy
. На этом шаге использование Spark
является излишним.
Вы также можете использовать временный столбец и использовать этот для заказа. В этом конкретном примере он выдаст ожидаемый результат, но не уверен, что в целом он отлично работает
w2 = psw.Window.partitionBy().orderBy("temp")
(df
.drop("i")
.withColumn("temp", psf.lit(1))
.withColumn("i", psf.row_number().over(w2))
.withColumn("Timestamp", psf.unix_timestamp(timestamp=psf.lit('1980-01-01 00:00:00'), format='YYYY-MM-dd HH:mm:ss'))
.withColumn("Timestamp", psf.col("Timestamp") + psf.col("i") - 1)
.withColumn("Timestamp", psf.from_unixtime("Timestamp", format='HH:mm:ss'))
.show(2)
)
+----+----+---+---------+
| x|temp| i|Timestamp|
+----+----+---+---------+
|57.5| 1| 1| 00:00:00|
|24.0| 1| 2| 00:00:01|
+----+----+---+---------+
only showing top 2 rows