В структурированной потоковой передаче блоков данных Azure (записная книжка Scala, подключенная к концентратору IoT Azure) Я открываю поток на конечной точке, совместимой с концентратором событий, в концентраторе IoT Azure.Затем я анализирую входящий поток на основе структурированной схемы и создаю 3 запроса (groupBy) в одном потоке.В большинстве случаев (кажется, не всегда) я получаю исключение по одному из запросов на отображение значения эпохи в разделе.(см. ниже) Я использую выделенную группу потребителей, о которой не читает ни одно другое приложение.Итак, я думаю, что будет поддерживаться открытие 1 потока и создание нескольких потоковых запросов против него?
Есть предложения, объяснения или идеи для решения этой проблемы?(Я хотел бы избежать создания 3 групп потребителей и повторного определения потока 3 раза)
Пример исключения:
org.apache.spark.SparkException: задание прервано из-заОшибка этапа: Задача 3 на этапе 1064.0 завершилась неудачно 4 раза, последний сбой: Потерянная задача 3.3 на этапе 1064.0 (TID 24790, 10.139.64.10, исполнитель 7): java.util.concurrent.CompletionException: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: создается новый приемник с более высоким периодом '0', следовательно, текущий приемник с периодом '0' отключается.Если вы воссоздаете приемник, убедитесь, что используется более высокая эпоха.Идентификатор отслеживания: xxxx, SystemTracker: iothub-name | databricks-db, метка времени: 2019-02-18T15: 25: 19, errorContext [NS: гггг, PATH: savanh-traffic-camera2 / ConsumerGroups / databricks-db / Partitions / 3,REFERENCE_ID: a0e445_7319_G2_1550503505013, PREFETCH_COUNT: 500, LINK_CREDIT: 500, PREFETCH_Q_LEN: 0]
Это мой код: (очищено)
// Define schema and create incoming camera eventstream
val cameraEventSchema = new StructType()
.add("TrajectId", StringType)
.add("EventTime", StringType)
.add("Country", StringType)
.add("Make", StringType)
val iotHubParameters =
EventHubsConf(cameraHubConnectionString)
.setConsumerGroup("databricks-db")
.setStartingPosition(EventPosition.fromEndOfStream)
val incomingStream = spark.readStream.format("eventhubs").options(iotHubParameters.toMap).load()
// Define parsing query selecting the required properties from the incoming telemetry data
val cameraMessages =
incomingStream
.withColumn("Offset", $"offset".cast(LongType))
.withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
.withColumn("Timestamp", $"enqueuedTime".cast(LongType))
.withColumn("Body", $"body".cast(StringType))
// Select the event hub fields so we can work with them
.select("Offset", "Time (readable)", "Timestamp", "Body")
// Parse the "Body" column as a JSON Schema which we defined above
.select(from_json($"Body", cameraEventSchema) as "cameraevents")
// Now select the values from our JSON Structure and cast them manually to avoid problems
.select(
$"cameraevents.TrajectId".cast("string").alias("TrajectId"),
$"cameraevents.EventTime".cast("timestamp").alias("EventTime"),
$"cameraevents.Country".cast("string").alias("Country"),
$"cameraevents.Make".cast("string").alias("Make")
)
.withWatermark("EventTime", "10 seconds")
val groupedDataFrame =
cameraMessages
.groupBy(window($"EventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start") as 'window, $"count")
display(groupedDataFrame)
val makeDataFrame =
cameraMessages
.groupBy("Make")
.agg(count("*") as 'count)
.sort($"count".desc)
display(makeDataFrame)
val countryDataFrame =
cameraMessages
.groupBy("Country")
.agg(count("*") as 'count)
.sort($"count".desc)
display(countryDataFrame)