Чтение данных из EventHub в таблицу в DataBricks с использованием scala - PullRequest
0 голосов
/ 30 октября 2018

Я пытаюсь прочитать данные из концентратора событий в Databricks и хочу дать ему структуру col1, col2 и т. Д.

Проблема: я вижу, что только 1-я запись имеет правильную структуру и не загружает остальные данные.

Данные в концентраторе событий выглядят так же, как показано ниже - есть 3 записи, и каждая запись разбита на 2 строки после столбца DateTime ->

корпус 24,5300,123456,1, ПЛАН-QD, ПМТ, 10/09/15 00: 00,1253323, ПРОЦЕНТЫ, КРЕДИТНЫЙ-AS, NULL 32,1300,12458,2, ПЛАН, PMT, 25/09/15 00: 00,12532123, ПРОЦЕНТЫ, Ссуда, NULL 36,1400,19458,25, план, PMTS, 25/11/15 00: 00,92532163, ПРОЦЕНТЫ, КРЕДИТНЫЙ-DS, NULL


Заголовки столбцов (отсутствуют в evnt-хабе и только для справки, но при выводе в таблицу они должны быть там) -> идентификатор, Bal, accnum, активный, план, состояние, DateTime, Тип кредит, Где

Мой код как ниже-

import org.apache.spark.eventhubs ._

import org.apache.spark.sql.types ._

import org.apache.spark.sql.functions ._

import spark.implicits ._

val connectionString = ConnectionStringBuilder ("моя строка подключения"). SetEventHubName ("oth-Transactions"). Build // эта строка подключения для чтения из eventhub

val customEventhubParameters = EventHubsConf (connectionString)

val ConsumerDF = spark.readStream.format ("eventhubs"). Options (customEventhubParameters.toMap) .option ("checkpointLocation", "/tmp/checkpoint").load()

val OTHDF = ConsumerDF.select ($ "body" приведено "string")

val OTHDF2 = OTHDF.withColumn ("temp", split (col ("body"), "\,")). Select (

(0 until 52).map(i => col("temp").getItem(i).as(s"col$i")): _*

)

OTHDF2.printSchema

OTHDF2.writeStream.format ("delta"). OutputMode ("append"). Option ("checkpointLocation", "/delta/events/_checkpoints/etl-from-json").table("Table_Name")*

Могу ли я получить несколько советов о том, как я могу прочитать все эти записи и поместить их в таблицу в виде блоков данных?

Заранее спасибо !!

1 Ответ

0 голосов
/ 31 октября 2018

Вы разделяете столбцы тела, используя "\" вместо ",", пожалуйста, попробуйте.

val OTHDF2 = OTHDF.withColumn("temp", split(col("body"), ","))
               .select($"_tmp".getItem(0).as("id")
                      ,$"_tmp".getItem(1).as("Bal")
                      ,$"_tmp".getItem(2).as("accnum")
                      ,$"_tmp".getItem(3).as("active")
                      ,$"_tmp".getItem(4).as("plan")
                      ,$"_tmp".getItem(5).as("Status")
                      ,$"_tmp".getItem(6).cast("timestamp").as("DateTime")
                      ,$"_tmp".getItem(7).as("Type")
                      ,$"_tmp".getItem(8).as("Loan")
                      ,$"_tmp".getItem(9).as("Where")
                     )
               .drop("_tmp")
               .writeStream
               .format("csv")
               .outputMode("append")
               .option("checkpointLocation", "/FileStore/checkpointLocation.csv")
               .option("path", "/FileStore/data.csv")
               .start()
...