Возможно ли легко создать таблицу Kudu из PySpark Dataframe? - PullRequest
0 голосов
/ 31 октября 2018

В идеале будет работать следующий код:

import kudu 
from kudu.client import Partitioning

df = … #some spark dataframe 

# Connect to Kudu master server 
client = kudu.connect(host=‘…‘, port=7051)

# infer schema from spark dataframe
schema = df.schema 

# Define partitioning schema 
partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3) 

# Create new table 
client.create_table('dev.some_example', schema, partitioning)

Но client.create_table ожидает kudu.schema.Schema, а не структуру из фрейма данных. Однако в Scala вы можете сделать это (от https://kudu.apache.org/docs/developing.html):

kuduContext.createTable(
"dev.some_example", df.schema, Seq("key"),
new CreateTableOptions()
    .setNumReplicas(1)
    .addHashPartitions(List("key").asJava, 3))

Теперь мне стало интересно, могу ли я сделать то же самое с PySpark, не определяя вручную каждый столбец с помощью построителя схемы kudu?

1 Ответ

0 голосов
/ 31 октября 2018

Поэтому я написал себе вспомогательную функцию для преобразования схемы PySpark Dataframe в kudu.schema.Schema. Надеюсь, это кому-нибудь поможет. Обратная связь приветствуется!

Примечание, возможно, вы захотите добавить или изменить отображение типов данных.

import kudu
from kudu.client import Partitioning
def convert_to_kudu_schema(df_schema, primary_keys):
    builder = kudu.schema.SchemaBuilder()
    data_type_map = {
        "StringType":kudu.string,
        "LongType":kudu.int64,
        "IntegerType":kudu.int32,
        "FloatType":kudu.float,
        "DoubleType":kudu.double,
        "BooleanType":kudu.bool,
        "TimestampType":kudu.unixtime_micros,
    }

    for sf in df_schema:
        pk = False
        nullable=sf.nullable
        if (sf.name in primary_keys): 
            pk = True
            nullable = False

        builder.add_column(
            name=sf.name,
            nullable=nullable,
            type_=data_type_map[str(sf.dataType)]
        )
    builder.set_primary_keys(primary_keys)
    return builder.build()

Вы можете назвать это так:

kudu_schema = convert_to_kudu_schema(df.schema,primary_keys=["key1","key2"])

Я все еще открыт для более элегантного решения. ;)

...