У меня есть несколько файлов .pcap, данные которых я хочу записать в один большой кадр данных dask. В настоящее время инициализирует фрейм данных dask, используя данные из первого файла. Затем он должен обработать остальные файлы pcap и добавить к этому фрейму данных dask, используя merge / concat. Однако, когда я проверяю количество строк в объединенном фрейме данных dask, оно не увеличивается. Что происходит?
Я также не уверен, что использую правильный подход для моего варианта использования. Я пытаюсь преобразовать весь свой набор данных в гигантский фрейм данных dask и записать его в файл h5. У моего компьютера недостаточно памяти для загрузки всего набора данных, поэтому я использую dask. Идея состоит в том, чтобы загрузить фрейм данных dask, содержащий весь набор данных, чтобы я мог выполнять операции со всем набором данных. Я новичок в dask, и я прочитал некоторые из документации, но я все еще не уверен, как dasks обрабатывает загрузку данных с диска вместо памяти. Я также не совсем понимаю, как работают разделы в сумерках. В частности, я также не уверен, чем отличается размер фрагмента от разделов, поэтому у меня возникают проблемы с правильным разделением этого фрейма данных. Любые советы и рекомендации будут полезны.
Как уже было сказано, я прочитал основные части документации.
Я пытался использовать dd.merge (dask_df, panda_df), как показано в документации. Когда я инициализирую кадр данных dask, он начинается с 6 строк. Когда я использую слияние, количество строк уменьшается до 1
Я также пытался использовать concat. Опять же, у меня есть счет 6 строк во время инициализации. Однако после операций concat количество строк по-прежнему остается на уровне 6. Я ожидаю, что число строк увеличится.
Вот функция инициализации
import os
import sys
import h5py
import pandas as pd
import dask.dataframe as dd
import gc
import pprint
from scapy.all import *
flags = {
'R': 0,
'A': 1,
'S': 2,
'DF':3,
'FA':4,
'SA':5,
'RA':6,
'PA':7,
'FPA':8
}
def initialize(file):
global flags
data = {
'time_delta': [0],
'ttl':[],
'len':[],
'dataofs':[],
'window':[],
'seq_delta':[0],
'ack_delta':[0],
'flags':[]
}
scap = sniff(offline=file,filter='tcp and ip')
for packet in range(0,len(scap)):
pkt = scap[packet]
flag = flags[str(pkt['TCP'].flags)]
data['ttl'].append(pkt['IP'].ttl)
data['len'].append(pkt['IP'].len)
data['dataofs'].append(pkt['TCP'].dataofs)
data['window'].append(pkt['TCP'].window)
data['flags'].append(flag)
if packet != 0:
lst_pkt = scap[packet-1]
data['time_delta'].append(pkt.time - lst_pkt.time)
data['seq_delta'].append(pkt['TCP'].seq - lst_pkt['TCP'].seq)
data['ack_delta'].append(pkt['TCP'].ack - lst_pkt['TCP'].ack)
panda = pd.DataFrame(data=data)
panda['ttl']=panda['ttl'].astype('float16')
panda['flags']=panda['flags'].astype('float16')
panda['dataofs']=panda['dataofs'].astype('float16')
panda['len']=panda['len'].astype('float16')
panda['window']=panda['window'].astype('float32')
panda['seq_delta']=panda['seq_delta'].astype('float32')
panda['ack_delta']=panda['ack_delta'].astype('float32')
df =dd.from_pandas(panda,npartitions=6)
gc.collect()
return df
Вот функция конкатенации
def process(file):
global flags
global df
data = {
'time_delta': [0],
'ttl':[],
'len':[],
'dataofs':[],
'window':[],
'seq_delta':[0],
'ack_delta':[0],
'flags':[]
}
scap = sniff(offline=file,filter='tcp and ip')
for packet in range(0,len(scap)):
pkt = scap[packet]
flag = flags[str(pkt['TCP'].flags)]
data['ttl'].append(pkt['IP'].ttl)
data['len'].append(pkt['IP'].len)
data['dataofs'].append(pkt['TCP'].dataofs)
data['window'].append(pkt['TCP'].window)
data['flags'].append(flag)
if packet != 0:
lst_pkt = scap[packet-1]
data['time_delta'].append(pkt.time - lst_pkt.time)
data['seq_delta'].append(pkt['TCP'].seq - lst_pkt['TCP'].seq)
data['ack_delta'].append(pkt['TCP'].ack - lst_pkt['TCP'].ack)
panda = pd.DataFrame(data=data)
panda['ttl']=panda['ttl'].astype('float16')
panda['flags']=panda['flags'].astype('float16')
panda['dataofs']=panda['dataofs'].astype('float16')
panda['len']=panda['len'].astype('float16')
panda['window']=panda['window'].astype('float32')
panda['seq_delta']=panda['seq_delta'].astype('float32')
panda['ack_delta']=panda['ack_delta'].astype('float32')
#merge version dd.merge(df, panda)
dd.concat([df,dd.from_pandas(panda,npartitions=6)])
gc.collect()
А вот и основная программа
directory = 'dev/streams/'
files = os.listdir(directory)
df = initialize(directory+files[0])
files.remove(files[0])
for file in files:
process(directory+file)
print(len(df))
с использованием слияния:
print(len(df)) = 1
используя concat:
print(len(df))=6
ожидается
print(len(df)) > 10,000