Поэтому я написал себе вспомогательную функцию для преобразования схемы PySpark Dataframe в kudu.schema.Schema. Надеюсь, это кому-нибудь поможет. Обратная связь приветствуется!
Примечание, возможно, вы захотите добавить или изменить отображение типов данных.
import kudu
from kudu.client import Partitioning
def convert_to_kudu_schema(df_schema, primary_keys):
builder = kudu.schema.SchemaBuilder()
data_type_map = {
"StringType":kudu.string,
"LongType":kudu.int64,
"IntegerType":kudu.int32,
"FloatType":kudu.float,
"DoubleType":kudu.double,
"BooleanType":kudu.bool,
"TimestampType":kudu.unixtime_micros,
}
for sf in df_schema:
pk = False
nullable=sf.nullable
if (sf.name in primary_keys):
pk = True
nullable = False
builder.add_column(
name=sf.name,
nullable=nullable,
type_=data_type_map[str(sf.dataType)]
)
builder.set_primary_keys(primary_keys)
return builder.build()
Вы можете назвать это так:
kudu_schema = convert_to_kudu_schema(df.schema,primary_keys=["key1","key2"])
Я все еще открыт для более элегантного решения. ;)