Я хотел бы получить данные за 1 день из концентратора событий Azure, применить логику и скопировать их в базу данных cosmos.Я могу получить данные из EventHub, но данные в потоковом режиме.Мне нужно получать данные только для временного окна (скажем, только на один день / или на 5 часов).
Ниже приведен код, который я пытался получить из Azure EventHub.
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
object FromEventHub{
val spark = SparkSession
.builder
.appName("FromEventHubToCosmos")
.getOrCreate()
import spark.implicits._
val connectionString = ConnectionStringBuilder()
.setNamespaceName("NAMESPACE_NAME")
.setEventHubName("EVENTHUB_NAME")
.setSasKeyName("KEY_NAME")
.setSasKey("KEY")
.build
val currTime = Instant.now
val ehConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEnqueuedTime(currTime.minus(Duration.ofHours(5))))
.setEndingPosition(EventPosition.fromEnqueuedTime(currTime))
val reader = spark
.read
.format("eventhubs")
.options(ehConf.toMap)
.load()
val newDF = reader.withColumn("Offset", $"offset".cast(LongType)).withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType)).withColumn("Timestamp", $"enqueuedTime".cast(LongType)).withColumn("Body", $"body".cast(StringType)).select("Offset", "Time (readable)", "Timestamp", "Body")
newDF.show()
}
Я использовал setStartingPosition 5 часов раньше, но в Scala данные продолжают потоковую передачу из Eventhub.Мне просто нужны данные из концентратора событий до момента выполнения кода.
Есть ли способ ограничить данные из концентратора событий с помощью временного окна или другими способами?
Как управлять данными, доступными в данныхкадр для применения некоторой логики .?