Мне нужна помощь, пожалуйста.
Я получаю сообщение amazon sqs в цикле, и мне нужно сохранить эти сообщения в базе данных, поэтому для выполнения этой задачи я использую python и sqlalchemy, но застрял, когда пытался сохранить или обновить сообщение с помощью сообщение с использованием этого кода.
try:
arquivo = convert_obj(msg)
object_msg = session.query(Arquivo).filter_by(id_doc_holmes=arquivo.id_doc_holmes).one()
if object_msg:
session.merge(object_msg)
session.commit()
session.add(arquivo)
session.commit()
return True
except Exception as ex:
print(ex)
но если я удаляю obj_msg = session.query (Arquivo) .filter_by (id_doc_holmes = arquivo.id_doc_holmes) .one (), это работает, но я просто добавляю и не обновляю.
Есть идеи?
Таблица
from sqlalchemy import create_engine, Column, Integer, String, Boolean, Date
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
Base = declarative_base()
class Arquivo(Base):
__tablename__ = "arquivo"
id = Column('id', Integer, primary_key=True)
id_doc_holmes = Column('id_doc_holmes', Integer)
link = Column('link', String)
nome_arquivo = Column('nome_arquivo', String)
classe = Column('classe', String)
id_carteira = Column('id_carteira', Integer)
id_caso = Column('id_caso', Integer)
nome_caso = Column('nome_caso', String)
status = Column('status', String)
nr_matricula = Column('nr_matricula', String)
oficio_imovel = Column('oficio_imovel', String)
comarca_imovel = Column('comarca_imovel', String)
estado_imovel = Column('estado_imovel', String)
dt_certificacao = Column('dt_certificacao', Date)
proprietario = Column('proprietario', String)
cpf_cnpj_prop = Column('cpf_cnpj_prop', String)
porcentagem_prop = Column('porcentagem_prop', String)
id_alienacao = Column('id_alienacao', String)
dt_alienacao = Column('dt_alienacao', Date)
tipo_de_alienacao = Column('tipo_de_alienacao', String)
tipo_de_imovel = Column('tipo_de_imovel', String)
urbano_rural = Column('urbano_rural', String)
tipo_logradouro = Column('tipo_logradouro', String)
logradouro = Column('logradouro', String)
cidade = Column('cidade', String)
estado = Column('estado', String)
area_terreno = Column('area_terreno', String)
devedor_avalista_prop = Column('devedor_avalista_prop', String)
onus = Column('onus', String)
documento_relevante = Column('documento_relevante', String)
engine = create_engine('postgresql://postgres:teste@localhost:5432/teste')
Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)
Цикл, который будет получать сообщения
def get_from_queue():
try:
arquivo = Processador()
for message in queue.receive_messages(VisibilityTimeout=1, MaxNumberOfMessages=10, WaitTimeSeconds=3):
sucesso = arquivo.mensagem_parser(message.body)
if sucesso:
print('deleta mensagem')
# message.delete()
else:
print('verificar mensagem')
except Exception as ex:
print(ex)
в то время как True:
пытаться:
get_from_queue ()
time.sleep (5)
кроме Исключения как ex:
печать (ех)
Класс, который будет обрабатывать сообщение
class Processador:
@staticmethod
def mensagem_parser(message):
msg_canonico = None
try:
msg = message.encode('ascii').decode('utf-8')
except:
return {"message": "Não foi possivel decodificar"}
try:
msg_queue = json.loads(msg)
cria_ou_atualiza(msg_queue['Message'])
return True
except Exception as ex:
msg_canonico = ex
return msg_canonico
эту часть я сделал, чтобы упростить сохранение объекта
def convert_obj(msg, arquivo=None):
arquivo = Arquivo() if arquivo is None else arquivo
if msg['id_documento_holmes']:
arquivo.id_doc_holmes = msg['id_documento_holmes']
else:
arquivo.id_doc_holmes = None
if msg['link']:
arquivo.link = msg['link']
else:
arquivo.link = None
if msg['nome']:
arquivo.nome_arquivo = msg['nome']
else:
arquivo.nome_arquivo = None
if msg['classe']:
arquivo.classe = msg['classe']
else:
arquivo.classe = None
if msg['id_carteira']:
arquivo.id_carteira = msg['id_carteira']
else:
arquivo.id_carteira = None
if msg['id_caso']:
arquivo.id_caso = msg['id_caso']
else:
arquivo.id_caso =None
if msg['nome_caso']:
arquivo.nome_caso = msg['nome_caso']
else:
arquivo.nome_caso = None
if msg['status']:
arquivo.status = msg['status']
else:
arquivo.status = None
if msg['fields']['imóvel']['nº da matrícula']:
arquivo.nr_matricula_imovel = msg['fields']['imóvel']['nº da matrícula']
else:
arquivo.nr_matricula_imovel = None
if msg['fields']['imóvel']['ofício']:
arquivo.oficio_imovel = msg['fields']['imóvel']['ofício']
else:
arquivo.oficio_imovel = None
if msg['fields']['imóvel']['comarca']:
arquivo.comarca_imovel = msg['fields']['imóvel']['comarca']
else:
arquivo.comarca_imovel = None
if msg['fields']['imóvel']['comarca']:
arquivo.estado_imovel = msg['fields']['imóvel']['estado']
else:
arquivo.estado_imovel = None
if msg['fields']['imóvel']['estado']:
arquivo.dt_certificacao = msg['fields']['data emissão certidão']
else:
arquivo.dt_certificacao = None
if msg['fields']['atual proprietário']['nome do proprietário']:
arquivo.proprietario = msg['fields']['atual proprietário']['nome do proprietário']
else:
arquivo.proprietario = None
if msg['fields']['atual proprietário']["% de propriedade"]:
arquivo.cpf_cnpj_prop = msg['fields']['atual proprietário']["% de propriedade"]
else:
arquivo.cpf_cnpj_prop = None
if msg['fields']['alienações']['id da alienação']:
arquivo.id_alienacao = msg['fields']['alienações']['id da alienação']
else:
arquivo.id_alienacao = None
if msg['fields']['alienações']['data da alienação']:
arquivo.dt_alienacao = msg['fields']['alienações']['data da alienação']
else:
arquivo.dt_alienacao = None
if msg['fields']['alienações']['tipo de alienação']:
arquivo.tipo_de_alienacao = msg['fields']['alienações']['tipo de alienação']
else:
arquivo.tipo_de_alienacao = None
if msg['fields']['tipo do imóvel']:
arquivo.tipo_de_imovel = msg['fields']['tipo do imóvel']
else:
arquivo.tipo_de_imovel = None
if msg['fields']['rural / urbano']:
arquivo.urbano_rural = msg['fields']['rural / urbano']
else:
arquivo.urbano_rural = None
if msg['fields']['endereço']['tipo de logradouro']:
arquivo.tipo_logradouro = msg['fields']['endereço']['tipo de logradouro']
else:
arquivo.tipo_logradouro = ''
if msg['fields']['endereço']['logradouro']:
arquivo.logradouro = msg['fields']['endereço']['logradouro']
else:
arquivo.logradouro = None
if msg['fields']['endereço']['cidade']:
arquivo.cidade = msg['fields']['endereço']['cidade']
else:
arquivo.cidade = None
if msg['fields']['endereço']['estado']:
arquivo.estado = msg['fields']['endereço']['estado']
else:
arquivo.estado = None
if msg['fields']['área terreno']:
arquivo.area_terreno = msg['fields']['área terreno']
else:
arquivo.area_terreno = None
if msg['fields']['área construída']:
arquivo.devedor_avalista_prop = msg['fields']['área construída']
else:
arquivo.devedor_avalista_prop = None
if msg['fields']['devedor/avalista é ou era proprietário']:
arquivo.onus = msg['fields']['devedor/avalista é ou era proprietário']
else:
arquivo.onus = None
if msg['fields']['documento relevante?']:
arquivo.documento_relevante = msg['fields']['documento relevante?']
else:
arquivo.documento_relevante = None
return arquivo
здесь я пытаюсь сохранить или обновить
def cria_ou_atualiza(msg):
session = Session()
try:
arquivo = convert_obj(msg)
object_msg = session.query(Arquivo).filter_by(id_doc_holmes=arquivo.id_doc_holmes).one()
if object_msg:
session.merge(object_msg)
session.commit()
session.add(arquivo)
session.commit()
return True
except Exception as ex:
print(ex)
session.close()