Проблемы при слиянии dask-данных - PullRequest
0 голосов
/ 02 июня 2019

У меня есть несколько файлов .pcap, данные которых я хочу записать в один большой кадр данных dask. В настоящее время инициализирует фрейм данных dask, используя данные из первого файла. Затем он должен обработать остальные файлы pcap и добавить к этому фрейму данных dask, используя merge / concat. Однако, когда я проверяю количество строк в объединенном фрейме данных dask, оно не увеличивается. Что происходит?

Я также не уверен, что использую правильный подход для моего варианта использования. Я пытаюсь преобразовать весь свой набор данных в гигантский фрейм данных dask и записать его в файл h5. У моего компьютера недостаточно памяти для загрузки всего набора данных, поэтому я использую dask. Идея состоит в том, чтобы загрузить фрейм данных dask, содержащий весь набор данных, чтобы я мог выполнять операции со всем набором данных. Я новичок в dask, и я прочитал некоторые из документации, но я все еще не уверен, как dasks обрабатывает загрузку данных с диска вместо памяти. Я также не совсем понимаю, как работают разделы в сумерках. В частности, я также не уверен, чем отличается размер фрагмента от разделов, поэтому у меня возникают проблемы с правильным разделением этого фрейма данных. Любые советы и рекомендации будут полезны.

Как уже было сказано, я прочитал основные части документации.

Я пытался использовать dd.merge (dask_df, panda_df), как показано в документации. Когда я инициализирую кадр данных dask, он начинается с 6 строк. Когда я использую слияние, количество строк уменьшается до 1

Я также пытался использовать concat. Опять же, у меня есть счет 6 строк во время инициализации. Однако после операций concat количество строк по-прежнему остается на уровне 6. Я ожидаю, что число строк увеличится.

Вот функция инициализации

import os
import sys
import h5py
import pandas as pd
import dask.dataframe as dd
import gc
import pprint
from scapy.all import *
flags = {
        'R': 0,
        'A': 1,
        'S': 2,
        'DF':3,
        'FA':4,
        'SA':5,
        'RA':6,
        'PA':7,
        'FPA':8
    }

def initialize(file):
    global flags
    data = {
        'time_delta': [0],
        'ttl':[],
        'len':[],
        'dataofs':[],
        'window':[],
        'seq_delta':[0],
        'ack_delta':[0],
        'flags':[]
    }
    scap = sniff(offline=file,filter='tcp and ip')
    for packet in range(0,len(scap)):
        pkt = scap[packet]
        flag = flags[str(pkt['TCP'].flags)]
        data['ttl'].append(pkt['IP'].ttl)
        data['len'].append(pkt['IP'].len)
        data['dataofs'].append(pkt['TCP'].dataofs)
        data['window'].append(pkt['TCP'].window)
        data['flags'].append(flag)
        if packet != 0:
            lst_pkt = scap[packet-1]
            data['time_delta'].append(pkt.time - lst_pkt.time)
            data['seq_delta'].append(pkt['TCP'].seq - lst_pkt['TCP'].seq)
            data['ack_delta'].append(pkt['TCP'].ack - lst_pkt['TCP'].ack)

    panda = pd.DataFrame(data=data)
    panda['ttl']=panda['ttl'].astype('float16')
    panda['flags']=panda['flags'].astype('float16')
    panda['dataofs']=panda['dataofs'].astype('float16')
    panda['len']=panda['len'].astype('float16')
    panda['window']=panda['window'].astype('float32')
    panda['seq_delta']=panda['seq_delta'].astype('float32')
    panda['ack_delta']=panda['ack_delta'].astype('float32')

    df =dd.from_pandas(panda,npartitions=6)

    gc.collect()
    return df

Вот функция конкатенации

def process(file):
    global flags
    global df
    data = {
        'time_delta': [0],
        'ttl':[],
        'len':[],
        'dataofs':[],
        'window':[],
        'seq_delta':[0],
        'ack_delta':[0],
        'flags':[]
    }
    scap = sniff(offline=file,filter='tcp and ip')
    for packet in range(0,len(scap)):
        pkt = scap[packet]
        flag = flags[str(pkt['TCP'].flags)]
        data['ttl'].append(pkt['IP'].ttl)
        data['len'].append(pkt['IP'].len)
        data['dataofs'].append(pkt['TCP'].dataofs)
        data['window'].append(pkt['TCP'].window)
        data['flags'].append(flag)
        if packet != 0:
            lst_pkt = scap[packet-1]
            data['time_delta'].append(pkt.time - lst_pkt.time)
            data['seq_delta'].append(pkt['TCP'].seq - lst_pkt['TCP'].seq)
            data['ack_delta'].append(pkt['TCP'].ack - lst_pkt['TCP'].ack)

    panda = pd.DataFrame(data=data)
    panda['ttl']=panda['ttl'].astype('float16')
    panda['flags']=panda['flags'].astype('float16')
    panda['dataofs']=panda['dataofs'].astype('float16')
    panda['len']=panda['len'].astype('float16')
    panda['window']=panda['window'].astype('float32')
    panda['seq_delta']=panda['seq_delta'].astype('float32')
    panda['ack_delta']=panda['ack_delta'].astype('float32')

    #merge version dd.merge(df, panda)
    dd.concat([df,dd.from_pandas(panda,npartitions=6)])

    gc.collect()

А вот и основная программа

directory = 'dev/streams/'
files = os.listdir(directory)
df = initialize(directory+files[0])
files.remove(files[0])
for file in files:
    process(directory+file)
print(len(df))

с использованием слияния:

print(len(df)) = 1

используя concat:

print(len(df))=6

ожидается

print(len(df)) > 10,000

1 Ответ

1 голос
/ 02 июня 2019

Попробуйте явным образом вернуть df в результате вашего конкатного запроса:

df = dd.concat([df, dd.from_pandas(panda,npartitions=6)])

И не дублируйте точно такие же блоки кода, а заключите их в другую функцию:

def process_panda(file_wpath, flags):
    data = {
    [...]
    panda['ack_delta']=panda['ack_delta'].astype('float32')
    return panda

Тогда вам просто нужно проверить, является ли файл для обработки первым, поэтому ваш основной код становится:

import os
import sys
import h5py
import pandas as pd
import dask.dataframe as dd
import gc
import pprint
from scapy.all import *

flags = {
        'R': 0,
        'A': 1,
        'S': 2,
        'DF':3,
        'FA':4,
        'SA':5,
        'RA':6,
        'PA':7,
        'FPA':8
    }

directory = 'dev/streams/'
files = os.listdir(directory)

for file in files:
    file_wpath = os.path.join(directory, file)
    panda = process_panda(file_wpath, flags)
    if file == files[0]:
        df = dd.from_pandas(panda, npartitions=6)
    else:
        df = dd.concat([df, dd.from_pandas(panda, npartitions=6)])        
    gc.collect()

print(len(df))
...