RabbitMQ с Python: подтверждение и отклонение сообщений по отдельности при использовании executemany () - PullRequest
0 голосов
/ 01 января 2019

Я планирую получать более 5 миллионов сообщений в день с rabbitmq, и у меня есть только контроль над потребителем.Я хотел использовать executemany для хранения сообщений в пакетном режиме, но иногда сообщение может быть перехвачено в исключении, и я хочу, чтобы только эти отдельные сообщения были отклонены и просто подтвердили остальные.Как я могу выборочно отклонить и подтвердить после executemany ()?

insert_sql = '''MERGE INTO table1
                        USING (
                        SELECT :col1 COL1, TO_TIMESTAMP(:col2, 'YYYY-MM-DD HH24:MI:SS,FF9') COL2, 
                        :col3 COL3, :col4 COL4
                        FROM DUAL
                        ) source_table
                        ON (table1.COL1 = source_table.COL1 AND table1.COL2 = source_table.COL2 
                        AND table1.COL3 = source_table.COL3 AND table1.COL4 = source_table.COL4) 
                        WHEN NOT MATCHED THEN 
                        INSERT (COL1, COL2, COL3, COL4, COL5, COL6, COL8)
                        VALUES (:col1, TO_TIMESTAMP(:col2, 'YYYY-MM-DD HH24:MI:SS,FF9'), :col3, 
                        :col4, :col5, xmltype(:col6), xmltype(:col7))'''

def callback(ch, method, properties, body):
    # some code to clean up the body, snipping it off
    # ...
    if decompressed_j == None:
        _reject(delivery_tag=method.delivery_tag, requeue=False)
    elif isinstance(decompressed_j, dict):
        ch.basic_ack(delivery_tag=method.delivery_tag)
        global acx_list
        acx = ACX_Obj(decompressed_j, raw_jsonArray)
        if acx.response.startswith('<someignorabletag>'):
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        else:
            acx_list.append(acx.transactionValues)
        if len(acx_list) % 20 == 0:
            try:
                ora_exe(acx_list, ch, method)
            except AttributeError as att_e:
                print(att_e)
                ch.basic_reject(
                    delivery_tag=method.delivery_tag, requeue=False)
                raise att_e
            acx_list = []
    else:
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

def ora_exe(acx_list, ch, method):
    conn = cx_Oracle.connect('dummy1/dummyenv', threaded=True)
    c = conn.cursor()
    try:
        c.executemany(insert_sql, acx_list)
    except cx_Oracle.IntegrityError as integrity_err:
        print('integrity err')
        conn.rollback()
    except cx_Oracle.DatabaseError as db_err:
        print(db_err)
        conn.rollback()
    else:
        conn.commit()
    finally:
        c.close()
        conn.close()
...