У меня есть код для подключения к базе данных Oracle (11g), которая необходима для одновременных вставок. В то же время мне также нужно получить идентификатор последней добавленной строки. Для этого я сделал следующее:
import cx_Oracle
import numpy as np
from typing import *
from multiprocessing import Process
from threading import Thread
import os
from itertools import zip_longest
Id = str
ip = "localhost"
user = "user"
userpwd = "pwd"
dsn_tns = cx_Oracle.makedsn(ip, "1521", service_name='xe')
oracle_pool = cx_Oracle.SessionPool(user, userpwd, dsn_tns, min=5, max=5, increment=1, threaded=True, encoding="UTF-8")
class OracleConn:
def __init__(self, pool=oracle_pool):
self.pool = pool
self.conn = self.pool.acquire()
self.cur = self.conn.cursor()
def add_vector(self, vector: np.ndarray, subgroup: Id, iddl: Id, annoy_status: int):
vector_bin_string = vector.tostring()
self.cur.execute(
""" insert into VECTORS (vector, subgroup, iddl, annoy_status) values (:1, :2, :3, :4) """,
(vector_bin_string, subgroup, iddl, annoy_status)
)
def get_lastrowid(self) -> int:
self.cur.execute("""SELECT ANN_SEQ.currval FROM DUAL""")
return self.cur.fetchone()[0]
def close(self) -> None:
self.conn.commit()
self.pool.release(self.conn)
def __enter__(self):
return self
def __exit__(self, exception_type, exception_value, traceback):
self.conn.commit()
self.pool.release(self.conn)
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
def add_vectors(ids, vecs):
with OracleConn() as conn:
for i, idx in enumerate(ids):
print(f"{os.getpid()}: ", conn.add_vector(vecs[i], idx, idx, 0))
print(f"{os.getpid()}: ", conn.get_lastrowid())
print(f"=============Finished Process {os.getpid()}")
n = 200
p = 4
start = 7000031 # Just some random ID to work with
numbers = list(range(start, start + p*n))
vecs = np.random.randn(n, 32)
values: List[List[int]] = list(grouper(numbers, n)) # This is a list of equal size lists of indices
p1 = Process(target=add_vectors, args=(values[0], vecs))
p2 = Process(target=add_vectors, args=(values[1], vecs))
p3 = Process(target=add_vectors, args=(values[2], vecs))
p4 = Process(target=add_vectors, args=(values[3], vecs))
processes = [p1, p2, p3, p4]
for pr in processes:
pr.start()
for pr in processes:
pr.join()
Однако этот код может вставить только несколько строк, а затем зависает (продолжает работать) с некоторыми исключениями:
File "<ipython-input-52-9d4c539598e9>", line 323, in get_lastrowid
return self.cur.fetchone()[0]
cx_Oracle.DatabaseError: ORA-01002: fetch out of sequence
И в другом прогоне:
File "<ipython-input-4-94343739bd60>", line 30, in get_lastrowid
self.cur.execute("""SELECT ANN_SEQ.currval FROM DUAL""")
cx_Oracle.DatabaseError: ORA-00900: invalid SQL statement
С другой стороны, если я изменяю Process для класса from threading import Thread
, он работает отлично. Однако, поскольку мое приложение связано с процессором, мне нужно использовать многопроцессорность, чтобы улучшить процесс обработки. Есть ли способ заставить его работать с многопроцессорностью ?
Примечание 1: Мне нужно было использовать последовательность, потому что Oracle 11g не поддерживает Identity или что-то подобное.
Примечание 2: SQL код для создания базы данных:
CREATE TABLE VECTORS (
id NUMBER NOT NULL,
vector BLOB NOT NULL,
subgroup INTEGER NOT NULL,
iddl INTEGER NOT NULL,
annoy_status SMALLINT NOT NULL,
PRIMARY KEY (id),
UNIQUE(subgroup, iddl)
);
CREATE INDEX VECTORS_ANNOY_STATUS_IDX ON VECTORS (ANNOY_STATUS);
CREATE SEQUENCE ANN_SEQ START WITH 1;
CREATE OR REPLACE TRIGGER ANN_ID_TRIGG
BEFORE INSERT ON VECTORS
FOR EACH ROW
BEGIN
:new.id := ANN_SEQ.NEXTVAL;
END;
Примечание 3: Python 3.7.7
Примечание 4: cx_ Oracle 7.3.0