Обновление с использованием SQLalquemy - PullRequest
0 голосов
/ 11 апреля 2019

Мне нужна помощь, пожалуйста.

Я получаю сообщение amazon sqs в цикле, и мне нужно сохранить эти сообщения в базе данных, поэтому для выполнения этой задачи я использую python и sqlalchemy, но застрял, когда пытался сохранить или обновить сообщение с помощью сообщение с использованием этого кода.

   try:
        arquivo = convert_obj(msg)
        object_msg = session.query(Arquivo).filter_by(id_doc_holmes=arquivo.id_doc_holmes).one()
        if object_msg:
            session.merge(object_msg)
            session.commit()
        session.add(arquivo)
        session.commit()
        return True
    except Exception as ex:
        print(ex)

но если я удаляю obj_msg = session.query (Arquivo) .filter_by (id_doc_holmes = arquivo.id_doc_holmes) .one (), это работает, но я просто добавляю и не обновляю.

Есть идеи?

Таблица

from  sqlalchemy import create_engine, Column, Integer, String, Boolean,  Date
from sqlalchemy.ext.declarative import declarative_base 
from sqlalchemy.orm import sessionmaker, relationship


Base = declarative_base()

class Arquivo(Base):
     __tablename__ = "arquivo"

    id = Column('id', Integer, primary_key=True)
    id_doc_holmes = Column('id_doc_holmes', Integer)
    link = Column('link', String)
    nome_arquivo = Column('nome_arquivo', String)
    classe = Column('classe', String)
    id_carteira = Column('id_carteira', Integer)
    id_caso = Column('id_caso', Integer)
    nome_caso = Column('nome_caso', String)
    status = Column('status', String)
    nr_matricula = Column('nr_matricula', String)
    oficio_imovel = Column('oficio_imovel', String)
    comarca_imovel = Column('comarca_imovel', String)
    estado_imovel = Column('estado_imovel', String)
    dt_certificacao = Column('dt_certificacao', Date)
    proprietario = Column('proprietario', String)
    cpf_cnpj_prop = Column('cpf_cnpj_prop', String)
    porcentagem_prop =  Column('porcentagem_prop', String)
    id_alienacao = Column('id_alienacao', String)
    dt_alienacao = Column('dt_alienacao', Date)
    tipo_de_alienacao = Column('tipo_de_alienacao', String)
    tipo_de_imovel = Column('tipo_de_imovel', String)
    urbano_rural = Column('urbano_rural', String)
    tipo_logradouro = Column('tipo_logradouro', String)
    logradouro = Column('logradouro', String)
    cidade = Column('cidade', String)
    estado = Column('estado', String)
    area_terreno = Column('area_terreno', String)
    devedor_avalista_prop = Column('devedor_avalista_prop', String)
    onus = Column('onus', String)
    documento_relevante = Column('documento_relevante', String)


engine = create_engine('postgresql://postgres:teste@localhost:5432/teste')
Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)

Цикл, который будет получать сообщения

def get_from_queue():
    try:       
        arquivo = Processador()
        for message in queue.receive_messages(VisibilityTimeout=1, MaxNumberOfMessages=10, WaitTimeSeconds=3):
            sucesso = arquivo.mensagem_parser(message.body)
            if sucesso:
                print('deleta mensagem')
            # message.delete()
            else:
                print('verificar mensagem')
    except Exception as ex:
       print(ex)

в то время как True: пытаться: get_from_queue () time.sleep (5) кроме Исключения как ex: печать (ех)

Класс, который будет обрабатывать сообщение

class Processador:

    @staticmethod
    def mensagem_parser(message):
        msg_canonico = None
        try:
            msg =  message.encode('ascii').decode('utf-8')
        except:
            return {"message": "Não foi possivel decodificar"}
        try:            
            msg_queue = json.loads(msg)         
        cria_ou_atualiza(msg_queue['Message'])
        return True
    except Exception as ex:
        msg_canonico = ex
        return msg_canonico

эту часть я сделал, чтобы упростить сохранение объекта

