Конвертировать столбцы данных PySpark в строки с определенными условиями - PullRequest
0 голосов
/ 25 декабря 2018

Ниже приведен пример ввода моих данных.Может быть несколько столбцов, начиная с C, с различными оценками.

Выход будет следовать этой логике - столбцы A, B и E будут фиксированными для каждой строки для определенного значения A. Каждый столбец ввода(C, E ... X) будет соответствовать каждой строке.Если мы встречаем нуль, нам нужно отбросить это и искать в следующей строке. Как только C или D встретятся для любого конкретного значения A, я перейду к следующему значению A. Короче, нам нужнонаименьшие значения C и d для каждого значения A.

Input dataframe Output

1 Ответ

0 голосов
/ 26 декабря 2018

Вы сказали: ' Короче говоря, нам нужны наименьшие значения C и D для каждого значения A. ' Таким образом, по этой логике я вычислил минимальные значения C и D для конкретногоA. Ваш 3-й ряд выходных данных не совпадает с моим, потому что минимум для 130 D был 100,09.Если в логике есть некоторые изменения, вы можете сделать соответствующие изменения в соответствии с вашими потребностями.

from pyspark.sql.types import StringType, FloatType 
from pyspark.sql import Row, window
from pyspark.sql.functions import array, col, explode, struct, lit

schema = StructType([StructField('A', StringType()), StructField('B',FloatType()), 
                    StructField('C',FloatType()),StructField('D',FloatType()),
                    StructField('E',FloatType())])
rows = [Row(A='123',B=None,C=100.22,D=None,E=3501.88), Row(A='123',B=None,C=102.212,D=101.2187,E=3502.88),
        Row(A='123',B=None,C=103.22,D=103.22,E=3503.22), Row(A='130', B=None, C=None, D=101.22, E=355.0),
        Row(A='130',B=None,C=None,D=102.28,E=356.8), Row(A='130',B=None,C=100.09,D=100.09,E=357.8)]
df = spark.createDataFrame(rows, schema)
df.show()
+---+----+-------+--------+-------+
|  A|   B|      C|       D|      E|
+---+----+-------+--------+-------+
|123|null| 100.22|    null|3501.88|
|123|null|102.212|101.2187|3502.88|
|123|null| 103.22|  103.22|3503.22|
|130|null|   null|  101.22|  355.0|
|130|null|   null|  102.28|  356.8|
|130|null| 100.09|  100.09|  357.8|
+---+----+-------+--------+-------+

#This function is used to explode the DataFrame
def to_long(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

df = to_long(df[['A','C','D','E']], ['A','E'])
#df.show()
df = df.select(col('A'), col('Key').alias('XX'), col('val').alias('Score'), col('E').alias('ZZ'))
#df.show()
df = df.where(col("Score").isNotNull())
#df.show()

df.registerTempTable('table_view')
df1=sqlContext.sql(
    'select A, XX, min(Score) over (partition by A) as Score, ZZ from table_view'
)
df.registerTempTable('table_view')
df1=sqlContext.sql(
    'SELECT A, XX, Score, ZZ from (select *, min(Score) over (partition by A, XX) as minScore FROM table_view) M where Score = minScore'
)
df1.show()
+---+---+--------+-------+
|  A| XX|   Score|     ZZ|
+---+---+--------+-------+
|123|  C|  100.22|3501.88|
|123|  D|101.2187|3502.88|
|130|  C|  100.09|  357.8|
|130|  D|  100.09|  357.8|
+---+---+--------+-------+
...