The simplest approach I've found for executing a batch of queries is to pass the Grakn session to each thread in a ThreadPool. Within each thread you can manage transactions and, of course, implement more complex logic:
```python
from grakn.client import GraknClient
from multiprocessing.dummy import Pool as ThreadPool  # threads, not processes
from functools import partial

def batch_job(session, batch):
    # One write transaction per batch: open, run every query, commit once
    tx = session.transaction().write()
    for query in batch:
        tx.query(query)
    tx.commit()

client = GraknClient(uri="localhost:48555")
session = client.session(keyspace="grakn")

# `num_threads` and `batches` (an iterable of lists of query strings)
# are assumed to be defined elsewhere; see the sketch below.
pool = ThreadPool(num_threads)
pool.map(partial(batch_job, session), batches)
pool.close()
pool.join()

session.close()
client.close()
```
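For completeness, here is one way the `num_threads` and `batches` placeholders above might be filled in. The `chunk` helper, the thread count, and the example Graql query are all hypothetical, just to make the snippet self-contained:

```python
# Hypothetical setup for the snippet above (not part of the original code):
# chunk a flat list of Graql query strings into fixed-size batches.
num_threads = 8  # tune to your machine and Grakn server

queries = [
    'insert $p isa person;',  # placeholder query; substitute your own
] * 100

def chunk(items, size):
    """Split `items` into consecutive lists of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]

batches = chunk(queries, 10)
```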
As a fuller example, I have used this structure to parallelise batches of insert statements from two files:
```python
import time
from grakn.client import GraknClient
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

files = [
    {
        "file_path": "/location/to/your/file.gql",
    },
    {
        "file_path": "/location/to/your/file2.gql",
    }
]

KEYSPACE = "grakn"
URI = "localhost:48555"
BATCH_SIZE = 10
NUM_BATCHES = 1000  # not used below

# Entry point where migration starts
def migrate():
    start_time = time.time()
    for file in files:
        print('==================================================')
        print(f'Loading from {file["file_path"]}')
        print('==================================================')
        batches = split_file_into_batches(file["file_path"], BATCH_SIZE)
        multi_thread_batches(batches, KEYSPACE, URI, num_threads=16)
        elapsed = time.time() - start_time
        print(f'Time elapsed {elapsed:.1f} seconds')
    elapsed = time.time() - start_time
    print(f'Total time elapsed {elapsed:.1f} seconds')


def multi_thread_batches(batches, keyspace, uri, num_threads=None):
    client = GraknClient(uri=uri)
    session = client.session(keyspace)
    pool = ThreadPool(num_threads)
    pool.map(partial(migrate_batch, session), batches)
    pool.close()
    pool.join()
    session.close()
    client.close()


def migrate_batch(session, batch):
    # One write transaction per batch of queries
    tx = session.transaction().write()
    for query in batch:
        tx.query(query)
    tx.commit()


def split_file_into_batches(file_path, batch_size):
    # Here we are assuming you have 1 Graql query per line!
    with open(file_path, "r") as file:
        batch = []
        for line in file:
            batch.append(line)
            if len(batch) == batch_size:
                yield batch
                batch = []
        if batch:  # don't yield an empty trailing batch
            yield batch


if __name__ == "__main__":
    migrate()
```
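One caveat with this structure: if a query or commit fails inside a worker thread, the exception only surfaces when `pool.map` returns, and the transaction in that thread is left open. A minimal sketch of a safer variant, assuming the `tx.close()` method of the 1.x Python client, might look like this:

```python
def migrate_batch_safe(session, batch):
    # Variant of migrate_batch (hypothetical, not from the original answer):
    # close the transaction before re-raising if anything in the batch fails.
    tx = session.transaction().write()
    try:
        for query in batch:
            tx.query(query)
        tx.commit()
    except Exception:
        tx.close()  # discard the open transaction, then propagate the error
        raise
```

To use it, swap it into the `pool.map(partial(migrate_batch, session), batches)` call in `multi_thread_batches`.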