Я столкнулся с некоторой задержкой при обработке событий с помощью потоковой аналитики.Я работаю над проектом IoT, и шлюз отправляет сообщения на IoTHub.Поскольку мы имеем дело с сообщениями другого типа, через конечную точку концентратора событий мы маршрутизируем сообщения.Наше задание Stream Analytics выберет события из концентратора событий и начнет обработку и отправку в очередь служебной шины.
После запуска задания в течение следующей минуты сообщения отправляются в очередь служебной шины.Таким образом, мы сравнили разницу во времени между временем создания концентратора событий и временем процесса потоковой аналитики.Первоначально он будет выполняться в течение 5-10 секунд.
Но через 1 час эта задержка увеличится до 15 секунд и будет продолжаться.Таким образом, через 6 или 7 часов время создания и доставки сообщения в очередь шины обслуживания составит более 1 минуты.Поскольку наше приложение напрямую основано на данных в реальном времени, мне может потребоваться перезапустить задание через 7 часов (что не является постоянным решением).
В концентраторе событий я использую 4 раздела и 3 потоковых модуля,Поскольку использование потоковых единиц очень низкое (16%), поэтому увеличение потоковых единиц не является решением этой проблемы.
Пожалуйста, помогите в этом вопросе.
My Query Analytics Query Giving Ниже:
С
сырое сообщение AS
(
ВЫБРАТЬ
digitaleventhubstreaminputonlineclassaforrawdata. *,
GetMetadataPropertyValue (digitaleventhubstreama)*
'EventHub.IoTConnectionDeviceId') as iotdevice
FROM digitaleventhubstreaminputonlineclassaforrawdata
Partition By PartitionId
)
,
AS
(
SELECT
rawmessage. *,
digitalbloreferenceinputnmea. *,
digitalblorereferenceinputwidget. *,
digitalblobreferenceinputscalingfactor. *
FROM rawmessage Partition By PartitionId
ВЛЕВО ПРИСОЕДИНЯЙТЕСЬ к digitalblobreferenceinputnmea ON rawmessage.vessel_id =digitalblobreferenceinputnmea.vessel_id
ВЛЕВО ПРИСОЕДИНЯЕТСЯ
)
,
обработанное сообщение AS
(
SELECT
event.vessel_id as vessel_id_fk,
event.iotdevice в качестве device_id_fk,
event.PartitionId в качестве partitionId, UDF.getEpochTime (event.EventProcessedUtcTime) в качестве EventProcessedUtcTime,
UDF.getAnalyticsProcessTimeTime (arg) AnalyticsProcessTimeTimeTime (arg) как arg ()1075 *
ОТ процесса обработки сообщений как события PartitionId PartitionId
)
- вывод записывается в очередь служебной шины для проталкивания сокета
SELECT * INTO digitalqueueoutputonlineclassasocketdata ОТ обработанного сообщения РазделениеПо PartitionId