Я выполняю запрос потоковой структурированной искры на источнике потока кинезиса.Он читает (без триггера времени) из kinesis, вычисляет агрегацию и отправляет результат (outputMode "update") с помощью пользовательского приемника DynamodB.
spark версия: 2.1.1 и исходный код kinesis lib: https://github.com/maropu/spark-kinesis-sql-asl
Я хочу обработать 1 500 событий в секунду (я создаю фальшивые события, используя KinesisDataGenerator: https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html)
Все работает на AWS EMR, через оболочку spark (это POC)
Я немного настроился и пришел к этим параметрам:
- 2 кинезисных осколка
- 2 000 WCU для динамической базы данных
- 1 одиночный кластерный узел m4.4xlarge (16 процессоров64 go)
Все работает нормально в течение примерно получаса, и кажется, что внезапно происходит некоторая задержка, каждая партия занимает все больше и больше времени, пока не будет ProvisionedThroughputExceededException (количество операций чтения превышает лимит)
У меня нет проблем с DynamodB (пики WCU около 1500 с 2000 подготовкой, без дросселированных запросов на запись)
Считывания Kinesis довольно стабильны,но до тех пор, пока процесс замедляется, все больше и больше событий считываются одновременно из кинезиса, и он вылетает.
Я добавил до 8 осколков, и то же самое происходит, я выигрываю лишь немного времени, прежде чемProvisionedThroughputExceededException (nb: число процессоров экземпляра> чем число осколков кинезиса)
Я изменил пропускную способность загрузки кинесиса с 500 до 1500 событий / сек, и то же самое происходит примерно в половине часа.
Использование большего экземпляра дает те же результаты ... Я пытался с меньшим количеством событий (100 различных случайных событий на входе против 600 000 различных): то же самое
Относительно приемника динамодаба: вычисляется вторая агрегацияна результат потока запросов (потому что искровая структурированная потоковая передача не допускает множественных агрегаций).
Сквозной процесс состоит из: «Чтение из kinesis -> Потоковый запрос (агрегация 1) -> Dynamodb Sink (агрегация 2 + DynamodB вставки)».Я использую «update» outputMode, чтобы в приемник отправлялись только строки, обновленные из таблицы результатов.
Я добавил несколько журналов в метод addBatch приемника:
- числовходные строки (результат потокового запроса)
- количество запросов на вставку (результат потокового запроса после моего второго накопления, второе число всегда равно или меньше первого)
- истекшее время для всегопроцесс обработки.
Для кинезисной загрузки 500 событий / сек я получил что-то вроде этого:
....
addBatch(4) -> input rows 3574 output insertRequests 3449
addBatch(4) -> Elapsed time: 6.983257993seconds
addBatch(5) -> input rows 4052 output insertRequests 3909
addBatch(5) -> Elapsed time: 5.83499691seconds
addBatch(6) -> input rows 2745 output insertRequests 2657
addBatch(6) -> Elapsed time: 4.879952608seconds
addBatch(7) -> input rows 3081 output insertRequests 2969
addBatch(7) -> Elapsed time: 3.939775103seconds
addBatch(8) -> input rows 2010 output insertRequests 1925
addBatch(8) -> Elapsed time: 5.151868592seconds
addBatch(9) -> input rows 2977 output insertRequests 2869
addBatch(9) -> Elapsed time: 4.722617963seconds
addBatch(10) -> input rows 2530 output insertRequests 2448
addBatch(10) -> Elapsed time: 5.562205997seconds
....
addBatch(100) -> input rows 2996 output insertRequests 2864
addBatch(100) -> Elapsed time: 4.873477912seconds
addBatch(101) -> input rows 2535 output insertRequests 2458
addBatch(101) -> Elapsed time: 4.284861452seconds
addBatch(102) -> input rows 2477 output insertRequests 2388
addBatch(102) -> Elapsed time: 5.174349386seconds
addBatch(103) -> input rows 3942 output insertRequests 3791
addBatch(103) -> Elapsed time: 4.957703028seconds
addBatch(104) -> input rows 3490 output insertRequests 3366
addBatch(104) -> Elapsed time: 8.761093424seconds
.....
addBatch(224) -> Elapsed time: 4.475522722seconds
addBatch(225) -> input rows 2508 output insertRequests 2406
addBatch(225) -> Elapsed time: 5.819129461seconds
addBatch(226) -> input rows 3009 output insertRequests 2914
addBatch(226) -> Elapsed time: 10.994768506seconds
addBatch(227) -> input rows 4755 output insertRequests 4557
addBatch(227) -> Elapsed time: 11.034992149seconds
addBatch(228) -> input rows 6739 output insertRequests 6397
addBatch(228) -> Elapsed time: 12.409657235seconds
addBatch(229) -> input rows 5049 output insertRequests 4888
addBatch(229) -> Elapsed time: 12.584365428seconds
addBatch(230) -> input rows 7758 output insertRequests 7485
addBatch(230) -> Elapsed time: 18.631239639seconds
addBatch(231) -> input rows 9554 output insertRequests 9030
addBatch(231) -> Elapsed time: 18.426685181seconds
addBatch(232) -> input rows 8822 output insertRequests 8382
addBatch(232) -> Elapsed time: 15.920548608seconds
addBatch(233) -> input rows 9251 output insertRequests 8759
addBatch(233) -> Elapsed time: 19.984515183seconds
addBatch(234) -> input rows 10336 output insertRequests 9923
addBatch(234) -> Elapsed time: 20.487669168seconds
....
addBatch(247) -> Elapsed time: 25.539402107seconds
addBatch(248) -> input rows 12863 output insertRequests 12269
addBatch(248) -> Elapsed time: 27.541254374seconds
addBatch(249) -> input rows 13744 output insertRequests 13069
Для кинезисной загрузки 1000 событий / сек я получилчто-то вроде этого:
....
addBatch(8) -> input rows 5247 output insertRequests 5023
addBatch(8) -> Elapsed time: 5.162705257seconds
addBatch(9) -> input rows 7484 output insertRequests 7106
addBatch(9) -> Elapsed time: 5.829125017seconds
....
addBatch(19) -> input rows 13981 output insertRequests 13069
addBatch(19) -> Elapsed time: 8.643140194seconds
addBatch(20) -> input rows 14998 output insertRequests 13997
addBatch(20) -> Elapsed time: 8.930952027seconds
....
addBatch(70) -> input rows 13442 output insertRequests 12626
addBatch(70) -> Elapsed time: 7.891013506seconds
....
addBatch(146) -> Elapsed time: 11.865784296seconds
addBatch(147) -> input rows 19489 output insertRequests 17976
addBatch(147) -> Elapsed time: 11.57164558seconds
addBatch(148) -> input rows 18229 output insertRequests 16863
addBatch(148) -> Elapsed time: 12.319194939seconds
addBatch(149) -> input rows 20976 output insertRequests 19298
addBatch(149) -> Elapsed time: 13.426055377seconds
addBatch(150) -> input rows 21017 output insertRequests 19325
addBatch(150) -> Elapsed time: 12.750020974seconds
addBatch(151) -> input rows 19494 output insertRequests 17962
addBatch(151) -> Elapsed time: 11.652468707seconds
....
addBatch(180) -> input rows 22960 output insertRequests 21013
addBatch(180) -> Elapsed time: 15.388526621seconds
addBatch(181) -> input rows 22492 output insertRequests 20555
addBatch(181) -> Elapsed time: 14.175100191seconds
addBatch(182) -> input rows 22964 output insertRequests 21039
addBatch(182) -> Elapsed time: 16.682608754seconds
Я не могу понять, почему мой процесс задерживается.Почему первые партии довольно стабильны и почему это начинает занимать все больше и больше времени, пока не произойдет сбой примерно через полчаса, несмотря на изменение тестируемых параметров (размер экземпляра, количество осколков кинезиса ...).
Вотнекоторые вопросы по искровой структурированной потоковой передаче:
- Неограниченная таблица постоянно увеличивается в памяти?
- То же самое для таблицы результатов?
- Есть ли способограничить количество записей в потоке кинезиса?Я пробовал с триггером времени, но то же самое происходит, когда есть задержка, все читается до «сейчас».
- Заметно ли увеличение таблицы результатов, что замедляет процесс?Поиск строк обновления занимает все больше и больше времени?
Какой момент я упускаю?
Спасибо,
Филипп