искровая структурированная трансляция из кинезиса в динамод - PullRequest
0 голосов
/ 18 мая 2018

Я выполняю запрос потоковой структурированной искры на источнике потока кинезиса.Он читает (без триггера времени) из kinesis, вычисляет агрегацию и отправляет результат (outputMode "update") с помощью пользовательского приемника DynamodB.

spark версия: 2.1.1 и исходный код kinesis lib: https://github.com/maropu/spark-kinesis-sql-asl

Я хочу обработать 1 500 событий в секунду (я создаю фальшивые события, используя KinesisDataGenerator: https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html)

Все работает на AWS EMR, через оболочку spark (это POC)

Я немного настроился и пришел к этим параметрам:

  • 2 кинезисных осколка
  • 2 000 WCU для динамической базы данных
  • 1 одиночный кластерный узел m4.4xlarge (16 процессоров64 go)

Все работает нормально в течение примерно получаса, и кажется, что внезапно происходит некоторая задержка, каждая партия занимает все больше и больше времени, пока не будет ProvisionedThroughputExceededException (количество операций чтения превышает лимит)

У меня нет проблем с DynamodB (пики WCU около 1500 с 2000 подготовкой, без дросселированных запросов на запись)

Считывания Kinesis довольно стабильны,но до тех пор, пока процесс замедляется, все больше и больше событий считываются одновременно из кинезиса, и он вылетает.

Я добавил до 8 осколков, и то же самое происходит, я выигрываю лишь немного времени, прежде чемProvisionedThroughputExceededException (nb: число процессоров экземпляра> чем число осколков кинезиса)

Я изменил пропускную способность загрузки кинесиса с 500 до 1500 событий / сек, и то же самое происходит примерно в половине часа.

Использование большего экземпляра дает те же результаты ... Я пытался с меньшим количеством событий (100 различных случайных событий на входе против 600 000 различных): то же самое

Относительно приемника динамодаба: вычисляется вторая агрегацияна результат потока запросов (потому что искровая структурированная потоковая передача не допускает множественных агрегаций).

Сквозной процесс состоит из: «Чтение из kinesis -> Потоковый запрос (агрегация 1) -> Dynamodb Sink (агрегация 2 + DynamodB вставки)».Я использую «update» outputMode, чтобы в приемник отправлялись только строки, обновленные из таблицы результатов.

Я добавил несколько журналов в метод addBatch приемника:

  • числовходные строки (результат потокового запроса)
  • количество запросов на вставку (результат потокового запроса после моего второго накопления, второе число всегда равно или меньше первого)
  • истекшее время для всегопроцесс обработки.

Для кинезисной загрузки 500 событий / сек я получил что-то вроде этого:

....
addBatch(4) -> input rows 3574 output insertRequests 3449                       
addBatch(4) -> Elapsed time: 6.983257993seconds
addBatch(5) -> input rows 4052 output insertRequests 3909                       
addBatch(5) -> Elapsed time: 5.83499691seconds
addBatch(6) -> input rows 2745 output insertRequests 2657                       
addBatch(6) -> Elapsed time: 4.879952608seconds
addBatch(7) -> input rows 3081 output insertRequests 2969                       
addBatch(7) -> Elapsed time: 3.939775103seconds
addBatch(8) -> input rows 2010 output insertRequests 1925                       
addBatch(8) -> Elapsed time: 5.151868592seconds
addBatch(9) -> input rows 2977 output insertRequests 2869                       
addBatch(9) -> Elapsed time: 4.722617963seconds
addBatch(10) -> input rows 2530 output insertRequests 2448                      
addBatch(10) -> Elapsed time: 5.562205997seconds
....
addBatch(100) -> input rows 2996 output insertRequests 2864                     
addBatch(100) -> Elapsed time: 4.873477912seconds
addBatch(101) -> input rows 2535 output insertRequests 2458                     
addBatch(101) -> Elapsed time: 4.284861452seconds
addBatch(102) -> input rows 2477 output insertRequests 2388                     
addBatch(102) -> Elapsed time: 5.174349386seconds
addBatch(103) -> input rows 3942 output insertRequests 3791                     
addBatch(103) -> Elapsed time: 4.957703028seconds
addBatch(104) -> input rows 3490 output insertRequests 3366                     
addBatch(104) -> Elapsed time: 8.761093424seconds
.....
addBatch(224) -> Elapsed time: 4.475522722seconds
addBatch(225) -> input rows 2508 output insertRequests 2406                     
addBatch(225) -> Elapsed time: 5.819129461seconds
addBatch(226) -> input rows 3009 output insertRequests 2914                     
addBatch(226) -> Elapsed time: 10.994768506seconds
addBatch(227) -> input rows 4755 output insertRequests 4557                     
addBatch(227) -> Elapsed time: 11.034992149seconds
addBatch(228) -> input rows 6739 output insertRequests 6397                     
addBatch(228) -> Elapsed time: 12.409657235seconds
addBatch(229) -> input rows 5049 output insertRequests 4888                     
addBatch(229) -> Elapsed time: 12.584365428seconds
addBatch(230) -> input rows 7758 output insertRequests 7485                     
addBatch(230) -> Elapsed time: 18.631239639seconds
addBatch(231) -> input rows 9554 output insertRequests 9030                     
addBatch(231) -> Elapsed time: 18.426685181seconds
addBatch(232) -> input rows 8822 output insertRequests 8382                     
addBatch(232) -> Elapsed time: 15.920548608seconds
addBatch(233) -> input rows 9251 output insertRequests 8759                     
addBatch(233) -> Elapsed time: 19.984515183seconds
addBatch(234) -> input rows 10336 output insertRequests 9923                    
addBatch(234) -> Elapsed time: 20.487669168seconds
....
addBatch(247) -> Elapsed time: 25.539402107seconds
addBatch(248) -> input rows 12863 output insertRequests 12269                   
addBatch(248) -> Elapsed time: 27.541254374seconds
addBatch(249) -> input rows 13744 output insertRequests 13069    

