У меня возникает одна проблема при записи моего потокового фрейма с искрой в kafka.Я пишу фрейм данных в виде структуры JSON.Следующий способ, которым я пользуюсь для записи
val df =df_agg.select($"Country",$"plan",$"value")
df.selectExpr("to_json(struct(*)) AS value").writeStream.format("kafka").option("topic", "topicname").option("kafka.bootstrap.servers", "ddd.dl.uk.ddd.com:8002").option("sasl.kerberos.service.name","kafka").option("checkpointLocation", "/user/dddff/ddd/").option("kafka.security.protocol","SASL_PLAINTEXT").option("Partitioner.class","DefaultPartitioner").start().awaitTermination()
Проблема заключается в том, что когда мое значение для столбца "страна" пусто, поле даже не записывается.Например, я получаю dataframe df как
US,postpaid,300
CAN,prepaid,30
,postpaid,400
мой вывод на kafka
{"country":"US","plan":postpaid,"value":300}
{"country":"CAN","plan":0.0,"value":30}
{"plan":postpaid,"value":400}
Но мой ожидаемый вывод
{"country":"US","plan":postpaid,"value":300}
{"country":"CAN","plan":0.0,"value":30}
{"country":"","plan":postpaid,"value":400}
Как мне этого добиться?пожалуйста помогите