Как провести распараллеливание в базах данных ключ-значение? - PullRequest
0 голосов
/ 12 января 2019

Мое намерение состоит в том, чтобы сделать версию больших файлов CSV, и, следовательно, я использую базы данных значения ключа, где ключ будет столбцом (столбцами) из полной строки, а значение будет самой строкой. Например:

Name, Age, Roll.No
Aviral, 22, 1
Apoorv, 19, 2

Если я укажу Roll no в качестве ключа, я собираюсь иметь ключ в DB в качестве rollno (вероятно, его хэш) и значение в качестве полной строки: Aviral, 22, 1

Я завершил вышеописанную реализацию, но для работы с большими CSV-файлами (даже 20 ГБ с 534M строк) скорость слишком низкая. Я реализую Dask, но это медленнее, чем обычные последовательные потоки панд. Я сомневаюсь, как я могу иметь параллельные вставки в базу данных ключ-значение?

import json
import sys
from datetime import datetime
from hashlib import md5

import dask.dataframe as dd
import dask.multiprocessing
import pandas as pd

from kyotocabinet import *


class IndexInKyoto:

    def hash_string(self, string):
        return md5(string.encode('utf-8')).hexdigest()

    def dbproc(self, db):
        db[self.hash_string(self.key)] = self.row

    def index_row(self, key, row):
        self.row = row
        self.key = key
        DB.process(self.dbproc, "index.kch")

# destination = "/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv"
destination = "10M_rows.csv"
df = dd.read_csv(destination)
df_for_file_attributes = pd.read_csv(destination, nrows=2)
column_list = list(df_for_file_attributes)

# df = df.compute(scheduler='processes')     # convert to pandas

start_time = datetime.utcnow()
row_counter = 0
ob = IndexInKyoto()

# function to apply to each sub-dataframe
@dask.delayed
def print_a_block(d):
    #for row in d.itertuples(index=False):
    # print(row)
    print("a block called!")
    d = d.to_dict(orient='records')
    for row in d:
        key = str(row["0"])
        row = json.dumps(row, default=str)
        ob.index_row(key, row)

print("Calling compute!")
dask.compute(*[print_a_block(d) for d in df.to_delayed()])
print(datetime.utcnow() - start_time)

1 Ответ

0 голосов
/ 13 января 2019

Kyotocabinet не позволяет распараллеливать вставки (https://fallabs.com/kyotocabinet/spex.html), Каждый Writer будет блокировать до завершения другого Writer, поэтому вы не сможете распараллелить вставки в kyotocabinet, но Redis разрешит такую ​​вставку, чтобы оптимизировать дальнейшее использование конвейеризации Redis (https://redis.io/topics/pipelining), который будет пакетировать ваши данные и значительно уменьшать RTT при загрузке огромных данных.

Причина, по которой ваша задача выполняется медленнее, чем последовательная обработка, связана с последовательным управлением многопроцессорной записью БД.

...