Извлекайте данные из Azure EventHub для временного окна в Scala, используя IntelliJ - PullRequest
0 голосов
/ 23 сентября 2019

Я хотел бы получить данные за 1 день из концентратора событий Azure, применить логику и скопировать их в базу данных cosmos.Я могу получить данные из EventHub, но данные в потоковом режиме.Мне нужно получать данные только для временного окна (скажем, только на один день / или на 5 часов).

Ниже приведен код, который я пытался получить из Azure EventHub.

import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession


object FromEventHub{



 val spark = SparkSession
    .builder
    .appName("FromEventHubToCosmos")
    .getOrCreate()

  import spark.implicits._

  val connectionString = ConnectionStringBuilder()
    .setNamespaceName("NAMESPACE_NAME")
    .setEventHubName("EVENTHUB_NAME")
    .setSasKeyName("KEY_NAME")
    .setSasKey("KEY")
    .build

  val currTime = Instant.now
  val ehConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEnqueuedTime(currTime.minus(Duration.ofHours(5))))
  .setEndingPosition(EventPosition.fromEnqueuedTime(currTime))

  val reader =  spark
    .read
    .format("eventhubs")
    .options(ehConf.toMap)
    .load()


val newDF = reader.withColumn("Offset", $"offset".cast(LongType)).withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType)).withColumn("Timestamp", $"enqueuedTime".cast(LongType)).withColumn("Body", $"body".cast(StringType)).select("Offset", "Time (readable)", "Timestamp", "Body")

newDF.show()


}

Я использовал setStartingPosition 5 часов раньше, но в Scala данные продолжают потоковую передачу из Eventhub.Мне просто нужны данные из концентратора событий до момента выполнения кода.

  1. Есть ли способ ограничить данные из концентратора событий с помощью временного окна или другими способами?

  2. Как управлять данными, доступными в данныхкадр для применения некоторой логики .?

Ответы [ 2 ]

0 голосов
/ 26 сентября 2019

Я также столкнулся с этой проблемой, когда задание Spark продолжало потоковую передачу и не завершалось.Если это поможет, проблему удалось решить, запустив код в блокноте Azure Databricks .Странно, но работа там заканчивается. Databricks Community Edition , также бесплатное.

0 голосов
/ 23 сентября 2019

Вы можете удалить часть потока и сделать это пакетным способом.(setEndingPosition работает только для пакетных запросов, как показано ниже).

  val reader =  spark
    .read
    .format("eventhubs")
    .options(ehConf.toMap)
    .load()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...