1. Оконные функции
Вы можете использовать оконные функции, чтобы заполнить пробелы и интерполировать значения.
Давайте начнем с примера кадра данных:
import pyspark.sql.functions as psf
import pyspark.sql.types as pst
from pyspark.sql import Window
import numpy as np
df = spark.createDataFrame(
[[float(t)/10., float(v)] for t, v in zip(np.random.randint(0, 1000, 20), np.random.randint(100, 200, 20))],
schema=pst.StructType([pst.StructField(c, pst.FloatType()) for c in ['position', 'value']])) \
.withColumn('position_round', psf.round('position'))
+--------+-----+--------------+
|position|value|position_round|
+--------+-----+--------------+
| 68.5|121.0| 69.0|
| 76.3|126.0| 76.0|
| 88.3|150.0| 88.0|
| 59.0|197.0| 59.0|
| 20.7|119.0| 21.0|
| 0.1|167.0| 0.0|
| 20.1|177.0| 20.0|
| 81.9|199.0| 82.0|
| 63.6|163.0| 64.0|
| 32.4|115.0| 32.0|
| 43.6|130.0| 44.0|
| 11.9|175.0| 12.0|
| 68.2|176.0| 68.0|
| 28.9|184.0| 29.0|
| 46.3|199.0| 46.0|
| 9.7|155.0| 10.0|
| 57.8|163.0| 58.0|
| 83.6|173.0| 84.0|
| 16.2|169.0| 16.0|
| 87.1|127.0| 87.0|
+--------+-----+--------------+
Чтобы заполнить пробелы, мы создадим диапазон целых чисел:
start, end = list(df.agg(psf.min('position_round'), psf.max('position_round')).collect()[0])
pos_df = spark.range(start=start, end=end, step=1) \
.withColumnRenamed('id', 'position_round')
Теперь мы можем объединить два кадра данных:
w1 = Window.orderBy('position_round')
w2 = Window.partitionBy('group').orderBy('position_round')
df_resample = df \
.select(
'*',
psf.lead('position_round', 1).over(w1).alias('next_position'),
psf.lead('value', 1).over(w1).alias('next_value')) \
.join(pos_df, on='position_round', how='right') \
.withColumn('group', psf.sum((~psf.isnull('position')).cast('int')).over(w1)) \
.select(
'*',
(psf.row_number().over(w2) - 1).alias('i'),
psf.first(psf.col('next_position') - psf.col('position_round')).over(w2).alias('dx'),
psf.first('value').over(w2).alias('value0'),
psf.first(psf.col('next_value') - psf.col('value')).over(w2).alias('dy')) \
.withColumn(
'value_round',
psf.when((psf.col('dx') > 0) | psf.isnull('next_value'), psf.col('value0') + psf.col('i') * psf.col('dy') / psf.col('dx')) \
.otherwise(psf.col('value')))
- Первая функция окна заключается в сохранении
next_value
и next_position
, чтобы в дальнейшем можно было вычислять наши dx
и dy
- Затем нам нужно идентифицировать каждый пробел с отдельным
group
id, чтобы мы могли интерполировать значения для каждого отдельного линейного сегмента
- И последнее, но не менее важное: мы объединяем все необходимые нам элементы
- длина зазора:
dx
- дельта в значениях:
dy
- текущий индекс строки в промежутке
i
Теперь мы можем вычислить value_round
, интерполяцию value
в позиции position_round
+--------------+--------+-----+-------------+----------+-----+---+----+------+-----+-----------+
|position_round|position|value|next_position|next_value|group| i| dx|value0| dy|value_round|
+--------------+--------+-----+-------------+----------+-----+---+----+------+-----+-----------+
| 0| 0.1|167.0| 10.0| 155.0| 1| 0|10.0| 167.0|-12.0| 167.0|
| 1| null| null| null| null| 1| 1|10.0| 167.0|-12.0| 165.8|
| 2| null| null| null| null| 1| 2|10.0| 167.0|-12.0| 164.6|
| 3| null| null| null| null| 1| 3|10.0| 167.0|-12.0| 163.4|
| 4| null| null| null| null| 1| 4|10.0| 167.0|-12.0| 162.2|
| 5| null| null| null| null| 1| 5|10.0| 167.0|-12.0| 161.0|
| 6| null| null| null| null| 1| 6|10.0| 167.0|-12.0| 159.8|
| 7| null| null| null| null| 1| 7|10.0| 167.0|-12.0| 158.6|
| 8| null| null| null| null| 1| 8|10.0| 167.0|-12.0| 157.4|
| 9| null| null| null| null| 1| 9|10.0| 167.0|-12.0| 156.2|
| 10| 9.7|155.0| 12.0| 175.0| 2| 0| 2.0| 155.0| 20.0| 155.0|
| 11| null| null| null| null| 2| 1| 2.0| 155.0| 20.0| 165.0|
| 12| 11.9|175.0| 16.0| 169.0| 3| 0| 4.0| 175.0| -6.0| 175.0|
| 13| null| null| null| null| 3| 1| 4.0| 175.0| -6.0| 173.5|
| 14| null| null| null| null| 3| 2| 4.0| 175.0| -6.0| 172.0|
| 15| null| null| null| null| 3| 3| 4.0| 175.0| -6.0| 170.5|
| 16| 16.2|169.0| 20.0| 177.0| 4| 0| 4.0| 169.0| 8.0| 169.0|
| 17| null| null| null| null| 4| 1| 4.0| 169.0| 8.0| 171.0|
| 18| null| null| null| null| 4| 2| 4.0| 169.0| 8.0| 173.0|
| 19| null| null| null| null| 4| 3| 4.0| 169.0| 8.0| 175.0|
+--------------+--------+-----+-------------+----------+-----+---+----+------+-----+-----------+
2. UDF
Если вы не хотите использовать оконные функции, вы можете написать UDF
, чтобы выполнить интерполяцию в python
, а затем вернуть массив кортежей (position, value):
def interpolate(pos, next_pos, value, next_value):
if pos == next_pos or next_value is None:
return [(pos, value)]
return [[pos + i, value + i * (next_value - value) / (next_pos - pos)] for i in range(int(next_pos - pos))]
interpolate_udf = psf.udf(interpolate, pst.ArrayType(pst.StructType([pst.StructField(c, pst.FloatType()) for c in ['position_round', 'value_round']])))
Обратите внимание, что кортежи имеют тип StructType
, чтобы упростить "выравнивание" кортежей по столбцам.
w1 = Window.orderBy('position_round')
df_udf = df \
.select(
'*',
psf.lead('position_round', 1).over(w1).alias('next_position'),
psf.lead('value', 1).over(w1).alias('next_value')) \
.withColumn('tmp', psf.explode(interpolate_udf('position_round', 'next_position', 'value', 'next_value'))) \
.select('*', 'tmp.*').drop('tmp')
Вот что мы получаем:
+--------+-----+--------------+-------------+----------+--------------+----------+
|position|value|position_round|next_position|next_value|position_round|value_round|
+--------+-----+--------------+-------------+----------+--------------+----------+
| 0.1|167.0| 0.0| 10.0| 155.0| 0.0| 167.0|
| 0.1|167.0| 0.0| 10.0| 155.0| 1.0| 165.8|
| 0.1|167.0| 0.0| 10.0| 155.0| 2.0| 164.6|
| 0.1|167.0| 0.0| 10.0| 155.0| 3.0| 163.4|
| 0.1|167.0| 0.0| 10.0| 155.0| 4.0| 162.2|
| 0.1|167.0| 0.0| 10.0| 155.0| 5.0| 161.0|
| 0.1|167.0| 0.0| 10.0| 155.0| 6.0| 159.8|
| 0.1|167.0| 0.0| 10.0| 155.0| 7.0| 158.6|
| 0.1|167.0| 0.0| 10.0| 155.0| 8.0| 157.4|
| 0.1|167.0| 0.0| 10.0| 155.0| 9.0| 156.2|
| 9.7|155.0| 10.0| 12.0| 175.0| 10.0| 155.0|
| 9.7|155.0| 10.0| 12.0| 175.0| 11.0| 165.0|
| 11.9|175.0| 12.0| 16.0| 169.0| 12.0| 175.0|
| 11.9|175.0| 12.0| 16.0| 169.0| 13.0| 173.5|
| 11.9|175.0| 12.0| 16.0| 169.0| 14.0| 172.0|
| 11.9|175.0| 12.0| 16.0| 169.0| 15.0| 170.5|
| 16.2|169.0| 16.0| 20.0| 177.0| 16.0| 169.0|
| 16.2|169.0| 16.0| 20.0| 177.0| 17.0| 171.0|
| 16.2|169.0| 16.0| 20.0| 177.0| 18.0| 173.0|
| 16.2|169.0| 16.0| 20.0| 177.0| 19.0| 175.0|
+--------+-----+--------------+-------------+----------+--------------+----------+