Я пытаюсь прочитать данные из концентратора событий в Databricks и хочу дать ему структуру col1, col2 и т. Д.
Проблема: я вижу, что только 1-я запись имеет правильную структуру и не загружает остальные данные.
Данные в концентраторе событий выглядят так же, как показано ниже - есть 3 записи, и каждая запись разбита на 2 строки после столбца DateTime ->
корпус
24,5300,123456,1, ПЛАН-QD, ПМТ, 10/09/15
00: 00,1253323, ПРОЦЕНТЫ, КРЕДИТНЫЙ-AS, NULL
32,1300,12458,2, ПЛАН, PMT, 25/09/15
00: 00,12532123, ПРОЦЕНТЫ, Ссуда, NULL
36,1400,19458,25, план, PMTS, 25/11/15
00: 00,92532163, ПРОЦЕНТЫ, КРЕДИТНЫЙ-DS, NULL
Заголовки столбцов (отсутствуют в evnt-хабе и только для справки, но при выводе в таблицу они должны быть там) ->
идентификатор, Bal, accnum, активный, план, состояние, DateTime, Тип кредит, Где
Мой код как ниже-
import org.apache.spark.eventhubs ._
import org.apache.spark.sql.types ._
import org.apache.spark.sql.functions ._
import spark.implicits ._
val connectionString = ConnectionStringBuilder ("моя строка подключения"). SetEventHubName ("oth-Transactions"). Build // эта строка подключения для чтения из eventhub
val customEventhubParameters = EventHubsConf (connectionString)
val ConsumerDF = spark.readStream.format ("eventhubs"). Options (customEventhubParameters.toMap) .option ("checkpointLocation", "/tmp/checkpoint").load()
val OTHDF = ConsumerDF.select ($ "body" приведено "string")
val OTHDF2 = OTHDF.withColumn ("temp", split (col ("body"), "\,")). Select (
(0 until 52).map(i => col("temp").getItem(i).as(s"col$i")): _*
)
OTHDF2.printSchema
OTHDF2.writeStream.format ("delta"). OutputMode ("append"). Option ("checkpointLocation", "/delta/events/_checkpoints/etl-from-json").table("Table_Name")*
Могу ли я получить несколько советов о том, как я могу прочитать все эти записи и поместить их в таблицу в виде блоков данных?
Заранее спасибо !!