запись данных JSON в hdfs с использованием потоковой структурированной искры - PullRequest
0 голосов
/ 24 сентября 2018

Поток из Kafka смешивает схему, как показано ниже

{
   "header":{
      "batch_id":"CustomerService_0_667_742",
      "entity":"ActionItem",
      "time":1536419113,
      "key":[
         {
            "actionItemKey":"\"536870923\""
         }
      ],
      "message_type":"transmessage"
   },
   "body":{
      "actionItemKey":"536870923",
      "actionItemSourceId":"536870923",
      "taskId":"1807271",
      "actionItemTitle":"test",
      "activeFlag":"1",
      "startDate":"2018-07-27T07:44:57Z",
      "dueDate":"2018-08-03T07:44:57Z",
      "completionDate":"1753-01-01T05:50:36Z",
      "originatorEmployeeKey":"10001",
      "ownerEmployeeKey":"10001",
      "actionItemTypeKey":"288",
      "actionItemStatusKey":"32",
      "actionItemPriorityKey":"296",
      "customerServiceActivityStateKey":"Not Started",
      "dml_action":"U",
      "source_update_time__":"2018-09-08T15:05:13Z",
      "source_query_time__":"2018-09-08T15:05:13Z",
      "sourceSystemId":""
   }
}
{
   "header":{
      "batch_id":"Invoice_0_39550_48481",
      "entity":"TaxRate",
      "time":1536419007,
      "key":[
         {
            "taxRateKey":"\"1\""
         }
      ],
      "message_type":"refmessage"
   },
   "body":{
      "taxCodeKey":"TX1",
      "taxRate":5.0000,
      "taxRateKey":"1",
      "taxRuleCode":"R1",
      "taxAuthorityCode":"COUNTRY",
      "taxTypeId":"VAT",
      "effectiveDate":"2000-01-01T06:00:00Z",
      "taxRateId":"1",
      "dml_action":"U",
      "source_update_time__":"2018-09-08T15:03:27Z",
      "source_query_time__":"2018-09-08T15:03:27Z",
      "sourceSystemId":""
   }
}

У нас есть более 200 таблиц с разными схемами, я не хочу указывать схему в Spark для каждой из них.Я хочу использовать структурированную потоковую передачу Spark, чтобы сохранить эти таблицы в HDFS, разделенную по имени сущности и current_date, как в формате json, и ниже приведен фрагмент кода.

val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", DevConfig.BrokerHosts)
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

после чтения значений в виде строки и сохранения в hdfsзначения заключены в двойные кавычки.

как сохранить JSON в том формате, который был в Кафке?

...