Как понюхать сетевой интерфейс с помощью Twisted? - PullRequest
0 голосов
/ 08 декабря 2018

Мне нужно получать необработанные пакеты от сетевого интерфейса в коде Twisted.Пакеты не будут иметь правильный IP-адрес или MAC-адрес, а также действительные заголовки, поэтому мне нужна необработанная вещь.

Я попытался изучить twisted.pair, но я не смог выяснить, как его использоватьчтобы получить в сыром интерфейсе.

Обычно, я бы использовал scapy.all.sniff.Однако это блокировка, поэтому я не могу просто использовать его с Twisted.(Я также не могу использовать scapy.all.sniff с таймаутом и занятым циклом, потому что я не хочу терять пакеты.)

Возможным решением было бы запустить scapy.all.sniff в потоке и каким-то образом перезвонитьв Twisted, когда я получаю пакет.Это кажется немного неуравновешенным (а также, я не знаю, как это сделать, потому что я новичок в Twisted), но я могу согласиться на это, если не найду ничего лучшего.

1 Ответ

0 голосов
/ 08 декабря 2018

Вы можете запустить распределенную систему и передавать данные через центральную систему очередей.Возьмите философию Unix и создайте одно приложение, которое выполняет несколько задач и делает их хорошо.Создайте одно приложение, которое прослушивает пакеты (вы можете использовать scapy здесь, так как это не имеет большого значения, блокируете ли вы что-либо), затем отправляет их в очередь (RabitMQ, Redis, SQS и т. Д.), А другое приложение обрабатывает пакет изочередь.Этот метод должен дать вам наименьшую головную боль.

Если вам нужно запустить все в одном приложении, то потоки / многопроцессорная обработка - единственный вариант.Но есть некоторые шаблоны проектирования, которым вы захотите следовать.Вы также можете разбить следующий код на отдельные функции и использовать выделенную систему очередей.

from threading import Thread
from time import sleep
from twisted.internet import defer, reactor

class Sniffer(Thread):
    def __init__(self, _reactor, shared_queue):
        super().__init__()
        self.reactor = _reactor
        self.shared_queue = shared_queue

    def run(self):
        """
        Sniffer logic here
        """
        while True:
            self.reactor.callFromThread(self.shared_queue.put, 'hello world')
            sleep(5)

@defer.inlineCallbacks
def consume_from_queue(_id, _reactor, shared_queue):
    item = yield shared_queue.get()
    print(str(_id), item)
    _reactor.callLater(0, consume_from_queue, _id, _reactor, shared_queue)

def main():
    shared_queue = defer.DeferredQueue()
    sniffer = Sniffer(reactor, shared_queue)
    sniffer.daemon = True
    sniffer.start()

    workers = 4
    for i in range(workers):
        consume_from_queue(i+1, reactor, shared_queue)

    reactor.run()

main()

Класс Sniffer запускается вне контроля Twisted.Обратите внимание на sniffer.daemon = True, это так, что поток остановится, когда основной поток остановился.Если он установлен на False (по умолчанию), то приложение будет закрываться, только если все потоки закончились.В зависимости от поставленной задачи это может или не всегда возможно.Если вы можете сделать перерыв в прослушивании, чтобы проверить событие потока, то вы можете остановить поток более безопасным способом.

self.reactor.callFromThread(self.shared_queue.put, 'hello world') необходимо, чтобы элемент, помещаемый в очередь, происходил восновной поток реактора, в отличие от потока, выполняемого Sniffer.Основным преимуществом этого будет то, что будет некоторая синхронизация сообщений, поступающих из потоков (при условии, что вы планируете масштабировать до прослушивания нескольких интерфейсов).Кроме того, я не был уверен, что DeferredQueue объекты являются потокобезопасными :) Я относился к ним так, как будто они не были.

Поскольку Twisted не управляет потоками в этом случае, жизненно важно, чтобы разработчик делал это.Обратите внимание на петлю worker и consume_from_queue(i+1, reactor, shared_queue).Этот цикл гарантирует, что только желаемое количество работников выполняет задачи.Внутри функции consume_from_queue() shared_queue.get() будет ожидать (не блокируя), пока элемент не будет помещен в очередь, напечатает элемент, а затем запланирует еще один consume_from_queue().

...