Невозможно загрузить пакетные данные в Kinesis из Mysql базы данных - PullRequest
0 голосов
/ 27 мая 2020

Я не могу загрузить полные данные из mysql в Kinesis. Я могу видеть данные mysql по строкам в командной строке, но когда я пытаюсь преобразовать данные в Json, результат не отображается и не загружается в KInesis. Может быть, что-то мне не хватает в моем коде. Пожалуйста, помогите мне найти ошибку. Я новичок в AWS.

import json
import boto3
import pymysql
import socket,array
import pandas as pd
import os
import pymysql.cursors
from datetime import date, datetime
from pymysqlreplication import BinLogStreamReader

class DateTimeEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()

        return json.JSONEncoder.default(self, o)
def main():
  connection = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "passwd": "root"}
  access_key_id = os.getenv('access-key')
  secret_access_key = os.getenv('secret-key')
  kinesis = boto3.client("kinesis",region_name='ap-south- 
  1',aws_access_key_id=access_key_id,aws_secret_access_key=secret_access_key)
  db_connection = pymysql.connections.Connection(host='127.0.0.1', user="root", password="root", 
  database='mysqltest', connect_timeout=5)
  stream = pd.read_sql('select * from emp', con=db_connection)
  print(stream) 
  with db_connection.cursor(pymysql.cursors.DictCursor) as cursor:
    cursor.execute('''select * from emp''')
    row_headers = [x[0] for x in cursor.description]
    data = cursor.fetchall()
    json_data=[]
    for result in data:
        json_data.append(dict(zip(row_headers,result)))
    return json.dumps(json_data)
    kinesis.put_record(StreamName="KinesisMysqlStream", 
Data=json.dumps(json_data,cls=DateTimeEncoder), PartitionKey="default")
    print(json.dumps(json_data,cls=DateTimeEncoder))
if __name__ == "__main__":
   main()
...