потоковая аналитика: задержка обработки событий, проблемы с производительностью - PullRequest
0 голосов
/ 22 мая 2018

Я столкнулся с некоторой задержкой при обработке событий с помощью потоковой аналитики.Я работаю над проектом IoT, и шлюз отправляет сообщения на IoTHub.Поскольку мы имеем дело с сообщениями другого типа, через конечную точку концентратора событий мы маршрутизируем сообщения.Наше задание Stream Analytics выберет события из концентратора событий и начнет обработку и отправку в очередь служебной шины.

После запуска задания в течение следующей минуты сообщения отправляются в очередь служебной шины.Таким образом, мы сравнили разницу во времени между временем создания концентратора событий и временем процесса потоковой аналитики.Первоначально он будет выполняться в течение 5-10 секунд.

Но через 1 час эта задержка увеличится до 15 секунд и будет продолжаться.Таким образом, через 6 или 7 часов время создания и доставки сообщения в очередь шины обслуживания составит более 1 минуты.Поскольку наше приложение напрямую основано на данных в реальном времени, мне может потребоваться перезапустить задание через 7 часов (что не является постоянным решением).

В концентраторе событий я использую 4 раздела и 3 потоковых модуля,Поскольку использование потоковых единиц очень низкое (16%), поэтому увеличение потоковых единиц не является решением этой проблемы.

Пожалуйста, помогите в этом вопросе.

My Query Analytics Query Giving Ниже:

С

сырое сообщение AS

(

ВЫБРАТЬ

digitaleventhubstreaminputonlineclassaforrawdata. *,

GetMetadataPropertyValue (digitaleventhubstreama)*

'EventHub.IoTConnectionDeviceId') as iotdevice

FROM digitaleventhubstreaminputonlineclassaforrawdata

Partition By PartitionId

)

,

AS

(

SELECT

rawmessage. *,

digitalbloreferenceinputnmea. *,

digitalblorereferenceinputwidget. *,

digitalblobreferenceinputscalingfactor. *

FROM rawmessage Partition By PartitionId

ВЛЕВО ПРИСОЕДИНЯЙТЕСЬ к digitalblobreferenceinputnmea ON rawmessage.vessel_id =digitalblobreferenceinputnmea.vessel_id

ВЛЕВО ПРИСОЕДИНЯЕТСЯ

)

,

обработанное сообщение AS

(

SELECT

event.vessel_id as vessel_id_fk,

event.iotdevice в качестве device_id_fk,

event.PartitionId в качестве partitionId, UDF.getEpochTime (event.EventProcessedUtcTime) в качестве EventProcessedUtcTime,

UDF.getAnalyticsProcessTimeTime (arg) AnalyticsProcessTimeTimeTime (arg) как arg ()1075 *

ОТ процесса обработки сообщений как события PartitionId PartitionId

)

- вывод записывается в очередь служебной шины для проталкивания сокета

SELECT * INTO digitalqueueoutputonlineclassasocketdata ОТ обработанного сообщения РазделениеПо PartitionId

1 Ответ

0 голосов
/ 03 июня 2018

Мы собрали тикет, и они решили его. Это ошибка от Microsoft, и они развернули изменения во всех регионах. Задержка произошла при объединении эталонного ввода.

...