Параллельное сопоставление файлов, Python - PullRequest
4 голосов
/ 02 октября 2011

Я пытаюсь улучшить скрипт, который сканирует файлы на наличие вредоносного кода.У нас есть список шаблонов регулярных выражений в файле, по одному шаблону в каждой строке.Это регулярное выражение для grep, так как наша текущая реализация - это в основном bash-скрипт find \ grep combo.Скрипт bash занимает 358 секунд в моем каталоге тестов.Я смог написать скрипт на python, который сделал это за 72 секунды, но хочу улучшить еще.Сначала я опубликую базовый код, а затем настройки, которые я пробовал:

import os, sys, Queue, threading, re

fileList = []
rootDir = sys.argv[1]

class Recurser(threading.Thread):

    def __init__(self, queue, dir):
    self.queue = queue
    self.dir = dir
    threading.Thread.__init__(self)

    def run(self):
    self.addToQueue(self.dir)

    ## HELPER FUNCTION FOR INTERNAL USE ONLY
    def addToQueue(self,  rootDir):
      for root, subFolders, files in os.walk(rootDir):
    for file in files:
       self.queue.put(os.path.join(root,file))
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)

class Scanner(threading.Thread):

    def __init__(self, queue, patterns):
    self.queue = queue
    self.patterns = patterns
    threading.Thread.__init__(self)

    def run(self):
    nextFile = self.queue.get()
    while nextFile is not -1:
       #print "Trying " + nextFile
       self.scanFile(nextFile)
       nextFile = self.queue.get()


    #HELPER FUNCTION FOR INTERNAL UES ONLY
    def scanFile(self, file):
       fp = open(file)
       contents = fp.read()
       i=0
       #for patt in self.patterns:
       if self.patterns.search(contents):
      print "Match " + str(i) + " found in " + file

############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################


fileQueue = Queue.Queue()

#Get the shell scanner patterns
patterns = []
fPatt = open('/root/patterns')
giantRE = '('
for line in fPatt:
   #patterns.append(re.compile(line.rstrip(), re.IGNORECASE))
   giantRE = giantRE + line.rstrip() + '|'

giantRE = giantRE[:-1] + ')'
giantRE = re.compile(giantRE, re.IGNORECASE)

#start recursing the directories
recurser = Recurser(fileQueue,rootDir)
recurser.start()

print "starting scanner"
#start checking the files
for scanner in xrange(0,8):
   scanner = Scanner(fileQueue, giantRE)
   scanner.start()

Это явно отладочный \ уродливый код, не обращайте внимания на миллион queue.put (-1), я его почистюпотом.Некоторые отступы не отображаются должным образом, особенно в scanFile.

Во всяком случае, некоторые вещи я заметил.Использование 1, 4 и даже 8 потоков (для сканера в xrange (0, ???) :) не имеет значения.Я все еще получаю ~ 72 секунды независимо.Я предполагаю, что это происходит из-за GIL в Python.

В отличие от создания гигантского регулярного выражения, я попытался поместить каждую строку (шаблон) как RE-файл compilex в список и перебирать этот список в своей функции scanfile.Это привело к увеличению времени выполнения.

Чтобы избежать GIL в python, я попытался заставить каждый поток разветвляться на grep следующим образом:

#HELPER FUNCTION FOR INTERNAL UES ONLY
def scanFile(self, file):
      s = subprocess.Popen(("grep", "-El", "--file=/root/patterns", file), stdout = subprocess.PIPE)
      output = s.communicate()[0]
      if output != '':
         print 'Matchfound in ' + file

Это привело к увеличению времени выполнения.

Любые предложения по улучшению производительности.

::::::::::::: EDIT ::::::::

Я не могу опубликовать ответы на своиТем не менее, здесь есть ответы на несколько вопросов:

@ David Nehme - просто чтобы люди знали, что я знаю о том, что у меня есть миллион queue.put (-1) '

@ Blender - для отметки нижней части очереди.Мои потоки сканера продолжают блокироваться, пока не достигнут -1, который находится внизу (в то время как nextFile не равен -1 :).Ядра процессора равны 8, однако из-за того, что GIL, использующий 1 поток, 4 потока или 8 потоков, НЕ имеет значения.Возникновение 8 подпроцессов привело к значительно более медленному коду (142 сек против 72)

@ ed - Да, это так же медленно, как и команда find \ grep, на самом деле медленнее, потому что без разбора greps файл не нужен

