Поток из Kafka смешивает схему, как показано ниже
{
"header":{
"batch_id":"CustomerService_0_667_742",
"entity":"ActionItem",
"time":1536419113,
"key":[
{
"actionItemKey":"\"536870923\""
}
],
"message_type":"transmessage"
},
"body":{
"actionItemKey":"536870923",
"actionItemSourceId":"536870923",
"taskId":"1807271",
"actionItemTitle":"test",
"activeFlag":"1",
"startDate":"2018-07-27T07:44:57Z",
"dueDate":"2018-08-03T07:44:57Z",
"completionDate":"1753-01-01T05:50:36Z",
"originatorEmployeeKey":"10001",
"ownerEmployeeKey":"10001",
"actionItemTypeKey":"288",
"actionItemStatusKey":"32",
"actionItemPriorityKey":"296",
"customerServiceActivityStateKey":"Not Started",
"dml_action":"U",
"source_update_time__":"2018-09-08T15:05:13Z",
"source_query_time__":"2018-09-08T15:05:13Z",
"sourceSystemId":""
}
}
{
"header":{
"batch_id":"Invoice_0_39550_48481",
"entity":"TaxRate",
"time":1536419007,
"key":[
{
"taxRateKey":"\"1\""
}
],
"message_type":"refmessage"
},
"body":{
"taxCodeKey":"TX1",
"taxRate":5.0000,
"taxRateKey":"1",
"taxRuleCode":"R1",
"taxAuthorityCode":"COUNTRY",
"taxTypeId":"VAT",
"effectiveDate":"2000-01-01T06:00:00Z",
"taxRateId":"1",
"dml_action":"U",
"source_update_time__":"2018-09-08T15:03:27Z",
"source_query_time__":"2018-09-08T15:03:27Z",
"sourceSystemId":""
}
}
У нас есть более 200 таблиц с разными схемами, я не хочу указывать схему в Spark для каждой из них.Я хочу использовать структурированную потоковую передачу Spark, чтобы сохранить эти таблицы в HDFS, разделенную по имени сущности и current_date, как в формате json, и ниже приведен фрагмент кода.
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", DevConfig.BrokerHosts)
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
после чтения значений в виде строки и сохранения в hdfsзначения заключены в двойные кавычки.
как сохранить JSON в том формате, который был в Кафке?