Ошибка при перемещении данных в Amazon Kinesis (длина участника должна быть меньше или равна 1048576) - PullRequest
0 голосов
/ 28 мая 2020

Мой код python выдает ошибку ниже при перемещении данных из базы данных Mysql в Amazon Kinesis. Эта ошибка может быть связана с большим количеством строк в таблице. Она отлично работает для таблицы с меньшим количеством строк. Есть ли способ исправить эту ошибку? Пожалуйста посоветуй.

Ошибка: botocore.exceptions.ClientError: Произошла ошибка (ValidationException) при вызове операции PutRecord: обнаружена 1 ошибка проверки: значение в 'data' не соответствует ограничению: член должен иметь длину меньше или равно 1048576.

Ниже приведен код python, который я использую

from contextlib import closing
from datetime import datetime
import json
import mysql.connector as sql
import boto3
import os
DB_NAME = 'test'
DB_USER = 'root'
DB_PASS = 'xxx'
os.environ['AWS_PROFILE'] = "kinesis_developer"
kinesis = boto3.client('kinesis',region_name='ap-southeast-1')

def get_tables(cursor):
     cursor.execute('SHOW tables')
     return [r[0] for r in cursor.fetchall()] 

def get_rows_as_dicts(cursor, table):
     cursor.execute('select * from {}'.format(table))
     columns = [d[0] for d in cursor.description]
     return [dict(zip(columns, row)) for row in cursor.fetchall()]

class dump_date(json.JSONEncoder):
     def default(self, o):
        if isinstance(o, datetime):
           return o.isoformat()

        return json.JSONEncoder.default(self, o)

with closing(sql.connect(user=DB_USER, passwd=DB_PASS, db=DB_NAME)) as conn, closing(conn.cursor()) as cursor:
   dump = {}
   table = input("Enter name of the table to load: ")
   dump[table] = get_rows_as_dicts(cursor, table)
   kinesis.put_record(StreamName="KinesisStreamName", Data=json.dumps(dump,cls=dump_date), PartitionKey="default")
   print(json.dumps(dump,cls=dump_date)) 
...