def convert_obj(msg, arquivo=None):
    arquivo = Arquivo() if arquivo is None else arquivo
    if msg['id_documento_holmes']:
        arquivo.id_doc_holmes = msg['id_documento_holmes']
    else:
        arquivo.id_doc_holmes = None
    if msg['link']:
        arquivo.link  = msg['link']
    else:
        arquivo.link = None
    if msg['nome']:
        arquivo.nome_arquivo  = msg['nome']
    else:
        arquivo.nome_arquivo = None
    if msg['classe']:
        arquivo.classe  = msg['classe']
    else:
        arquivo.classe = None
    if msg['id_carteira']:
        arquivo.id_carteira  = msg['id_carteira']
    else:
        arquivo.id_carteira = None
    if msg['id_caso']:
        arquivo.id_caso  = msg['id_caso']
    else:
        arquivo.id_caso =None
    if msg['nome_caso']:
        arquivo.nome_caso = msg['nome_caso']
    else:
        arquivo.nome_caso = None
    if msg['status']:
        arquivo.status = msg['status']
    else:
        arquivo.status = None
    if msg['fields']['imóvel']['nº da matrícula']:
        arquivo.nr_matricula_imovel = msg['fields']['imóvel']['nº da matrícula']
    else:
        arquivo.nr_matricula_imovel = None
    if msg['fields']['imóvel']['ofício']:
        arquivo.oficio_imovel = msg['fields']['imóvel']['ofício']
    else:
        arquivo.oficio_imovel = None
    if msg['fields']['imóvel']['comarca']:
        arquivo.comarca_imovel = msg['fields']['imóvel']['comarca']
    else:
        arquivo.comarca_imovel = None
    if msg['fields']['imóvel']['comarca']:
        arquivo.estado_imovel = msg['fields']['imóvel']['estado']
    else:
        arquivo.estado_imovel = None
    if msg['fields']['imóvel']['estado']:
        arquivo.dt_certificacao = msg['fields']['data emissão certidão']
    else:
        arquivo.dt_certificacao = None
    if msg['fields']['atual proprietário']['nome do proprietário']:
        arquivo.proprietario = msg['fields']['atual proprietário']['nome do proprietário']
    else:
        arquivo.proprietario = None
    if msg['fields']['atual proprietário']["% de propriedade"]:
        arquivo.cpf_cnpj_prop = msg['fields']['atual proprietário']["% de propriedade"]
    else:
        arquivo.cpf_cnpj_prop = None
    if msg['fields']['alienações']['id da alienação']:
        arquivo.id_alienacao = msg['fields']['alienações']['id da alienação']
    else:
        arquivo.id_alienacao = None
    if msg['fields']['alienações']['data da alienação']:
        arquivo.dt_alienacao  = msg['fields']['alienações']['data da alienação']
    else:
        arquivo.dt_alienacao = None
    if msg['fields']['alienações']['tipo de alienação']:
        arquivo.tipo_de_alienacao = msg['fields']['alienações']['tipo de alienação']
    else:
        arquivo.tipo_de_alienacao = None
    if msg['fields']['tipo do imóvel']:
        arquivo.tipo_de_imovel = msg['fields']['tipo do imóvel']
    else:
        arquivo.tipo_de_imovel = None
    if msg['fields']['rural / urbano']:
        arquivo.urbano_rural  = msg['fields']['rural / urbano']
    else:
        arquivo.urbano_rural = None
    if msg['fields']['endereço']['tipo de logradouro']:
        arquivo.tipo_logradouro = msg['fields']['endereço']['tipo de logradouro']
    else:
        arquivo.tipo_logradouro = ''
    if msg['fields']['endereço']['logradouro']:
        arquivo.logradouro = msg['fields']['endereço']['logradouro']
    else:
        arquivo.logradouro = None
    if msg['fields']['endereço']['cidade']:
        arquivo.cidade = msg['fields']['endereço']['cidade']
    else:
        arquivo.cidade = None
    if msg['fields']['endereço']['estado']:
        arquivo.estado = msg['fields']['endereço']['estado']
    else:
        arquivo.estado = None
    if msg['fields']['área terreno']:
        arquivo.area_terreno = msg['fields']['área terreno']
    else:
        arquivo.area_terreno = None
    if msg['fields']['área construída']:
        arquivo.devedor_avalista_prop = msg['fields']['área construída']
    else:
        arquivo.devedor_avalista_prop = None
    if msg['fields']['devedor/avalista é ou era proprietário']:
        arquivo.onus = msg['fields']['devedor/avalista é ou era proprietário']
    else:
        arquivo.onus = None
    if msg['fields']['documento relevante?']:
        arquivo.documento_relevante = msg['fields']['documento relevante?']
    else:
        arquivo.documento_relevante = None

    return arquivo

здесь я пытаюсь сохранить или обновить

def cria_ou_atualiza(msg):
    session = Session()
    try:
        arquivo = convert_obj(msg)
        object_msg = session.query(Arquivo).filter_by(id_doc_holmes=arquivo.id_doc_holmes).one()
        if object_msg:
            session.merge(object_msg)
            session.commit()
        session.add(arquivo)
        session.commit()
        return True
    except Exception as ex:
        print(ex)

    session.close()
...