Сначала небольшое слово о том, как я генерировал объекты событий. Чтобы обеспечить надежную интеграцию с другими java приложениями, я начал с событий JAXB java, используемых в решении. Я преобразовал их в схемы с использованием schemagen, а затем преобразовал сгенерированные схемы в python классы, используя pyXB .
В результате у меня есть модуль в python с эквивалентом классы JAXB в решении java. В приведенном ниже коде я предполагаю, что my_events
- это модуль, сгенерированный с pyXB.
import cx_Oracle
import my_events
ORACLE_QUEUE_NAME = "Q_MIG"
JMS_TEXT_MESSAGE_TYPE = "SYS.AQ$_JMS_TEXT_MESSAGE"
JMS_HEADER_TYPE = "SYS.AQ$_JMS_HEADER"
dsn = "(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP) (HOST=localhost)(PORT=1521))) " \
"(CONNECT_DATA = (SERVER = DEDICATED) (SID = MYSID)))"
with cx_Oracle.connect("MYUSER", "MYPASSWORD", dsn, encoding="UTF-8", nencoding="UTF-8") as con:
event = my_events.my_custom_event()
q = con.queue(ORACLE_QUEUE_NAME, con.gettype(JMS_TEXT_MESSAGE_TYPE))
payload = con.gettype(JMS_TEXT_MESSAGE_TYPE).newobject()
payload.HEADER = con.gettype(JMS_HEADER_TYPE).newobject()
# HEADER.TYPE must have the java class name
# have not checked yet how to get it from the pyxb class
payload.HEADER.TYPE = "MyCustomEvent"
payload.TEXT_VC = event.toxml("utf-8")
payload.TEXT_LEN = len(payload.TEXT_VC)
q.enqOne(con.msgproperties(payload=payload))
con.commit()
print("message enqueued.")
# dequeue the messages
print("\nDequeuing messages...")
queue = con.queue(ORACLE_QUEUE_NAME, con.gettype(JMS_TEXT_MESSAGE_TYPE))
queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
while True:
message = queue.deqOne()
if not message:
break
print(message.payload.TEXT_VC)
event = my_events.CreateFromDocument(message.payload.TEXT_VC)
con.commit()
print("\nDone.")
* CreateFromDocument
- это служебный метод, сгенерированный pyXB для создания новых объектов из сериализованных представлений.