См. Изображение ниже, чтобы проверить график концентратора событий.
Изображение
Невозможно получить сообщения из концентратора событий после запуска в течение некоторого времени, но как только мыперезапустите абонента, мы получили данные только за 4-5 минут.
var maxEventTrigger: Long = Constants.maxEventTrigger.toLong;
val customEventhubParameters = EventHubsConf(connStr)
.setMaxEventsPerTrigger(maxEventTrigger);
customEventhubParameters.setConsumerGroup("tenant-1234")
val incomingStream = spark.readStream.format("eventhubs")
.options(customEventhubParameters.toMap)
.load();
logger.info("Data has been fetched from event hub successfully");
val messages = incomingStream.withColumn("Offset", $ "offset".cast(LongType))
.withColumn("Time (readable)", $ "enqueuedTime".cast(TimestampType))
.withColumn("Timestamp", $ "enqueuedTime".cast(LongType))
.withColumn("Body", $ "body".cast(StringType))
.select("Offset", "Time (readable)", "Timestamp", "Body")