Как лучше распараллелить запросы grakn с Python? - PullRequest
2 голосов
/ 20 января 2020

Я использую Windows 10, Python 3.7 и 6-ядерный процессор. Одна нить Python на моей машине отправляет 1000 вставок в секунду в grakn. Я хотел бы распараллелить мой код, чтобы вставить и сопоставить еще быстрее. Как люди это делают?

Мой единственный опыт работы с parellelization связан с другим проектом, где я отправляю пользовательскую функцию распределенному клиенту для создания тысяч задач. Прямо сейчас этот же подход не срабатывает всякий раз, когда пользовательская функция получает или генерирует объект / дескриптор транзакции grakn. Я получаю сообщения об ошибках типа:

Traceback (most recent call last):
  File "C:\Users\dvyd\.conda\envs\activefiction\lib\site-packages\distributed\protocol\pickle.py", line 41, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
...
  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

Я никогда не использовал многопроцессорный модуль Python напрямую. Что другие люди делают, чтобы распараллелить свои запросы?

1 Ответ

2 голосов
/ 20 января 2020

Самый простой подход, который я нашел для выполнения пакета запросов, - передать сеанс Grakn каждому потоку в ThreadPool. В каждом потоке вы можете управлять транзакциями и, конечно, делать более сложные логи c:

from grakn.client import GraknClient
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

def batch_job(session, batch):
    tx = session.transaction().write()

    for query in batch:
        tx.query(query)
    tx.commit()

client = GraknClient(uri="localhost:48555")
session = client.session(keyspace="grakn")

pool = ThreadPool(num_threads)
pool.map(partial(batch_job, session), batches)
pool.close()
pool.join()

session.close()
client.close()

. В качестве более правильного примера я использовал такую ​​структуру для параллелизации пакетов insert выписки из двух файлов:

import time
from grakn.client import GraknClient
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

files = [
    {
        "file_path": f"/location/to/your/file.gql",
    },
    {
        "file_path": f"/location/to/your/file2.gql",
    }
]

KEYSPACE = "grakn"
URI = "localhost:48555"
BATCH_SIZE = 10
NUM_BATCHES = 1000


# ​Entry point where migration starts
def migrate():
    start_time = time.time()

    for file in files:
        print('==================================================')
        print(f'Loading from {file["file_path"]}')
        print('==================================================')

        batches = split_file_into_batches(file["file_path"], BATCH_SIZE)

        multi_thread_batches(batches, KEYSPACE, URI, num_threads=16)

        elapsed = time.time() - start_time
        print(f'Time elapsed {elapsed:.1f} seconds')

    elapsed = time.time() - start_time
    print(f'Time elapsed {elapsed:.1f} seconds')


def multi_thread_batches(batches, keyspace, uri, num_threads=None):
    client = GraknClient(uri=uri)
    session = client.session(keyspace)

    pool = ThreadPool(num_threads)
    pool.map(partial(migrate_batch, session), batches)

    pool.close()
    pool.join()

    session.close()
    client.close()


def migrate_batch(session, batch):
    tx = session.transaction().write()

    for query in batch:
        tx.query(query)
    tx.commit()


def split_file_into_batches(file_path, batch_size):
    file = open(file_path, "r") # Here we are assuming you have 1 Graql query per line!
    lines = file.readlines()
    batch = []

    for index, item in enumerate(lines):
        batch.append(item)
        if index % batch_size == 0:
            yield batch
            batch = []
    yield batch


if __name__ == "__main__":
    migrate()
...