@ Рон - Не могу обновить, это должно быть универсально.Как вы думаете, это ускорит> 72 секунд?Bash Grepper делает 358 секунд.Мой метод pythoniant RE делает 72 секунды с \ 1-8 потоками.Метод popen w \ 8 thrads (8 подпроцессов) выполнялся за 142 секунды.До сих пор гигантский метод только для Python RE является явным победителем на сегодняшний день

@ intuted

Вот суть нашей текущей комбинации find \ grep (не мой сценарий).Это довольно просто.Там есть некоторые дополнительные вещи, такие как ls, но ничего, что должно привести к 5-кратному замедлению.Даже если grep -r немного более эффективен, 5x ОГРОМНОЕ замедление.

 find "${TARGET}" -type f -size "${SZLIMIT}" -exec grep -Eaq --file="${HOME}/patterns" "{}" \; -and -ls | tee -a "${HOME}/found.txt"

Код Python более эффективен, я не знаю почему, но я его экспериментально протестировал.Я предпочитаю делать это на питоне.Я уже добился ускорения в 5 раз с python, я хотел бы ускорить его.

::::::::::::: ПОБЕДИТЕЛЬ ПОБЕДИТЕЛЬ ПОБЕДИТЕЛЬ :::::::::::::::::

Похоже, у нас есть победитель.

Сценарий оболочки intued занимает 2-е место с 34 секундами, однако @ steveha пришел первым с 24 секундами.Из-за того, что у многих наших коробок нет python2.6, мне пришлось его cx_freeze.Я могу написать оболочку сценария оболочки для wget tar и распаковать его.Однако мне нравятся интуиции за простоту.

Спасибо за вашу помощь, ребята, теперь у меня есть эффективный инструмент для системного администрирования

Ответы [ 4 ]

5 голосов
/ 02 октября 2011

Я немного озадачен тем, как ваш скрипт на Python оказался быстрее, чем ваша команда find / grep. Если вы хотите использовать grep в некоторой степени аналогично тому, что было предложено Роном Смитом в его ответе, вы можете сделать что-то вроде

find -type f | xargs -d \\n -P 8 -n 100 grep --file=/root/patterns

для запуска grep процессов, которые будут обрабатывать 100 файлов перед выходом, поддерживая до 8 таких процессов одновременно. При обработке ими 100 файлов время загрузки каждого из них может быть незначительным.

note : опция -d \\n для xargs является расширением GNU, которое не будет работать во всех системах POSIX-ish. Он указывает, что разделитель * d * между именами файлов является новой строкой. Хотя технически имена файлов могут содержать символы новой строки, на практике никто не делает этого и сохраняет свою работу. Для совместимости с не-GNU xargs вам необходимо добавить опцию -print0 к find и использовать -0 вместо -d \\n с xargs. Это позволит использовать нулевой байт \0 (hex 0x00) в качестве разделителя как find, так и xargs.

.

Вы также можете воспользоваться первым подсчетом количества файлов, которые нужно скопировать

NUMFILES="$(find -type f | wc -l)";

, а затем использовать это число, чтобы получить четное разделение между 8 процессами (при условии bash в качестве оболочки)

find -type f | xargs -d \\n -P 8 -n $(($NUMFILES / 8 + 1)) grep --file=/root/patterns

Я думаю, что это может работать лучше, потому что дисковый ввод / вывод find не будет влиять на дисковый ввод / вывод различных grep с. Я полагаю, что это отчасти зависит от размера файлов и от того, хранятся ли они непрерывно - с небольшими файлами диск все равно будет много искать, так что это не будет иметь большого значения. Также обратите внимание на то, что, особенно если у вас приличный объем ОЗУ, последующие запуски такой команды будут выполняться быстрее, поскольку некоторые файлы будут сохранены в вашем кеше памяти.

Конечно, вы можете параметризовать 8, чтобы было проще экспериментировать с различным числом одновременных процессов.

Как изд. Как упоминается в комментариях, вполне возможно, что производительность этого подхода будет все же менее впечатляющей, чем у однопроцессного grep -r. Я думаю, это зависит от относительной скорости вашего диска [массива], количества процессоров в вашей системе и т. Д.

5 голосов
/ 02 октября 2011

Я думаю, что вместо использования модуля threading вы должны использовать модуль multiprocessing для своего решения Python.Потоки Python могут столкнуться с GIL;GIL не проблема, если у вас просто запущено несколько процессов Python.

Я думаю, что для того, что вы делаете, пул рабочих процессов - это то, что вы хотите.По умолчанию в пуле будет по умолчанию один процесс для каждого ядра в системном процессоре.Просто вызовите метод .map() со списком имен файлов для проверки и функцию, которая выполняет проверку.

