Отслеживание IP-адресов попыток входа в SSH с помощью twisted.conch - PullRequest
2 голосов
/ 01 апреля 2012

Я пытаюсь перехватить попытки ssh bruteforce, используя простой ssh-сервер, код приведен ниже и работает, однако я не могу сопоставить комбинации имени пользователя и пароля с их исходным IP-адресом.

Это настолько просто, насколько я могу понять, пока он не регистрирует, а просто печатает на стандартный вывод.Я использовал определение buildProtocol для распечатки IP-адреса нового соединения, когда оно установлено.Однако я хотел бы получить IP-адрес вместе с именем пользователя и паролем, чтобы я мог одновременно отслеживать несколько попыток ssh bruteforce от разных клиентов.В существующем состоянии я могу получить только IP-адрес при установлении соединения, что делает невозможным одновременное отслеживание нескольких соединений.

Просто чтобы уточнить, когда я имею в виду соединение или соединения, я не имею в виду успешный вход в системуэто невозможно с моим кодом и не предназначено.Я имею в виду одно подключение к серверу ssh, которое разрешает 3 попытки ввода пароля перед повторным подключением.

from zope.interface import implements
from twisted.conch.unix import UnixSSHRealm
from twisted.cred import portal
from twisted.cred.credentials import IUsernamePassword
from twisted.cred.checkers import ICredentialsChecker
from twisted.cred.error import UnauthorizedLogin
from twisted.conch.ssh import factory, userauth, keys, session
from twisted.internet import reactor, defer

with open('id_rsa') as privateBlobFile:
    privateKey = privateBlobFile.read()

with open('id_rsa.pub') as publicBlobFile:
    publicKey = publicBlobFile.read()

class FailDB:
    credentialInterfaces = IUsernamePassword, implements(ICredentialsChecker)

    def requestAvatarId(self, credentials):
        print"%s:%s" % ( credentials.username, credentials.password)
        return defer.fail(UnauthorizedLogin("invalid password"))

class UnixSSHdFactory(factory.SSHFactory):
    publicKeys = {
        'ssh-rsa': keys.Key.fromString(data=publicKey)
    }
    privateKeys = {
        'ssh-rsa': keys.Key.fromString(data=privateKey)
    }
    services = {
        'ssh-userauth': userauth.SSHUserAuthServer
    }

    def buildProtocol(self, addr):
        print addr
        return factory.SSHFactory.buildProtocol(self, addr)

if __name__ == '__main__':
    portal = portal.Portal(UnixSSHRealm())
    portal.registerChecker(FailDB())
    UnixSSHdFactory.portal = portal
    reactor.listenTCP(2022, UnixSSHdFactory())
    reactor.run()

Пример вывода:

IPv4Address(TCP, '127.0.0.1', 42141)
root:password123
root:123456
root:letmein

1 Ответ

0 голосов
/ 10 февраля 2015

Вы можете получить адрес после неудачной попытки входа в систему, получив производную от SSHUserAuthServer и переопределив метод _ebBadAuth.Пример:

class AuthServer(userauth.SSHUserAuthServer):
    def _ebBadAuth(self, reason):
        addr = self.transport.getPeer().address.host
        print("addr {} failed to log in as {} using {}"
              .format(addr, self.user, self.method))
        userauth.SSHUserAuthServer._ebBadAuth(self, reason)

Если вы также хотите разрешить успешное подключение, вам не нужно переопределять SSHFactory.services, а только обновлять его с помощью вашего производного класса, например factory.services.update({'ssh-userauth': AuthServer}).

...