There are many differences between SAS, Python, and PySpark. Here are some of the key points:
We transform the data by defining steps in an execution pipeline.
(ABC
    # Filter the data in the pipeline to just rows where A is 1234.
    .filter(ABC.A == 1234)
    # We can only create one column at a time, but these columns can be nested.
    # We have to define the transformation that produces this column separately.
    .withColumn('X', my_udf(ABC.A)))
Here we define a function to perform our transformation in the execution pipeline:
def my_func(A):
    # Exit early if A is not greater than or equal to zero.
    if A < 0:
        return (None, None, None, None)  # NOTE: we must return 4 columns.
    C = A * 50000  # This is the same in almost every language ;)
    padded = str(C).zfill(14)  # Save our padded string ("z-filled") to be reused
    X1 = int(padded[0:2])  # Slice the string to get the substring from (and including) index 0 up to (but not including) index 2
    X2 = int(padded[2:4])
    X3 = int(padded[0] + padded[2])
    X4 = int(padded[1] + padded[3])
    return (X1, X2, X3, X4)  # We return all four values packed in a "tuple". They'll be nested below our parent column in the new dataset.
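To get a feel for what the function produces, here is a small sketch of calling it directly in plain Python (the input values are purely illustrative):

# Calling the plain Python function outside of Spark:
print(my_func(9876543))
# 9876543 * 50000 = 493827150000, zero-padded to "00493827150000",
# so this prints (0, 49, 4, 9).
print(my_func(-1))  # prints (None, None, None, None)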
Here we define the types and columns that our transformation function will return.
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType

schema = StructType([  # A struct is a data structure that holds other data structures
    StructField("X1", IntegerType()),
    StructField("X2", IntegerType()),
    StructField("X3", IntegerType()),
    StructField("X4", IntegerType())
])

my_udf = udf(my_func, schema)  # We define a function for use in pyspark by combining a python function with a pyspark schema.
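To tie this together, here is a rough sketch of applying the UDF to a tiny stand-in dataset and printing the resulting schema (the sample data and the names sample_df and demo are illustrative assumptions, not the real ABC dataset):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A tiny hypothetical stand-in for ABC, just to demonstrate the schema.
sample_df = spark.createDataFrame([(1234,), (-5,)], ['A'])

demo = sample_df.withColumn('X', my_udf(sample_df.A))
demo.select('X').printSchema()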
Here is what the schema looks like:
root
 |-- X: struct (nullable = true)
 |    |-- X1: integer (nullable = true)
 |    |-- X2: integer (nullable = true)
 |    |-- X3: integer (nullable = true)
 |    |-- X4: integer (nullable = true)
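Once the struct column exists, its nested fields can be addressed with dot notation on the parent column; a minimal sketch using the hypothetical demo dataset from above:

# Pull individual nested fields out of the struct column.
demo.select('X.X1', 'X.X2').show()

# Or flatten every nested field at once with a wildcard.
demo.select('A', 'X.*').show()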