http://docs.python.org/library/multiprocessing.html

Если это не быстрее, чем ваша реализация threading, тогдаЯ не думаю, что GIL - ваша проблема.

РЕДАКТИРОВАТЬ: Хорошо, я добавляю работающую программу Python.При этом используется пул рабочих процессов для открытия каждого файла и поиска шаблона в каждом.Когда работник находит имя файла, которое совпадает, он просто печатает его (в стандартный вывод), чтобы вы могли перенаправить вывод этого скрипта в файл и получить список файлов.

РЕДАКТИРОВАТЬ: Я думаю, что этонемного более легкая для чтения версия, более легкая для понимания.

Я рассчитал это, просматривая файлы в / usr / include на моем компьютере.Поиск завершается примерно за полсекунды.Использование find по конвейеру через xargs для запуска как можно меньшего числа grep процессов занимает около 0,05 секунды, что примерно в 10 раз быстрее.Но я ненавижу странный язык барокко, который вы должны использовать, чтобы заставить find работать должным образом, и мне нравится версия Python.И, возможно, в действительно больших каталогах расхождение будет меньше, так как часть полсекунды для Python должна быть временем запуска.А может быть, полсекунды достаточно для большинства целей!

import multiprocessing as mp
import os
import re
import sys

from stat import S_ISREG


# uncomment these if you really want a hard-coded $HOME/patterns file
#home = os.environ.get('HOME')
#patterns_file = os.path.join(home, 'patterns')

target = sys.argv[1]
size_limit = int(sys.argv[2])
assert size_limit >= 0
patterns_file = sys.argv[3]


# build s_pat as string like:  (?:foo|bar|baz)
# This will match any of the sub-patterns foo, bar, or baz
# but the '?:' means Python won't bother to build a "match group".
with open(patterns_file) as f:
    s_pat = r'(?:{})'.format('|'.join(line.strip() for line in f))

# pre-compile pattern for speed
pat = re.compile(s_pat)


def walk_files(topdir):
    """yield up full pathname for each file in tree under topdir"""
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            pathname = os.path.join(dirpath, fname)
            yield pathname

def files_to_search(topdir):
    """yield up full pathname for only files we want to search"""
    for fname in walk_files(topdir):
        try:
            # if it is a regular file and big enough, we want to search it
            sr = os.stat(fname)
            if S_ISREG(sr.st_mode) and sr.st_size >= size_limit:
                yield fname
        except OSError:
            pass

def worker_search_fn(fname):
    with open(fname, 'rt') as f:
        # read one line at a time from file
        for line in f:
            if re.search(pat, line):
                # found a match! print filename to stdout
                print(fname)
                # stop reading file; just return
                return

mp.Pool().map(worker_search_fn, files_to_search(target))
1 голос
/ 07 февраля 2019

Позвольте мне также показать вам, как это сделать в Ray , который является средой с открытым исходным кодом для написания параллельных приложений Python. Преимущество этого подхода в том, что он быстр, прост в написании и расширении (скажем, вы хотите передавать много данных между задачами или выполнять некоторое накопление с сохранением состояния), а также может быть запущен в кластере или облаке без изменений. Он также очень эффективен при использовании всех ядер на одной машине (даже для очень больших машин, таких как 100 ядер) и передаче данных между задачами.

import os
import ray
import re

ray.init()

patterns_file = os.path.expanduser("~/patterns")
topdir = os.path.expanduser("~/folder")

with open(patterns_file) as f:
    s_pat = r'(?:{})'.format('|'.join(line.strip() for line in f))

regex = re.compile(s_pat)

@ray.remote
def match(pattern, fname):
    results = []
    with open(fname, 'rt') as f:
        for line in f:
            if re.search(pattern, line):
                results.append(fname)
    return results

results = []
for dirpath, dirnames, filenames in os.walk(topdir):
    for fname in filenames:
        pathname = os.path.join(dirpath, fname)
        results.append(match.remote(regex, pathname))

print("matched files", ray.get(results))

Дополнительная информация, в том числе о том, как запустить это в кластере или облаке, доступна в документации

1 голос
/ 02 октября 2011

Если вы хотите перейти на версию 3.2 или выше, вы можете воспользоваться concurrent.futures.ProcessPoolExecutor. Я думаю, что это улучшит производительность по сравнению с методом popen, который вы пытались, потому что он предварительно создаст пул процессов, где ваш метод popen каждый раз создает новый процесс. Вы можете написать свой собственный код, чтобы сделать то же самое для более ранней версии, если по какой-то причине вы не можете перейти на 3.2.

...