Как читать CSV и обрабатывать строки с помощью DASK? - PullRequest
0 голосов
/ 11 января 2019

Я хочу прочитать 28-гигабайтный CSV-файл и распечатать его содержимое. Тем не менее, мой код:

import json
import sys
from datetime import datetime
from hashlib import md5

import dask.dataframe as dd
import dask.multiprocessing
import pandas as pd

from kyotocabinet import *


class IndexInKyoto:

    def hash_string(self, string):
        return md5(string.encode('utf-8')).hexdigest()

    def dbproc(self, db):
        db[self.hash_string(self.row)] = self.row

    def index_row(self, row):
        self.row = row
        DB.process(self.dbproc, "index.kch")

start_time = datetime.utcnow()
row_counter = 0
ob = IndexInKyoto()
df = dd.read_csv("/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv", blocksize=1000000)
df = df.compute(scheduler='processes')     # convert to pandas
df = df.to_dict(orient='records')
for row in df:
    ob.index_row(row)
print("Total time:")
print(datetime.utcnow-start_time)

не работает. Когда я запускаю команду htop, я вижу, как работает dask, но ничего не выводится. Также не создан файл index.kch. Я ругаю то же самое без использования Dask, и он работал нормально; Я использовал Pandas streaming api (chunksize), но он был слишком медленным и, следовательно, я хочу использовать dask.

1 Ответ

0 голосов
/ 11 января 2019
df = df.compute(scheduler='processes')     # convert to pandas

Не делай этого!

Вы загружаете части в отдельных процессах, а затем переносите все данные, которые должны быть сшиты, в один фрейм данных в основном процессе. Это только увеличит накладные расходы на вашу обработку и создаст копии данных в памяти.

Если все, что вы хотите сделать, - это (по какой-то причине) распечатать каждую строку на консоли, то вам было бы прекрасно, если бы вы использовали Pandas streaming CSV reader (pd.read_csv(chunksize=..)). Вы могли бы запустить его, используя чанки Даск, и, возможно, получить ускорение, если вы печатаете на рабочих, которые читают данные:

df = dd.read_csv(..)

# function to apply to each sub-dataframe
@dask.delayed
def print_a_block(d):
    for row in df:
        print(row)

dask.compute(*[print_a_block(d) for d in df.to_delayed()])

Обратите внимание, что for row in df на самом деле возвращает вам столбцы, может быть, вы хотели, чтобы это были строки, или, возможно, вы действительно хотели как-то обработать ваши данные.

...