Кинесис сваливает записи по количеству осколков - PullRequest
0 голосов
/ 14 апреля 2020

Я пытаюсь отправить записи типа Seq [PutRecordsRequestEntry] в kinesis. У меня есть количество осколков, равное 2. Я пытаюсь отправить записи размером 700. Согласно моему исследованию, записи отправляют максимум 1000 за одну секунду в одном осколке. Теперь у меня есть два шарда, так что один запрос на запись пут должен содержать до 1000 записей одновременно. Но этого не происходит. Я могу отправить только 500 записей одновременно даже с двумя осколками.

sendToKinesis(streamName, records) map { putDataResult =>
         val putDataList = putDataResult.getRecords.asScala.toList
         println(putDataList)
       }


 def sendToKinesis(name: String, batch: Seq[PutRecordsRequestEntry]): Future[PutRecordsResult] =
   Future {
     val putRecordsRequest = {
       val putRecordsRequest = new PutRecordsRequest()
       putRecordsRequest.setStreamName(name)
       val putRecordsRequestEntryList: Seq[PutRecordsRequestEntry] = batch
       putRecordsRequest.setRecords(putRecordsRequestEntryList.asJava)
       putRecordsRequest
     }
     val putRecordsResult = kinesisClient.putRecords(putRecordsRequest)
     putRecordsResult
   }

Я получаю это исключение:


run-main-0) org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation error detected: Value '[PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=571 cap=571], explicitHashKey=null, partitionKey=7821720c-aecd-422d-b93f-4fe4c3702d4f), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=571 cap=571], explicitHashKey=null, partitionKey=b6d7aa72-4585-42e5-9784-b6a1fa83210c), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=571 cap=571], explicitHashKey=null, partitionKey=80a6b14c-bb9d-442f-9c0c-895620b2fac9), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=571 cap=571], explicitHashKey=null, partitionKey=d53232a2-b7b2-4150-87da-62a2b7f211f8), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=571 cap=571], explicitHashKey=null, partitionKey=c5f4a8a5-1ec8-4c94-aa10-8d6380597c07), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=571 cap=571], explicitHashKey=null, partitionKey=c72d9176-58ee-493c-979f-77dcfbc28158), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=571 cap=571], explicitHashKey=null, partitionKey=43fcab8c-b9d4-4395-8304-9de88dd8e449), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=571 cap=571], explicitHashKey=null, partitionKey=03fc51a7-af38-46f7-bf06-88c13c979f37), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=571 cap=571], explicitHashKey=null, partitionKey=6503e977-ca62-4014-b72a-1a5e97dac254), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=571 cap=571], explicitHashKey=null, partitionKey=34c6af9f-6a14-4ba9-badb-c2fed272255d), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=571 cap=571], explicitHashKey=null, partitionKey=ccbd9b13-4b22-4b3d-9bc4-690e114e9a07), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=571 cap=571], explicitHashKey=null, partitionKey=024f800b-13ca-4ac6-af82-87b41ff3f25d)

Единственное, что я понимаю, это не о количество записей, это примерно размер записей, которые превысили, но как я узнаю, что если у меня есть два или четыре шарда, какой размер маха я могу поддерживать?

Я также увеличил количество шардов , но все еще не умеет писать. Нужно ли что-то изменить, чтобы сделать запись в несколько шардов?

Ключ раздела - это случайный uuid.

...