Fastparquet, похоже, не давит фильтры - PullRequest
0 голосов
/ 29 ноября 2018

Я создал файл паркета, используя метод данных dask to_parquet, используя fastparquet в качестве движка.При чтении файла с помощью fastparquet.ParquetFile я получаю следующую информацию.

from fastparquet import ParquetFile
file = ParquetFile('data/raw_data_fastpar.par/')
file.dtypes
OrderedDict([(u'@timestamp', dtype('<M8[ns]')),
         (u'@version', dtype('O')),
         (u'_id', dtype('O')),
         (u'browser_build', dtype('O')),
         (u'browser_device', dtype('O')),
         (u'browser_major', dtype('float64')),
         (u'browser_minor', dtype('float64')),
         (u'browser_name', dtype('O')),
         (u'browser_os', dtype('O')),
         (u'browser_os_name', dtype('O')),
         (u'dst', dtype('O')),
         (u'dst_port', dtype('float64')),
         (u'http_req_header_contentlength', dtype('O')),
         (u'http_req_header_host', dtype('O')),
         (u'http_req_header_referer', dtype('O')),
         (u'http_req_header_useragent', dtype('O')),
         (u'http_req_headers', dtype('O')),
         (u'http_req_method', dtype('O')),
         (u'http_req_secondleveldomain', dtype('O')),
         (u'http_req_url', dtype('O')),
         (u'http_req_version', dtype('O')),
         (u'http_resp_code', dtype('O')),
         (u'http_resp_header_contentlength', dtype('O')),
         (u'http_resp_header_contenttype', dtype('O')),
         (u'http_resp_headers', dtype('O')),
         (u'http_user', dtype('O')),
         (u'received_from', dtype('O')),
         (u'redis_db', dtype('O')),
         (u'src', dtype('O')),
         (u'src_port', dtype('float64')),
         (u'type', dtype('O')),
         (u'month', u'category'),
         (u'day', u'category')])


file.schema.text
u'- schema: \n
| - @timestamp: INT64, TIMESTAMP_MICROS, OPTIONAL\n
| - @version: BYTE_ARRAY, UTF8, OPTIONAL\n
| - _id: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_build: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_device: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_major: DOUBLE, OPTIONAL\n
| - browser_minor: DOUBLE, OPTIONAL\n
| - browser_name: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_os: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_os_name: BYTE_ARRAY, UTF8, OPTIONAL\n
| - dst: BYTE_ARRAY, UTF8, OPTIONAL\n
| - dst_port: DOUBLE, OPTIONAL\n
| - http_req_header_contentlength: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_host: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_referer: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_useragent: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_headers: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_method: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_secondleveldomain: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_url: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_version: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_code: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_header_contentlength: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_header_contenttype: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_headers: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_user: BYTE_ARRAY, UTF8, OPTIONAL\n
| - received_from: BYTE_ARRAY, UTF8, OPTIONAL\n
| - redis_db: BYTE_ARRAY, UTF8, OPTIONAL\n
| - src: BYTE_ARRAY, UTF8, OPTIONAL\n
| - src_port: DOUBLE, OPTIONAL\n  
| - type: BYTE_ARRAY, UTF8, OPTIONAL'

Итак, поля верны.Так как они были данными временных рядов, месяц и день использовались для разделения данных.Общее количество данных 22815984.Теперь я пытаюсь прочитать паркет, используя ключевое слово filters, и получаю странное поведение.

# this works
import datetime
since = datetime.datetime(year=2018, month=10, day=1)
filters = [('@timestamp', '>', np.datetime64(since)),]

raw_data = dd.read_parquet('data/raw_data_fastpar.par/', engine='fastparquet', columns=['http_user', 'dst', 'dst_port', 'http_req_method'], filters=filters)

raw_data.count().compute()

http_user          3835971
dst                3835971
dst_port           3835971
http_req_method    3835971
dtype: int64

, что правильно, и фильтрация была отложена.Когда я меняю фильтр на другое поле,

filters = [('http_req_method', '=', 'GET'),]

Возвращает все данные

http_user          22815984
dst                22815984
dst_port           22815984
http_req_method    22815984
dtype: int64

Делая это вручную, оно работает:

raw_data = dd.read_parquet('data/raw_data_fastpar.par/', engine='fastparquet', columns=['http_user', 'dst', 'dst_port', 'http_req_method'])
raw_data.loc[raw_data.http_req_method == 'GET'].count().compute()
http_user          14407709
dst                14407709
dst_port           14407709
http_req_method    14407709
dtype: int64

Такжеизменение фильтра на несуществующее поле не вызывает никаких исключений, поэтому это тоже странно.Что-то мне не хватает в отношении паркета и фильтрации?

Dask DataFrame Structure:
    http_user   dst     dst_port    http_req_method
npartitions=612                 
    object      object  float64         object
    ...         ...     ...             ...
    ...         ...     ...             ...     
... ...         ...     ...             ...
    ...         ...     ...             ...
Dask Name: read-parquet, 612 tasks

1 Ответ

0 голосов
/ 29 ноября 2018

Опция filters= включена в качестве оптимизации для случаев, когда это имеет смысл, чтобы избежать рассмотрения разделов данных, которые наверняка не содержат никаких действительных данных.

В документах :

Это реализует только фильтрацию на уровне группы строк (разделов), т. Е. Для предотвращения загрузки некоторых фрагментов данных и только в том случае, если в метаданные включена соответствующая статистика.

Например, если у вас есть набор групп строк, в которых интересующий столбец монотонно увеличивается, то фильтр этого столбца, вероятно, сможет исключить многие из групп строк (иначеперегородки).С другой стороны, если каждая группа строк содержит значения во всем диапазоне этого столбца, этот тип фильтра будет иметь какой-либо эффект.

data[raw_data.http_req_method == 'GET']

Это делает что-то другое: теперь каждая группа строк загружается как раздел, а затем фильтруется в памяти рабочих.Dask может загружать только определенные разделы только в особом случае, который вы фильтруете по индексу.

Если вы хотите оптимизировать данные, но ваши данные не структурированы таким образом, чтобы границы разделов идеально совпадали с вашим фильтромВ этом случае вам нужно будет использовать оба метода.

Пожалуйста, поднимите вопрос, если считаете, что строка документа может быть более понятной.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...