Для кинезисной загрузки 1000 событий / сек я получилчто-то вроде этого:

....
addBatch(8) -> input rows 5247 output insertRequests 5023                       
addBatch(8) -> Elapsed time: 5.162705257seconds
addBatch(9) -> input rows 7484 output insertRequests 7106                       
addBatch(9) -> Elapsed time: 5.829125017seconds
....
addBatch(19) -> input rows 13981 output insertRequests 13069                    
addBatch(19) -> Elapsed time: 8.643140194seconds
addBatch(20) -> input rows 14998 output insertRequests 13997                    
addBatch(20) -> Elapsed time: 8.930952027seconds
....
addBatch(70) -> input rows 13442 output insertRequests 12626                    
addBatch(70) -> Elapsed time: 7.891013506seconds
....
addBatch(146) -> Elapsed time: 11.865784296seconds
addBatch(147) -> input rows 19489 output insertRequests 17976                   
addBatch(147) -> Elapsed time: 11.57164558seconds
addBatch(148) -> input rows 18229 output insertRequests 16863                   
addBatch(148) -> Elapsed time: 12.319194939seconds
addBatch(149) -> input rows 20976 output insertRequests 19298                   
addBatch(149) -> Elapsed time: 13.426055377seconds
addBatch(150) -> input rows 21017 output insertRequests 19325                   
addBatch(150) -> Elapsed time: 12.750020974seconds
addBatch(151) -> input rows 19494 output insertRequests 17962                   
addBatch(151) -> Elapsed time: 11.652468707seconds
....
addBatch(180) -> input rows 22960 output insertRequests 21013                   
addBatch(180) -> Elapsed time: 15.388526621seconds
addBatch(181) -> input rows 22492 output insertRequests 20555                   
addBatch(181) -> Elapsed time: 14.175100191seconds
addBatch(182) -> input rows 22964 output insertRequests 21039                   
addBatch(182) -> Elapsed time: 16.682608754seconds

Я не могу понять, почему мой процесс задерживается.Почему первые партии довольно стабильны и почему это начинает занимать все больше и больше времени, пока не произойдет сбой примерно через полчаса, несмотря на изменение тестируемых параметров (размер экземпляра, количество осколков кинезиса ...).

Вотнекоторые вопросы по искровой структурированной потоковой передаче:

  • Неограниченная таблица постоянно увеличивается в памяти?
  • То же самое для таблицы результатов?
  • Есть ли способограничить количество записей в потоке кинезиса?Я пробовал с триггером времени, но то же самое происходит, когда есть задержка, все читается до «сейчас».
  • Заметно ли увеличение таблицы результатов, что замедляет процесс?Поиск строк обновления занимает все больше и больше времени?

Какой момент я упускаю?

Спасибо,

Филипп

...