Ошибка при использовании cx_ Oracle SessionPool с многопроцессорностью - PullRequest
0 голосов
/ 15 апреля 2020

У меня есть код для подключения к базе данных Oracle (11g), которая необходима для одновременных вставок. В то же время мне также нужно получить идентификатор последней добавленной строки. Для этого я сделал следующее:

import cx_Oracle
import numpy as np
from typing import *
from multiprocessing import Process
from threading import Thread
import os
from itertools import zip_longest

Id = str
ip = "localhost"
user = "user"
userpwd = "pwd"
dsn_tns = cx_Oracle.makedsn(ip, "1521", service_name='xe')
oracle_pool = cx_Oracle.SessionPool(user, userpwd, dsn_tns, min=5, max=5, increment=1, threaded=True, encoding="UTF-8")

class OracleConn:
    def __init__(self, pool=oracle_pool):
        self.pool = pool
        self.conn = self.pool.acquire()
        self.cur = self.conn.cursor()

    def add_vector(self, vector: np.ndarray, subgroup: Id, iddl: Id, annoy_status: int):
        vector_bin_string = vector.tostring()
        self.cur.execute(
            """ insert into VECTORS (vector, subgroup, iddl, annoy_status) values (:1, :2, :3, :4) """,
                (vector_bin_string, subgroup, iddl, annoy_status)
        )

    def get_lastrowid(self) -> int:
        self.cur.execute("""SELECT ANN_SEQ.currval FROM DUAL""")
        return self.cur.fetchone()[0]

    def close(self) -> None:
        self.conn.commit()
        self.pool.release(self.conn)

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.conn.commit()
        self.pool.release(self.conn)

def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def add_vectors(ids, vecs):
    with OracleConn() as conn:
        for i, idx in enumerate(ids):
            print(f"{os.getpid()}: ", conn.add_vector(vecs[i], idx, idx, 0))
            print(f"{os.getpid()}: ", conn.get_lastrowid())

    print(f"=============Finished Process {os.getpid()}")

n = 200
p = 4
start = 7000031 # Just some random ID to work with
numbers = list(range(start, start + p*n))
vecs = np.random.randn(n, 32)
values: List[List[int]] = list(grouper(numbers, n)) # This is a list of equal size lists of indices
p1 = Process(target=add_vectors, args=(values[0], vecs))
p2 = Process(target=add_vectors, args=(values[1], vecs))
p3 = Process(target=add_vectors, args=(values[2], vecs))
p4 = Process(target=add_vectors, args=(values[3], vecs))
processes = [p1, p2, p3, p4]
for pr in processes:
    pr.start()

for pr in processes:
    pr.join()

Однако этот код может вставить только несколько строк, а затем зависает (продолжает работать) с некоторыми исключениями:

File "<ipython-input-52-9d4c539598e9>", line 323, in get_lastrowid return self.cur.fetchone()[0] cx_Oracle.DatabaseError: ORA-01002: fetch out of sequence

И в другом прогоне:

File "<ipython-input-4-94343739bd60>", line 30, in get_lastrowid self.cur.execute("""SELECT ANN_SEQ.currval FROM DUAL""") cx_Oracle.DatabaseError: ORA-00900: invalid SQL statement

С другой стороны, если я изменяю Process для класса from threading import Thread, он работает отлично. Однако, поскольку мое приложение связано с процессором, мне нужно использовать многопроцессорность, чтобы улучшить процесс обработки. Есть ли способ заставить его работать с многопроцессорностью ?

Примечание 1: Мне нужно было использовать последовательность, потому что Oracle 11g не поддерживает Identity или что-то подобное.

Примечание 2: SQL код для создания базы данных:

CREATE TABLE VECTORS (
    id NUMBER NOT NULL, 
    vector BLOB NOT NULL,
    subgroup INTEGER NOT NULL,
    iddl INTEGER NOT NULL,
    annoy_status SMALLINT NOT NULL,
    PRIMARY KEY (id),
    UNIQUE(subgroup, iddl)
);
CREATE INDEX VECTORS_ANNOY_STATUS_IDX ON VECTORS (ANNOY_STATUS);
CREATE SEQUENCE ANN_SEQ START WITH 1;
CREATE OR REPLACE TRIGGER ANN_ID_TRIGG 
BEFORE INSERT ON VECTORS 
FOR EACH ROW
BEGIN
    :new.id := ANN_SEQ.NEXTVAL;
END;

Примечание 3: Python 3.7.7

Примечание 4: cx_ Oracle 7.3.0

1 Ответ

0 голосов
/ 18 апреля 2020

Хорошо, я должен воспроизвести ваш случай, и я лучше понимаю, что происходит. Дело в том, что пул соединений cx_ Oracle не может работать с многопроцессорной средой. Каждый процесс в этом случае имеет свою собственную память, и поэтому пул соединений, созданный родительским процессом, не будет работать должным образом, и он наверняка будет перекрываться. Я думаю, что лучшим решением для вас является использование резидентного пула соединений с базой данных. Вот ссылка на do c, и она проста в использовании: add: объединена в строку подключения, которая у вас есть (если ezconnect, или добавьте (server = pooled) к записи tns). После настройки вы удаляете создание пула из кода python и вместо того, чтобы получать соединение из пула, вы открываете соединение с сервером как обычно. Это быстрее, чем обычное соединение, так как нет процесса, который порождается, а затем завершается.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...