Как сделать пустые значения доступными во фрейме данных при записи в kafka при потоковой передаче искры - PullRequest
0 голосов
/ 03 марта 2019

У меня возникает одна проблема при записи моего потокового фрейма с искрой в kafka.Я пишу фрейм данных в виде структуры JSON.Следующий способ, которым я пользуюсь для записи

    val df =df_agg.select($"Country",$"plan",$"value")  
    df.selectExpr("to_json(struct(*)) AS value").writeStream.format("kafka").option("topic", "topicname").option("kafka.bootstrap.servers", "ddd.dl.uk.ddd.com:8002").option("sasl.kerberos.service.name","kafka").option("checkpointLocation", "/user/dddff/ddd/").option("kafka.security.protocol","SASL_PLAINTEXT").option("Partitioner.class","DefaultPartitioner").start().awaitTermination()

Проблема заключается в том, что когда мое значение для столбца "страна" пусто, поле даже не записывается.Например, я получаю dataframe df как

    US,postpaid,300
    CAN,prepaid,30
     ,postpaid,400

мой вывод на kafka

    {"country":"US","plan":postpaid,"value":300}
    {"country":"CAN","plan":0.0,"value":30}
    {"plan":postpaid,"value":400}

Но мой ожидаемый вывод

    {"country":"US","plan":postpaid,"value":300}
    {"country":"CAN","plan":0.0,"value":30}
    {"country":"","plan":postpaid,"value":400}

Как мне этого добиться?пожалуйста помогите

...