Есть ли модуль python, который предоставляет точки входа, которые обертывают клиента openssh? - PullRequest
3 голосов
/ 26 апреля 2011

Существует ли модуль Python, обеспечивающий функции удаленного управления путем запуска подпроцессов, реализующих интерфейс командной строки ssh? Вид функций, которые будут полезны, включают:

  • выполните эту команду на этом компьютере и верните код завершения, stdout и stderr или вызовите исключение по истечении времени ожидания
  • проверить, существует ли каталог на этом компьютере
  • записать эти байты в это имя файла на этом компьютере

Мне известны paramiko и Conch . Тем не менее, я хотел бы, чтобы программы, использующие этот модуль, могли использовать функцию openssh ControlMaster , которая повторно использует существующие долго действующие соединения ssh с целевыми машинами. В противном случае время установления соединения SSH может доминировать во время выполнения. Этот подход также позволяет использовать другие программы, реализующие интерфейс командной строки openssh, без переписывания программ с использованием модуля.

Ответы [ 3 ]

3 голосов
/ 26 апреля 2011

Вы можете найти Python Fabric , подходящий для ваших нужд.Похоже, что он использует paramiko, но умеет кешировать соединения, как описано в модели исполнения .

Хотя оно не будет использовать существующее продолжительное соединение.

1 голос
/ 26 апреля 2011

То, о чем вы говорите, звучит так, как будто вы хотите использовать rsync, в частности ваши функции «проверить, существует ли каталог на этом компьютере», «записать эти байты в это имя файла на этом компьютере», «выполнить обратный вызовкаждый раз строки записываются в этот файл на этом компьютере ".Вы можете использовать некоторые из существующих Python-rsync мостов, которые предоставили пользователи, или создать свой собственный.

Невозможно сохранить долгосрочную работу.Туннель SSH открыт без процесса-демона для его обслуживания.И paramiko, и Conch поддерживают каналы SSH, которые можно легко запустить в процессе демона, как при классическом перенаправлении портов SSH.

0 голосов
/ 27 апреля 2011

Я до сих пор не нашел ни одного кода, который мог бы использовать, поэтому написал версию ниже.Я все еще стремлюсь найти модуль с открытым исходным кодом, который делает подобные вещи.

"""Remote control a machine given an ssh client on Linux"""

from pwd import getpwuid
from socket import socket, AF_UNIX, SOCK_STREAM, error
from os import getuid, write, close, unlink, read
import select, errno
from subprocess import PIPE, Popen, check_call
from time import time, sleep
from tempfile import mkstemp

def whoami():
    """figure out user ID"""
    return getpwuid(getuid()).pw_name

def arg_escape(text):
    """Escape things that confuse shells"""
    return text.replace('(', '\\(').replace(')', '\\)')

def try_open_socket(socket_name):
    """can we talk to socket_name?"""
    sock = socket(AF_UNIX, SOCK_STREAM)
    try:
        sock.connect(socket_name)
    except error:
        return False
    else:
        return True

class ProcessTimeoutError(Exception):
    """Indicates that a process failed to finish in alotted time"""
    pass

class ConnectionTimeoutError(Exception):
    """Indicates that it was not possible to connect in alotted time"""

class CalledProcessError(Exception):
    """Indicates non-zero exit of a process we expected to exit cleanly"""

def local_run_with_timeout(command, timeout=60, check=True):
    """Run a command with a timeout after which it will be SIGKILLed.

    If check is set raise CalledProcessError if the command fails.

    Based on the standard library subprocess.Popen._communicate_with_poll.
    """
    process = Popen(command, shell=False, stdout=PIPE, stderr=PIPE)
    poller = select.poll()
    start = time()
    fd2file = {}
    content = {}
    for stream in [process.stdout, process.stderr]:
        poller.register(stream.fileno(), select.POLLIN | select.POLLPRI)
        fd2file[stream.fileno()] = stream
        content[stream] = ''
    vout = lambda: content[process.stdout]
    verr = lambda: content[process.stderr]
    while fd2file:
        delta = time() - start
        if delta > timeout:
            process.kill()
            raise ProcessTimeoutError(command, timeout, vout(), verr())
        try:
            ready = poller.poll(timeout-delta)
        except select.error, exc:
            if exc.args[0] == errno.EINTR:
                continue
            raise
        for fileno, mode in ready:
            if mode & (select.POLLIN | select.POLLPRI):
                data = read(fileno, 4096)
                content[fd2file[fileno]] += data
                if data:
                    continue
            fd2file[fileno].close()
            fd2file.pop(fileno)
            poller.unregister(fileno)
    process.wait()
    if check and process.returncode != 0:
        raise CalledProcessError(process.returncode, delta, command,
                                 vout(), verr())
    return (process.returncode, vout(), verr())



class Endpoint:
    """Perform operations on a remote machine"""
    def __init__(self, host, user=None, process_timeout=10,
                 connection_timeout=20):
        self.host = host
        self.user = user
        self.target = ((self.user+'@') if self.user else '') + self.host
        self.process_timeout = process_timeout
        self.connection_timeout = connection_timeout

    def start(self):
        """Start the SSH connection and return the unix socket name.

        Requires http://sourceforge.net/projects/sshpass/ and
        http://software.clapper.org/daemonize/ to be installed
        """
        socket_name = '/tmp/' + whoami() + '-ssh-' + self.target
        if not try_open_socket(socket_name):
            check_call(['daemonize', '/usr/bin/ssh',
                        '-N', '-M', '-S', socket_name, self.target])
            start = time()
            while not try_open_socket(socket_name):
                delta = time() - start
                if delta > self.connection_timeout:
                    raise ConnectionTimeoutError(delta, self.target)
                sleep(1)
        return socket_name

    def call(self, command, timeout=None, check=False):
        """Run command with timeout"""
        if not timeout:
            timeout = self.process_timeout
        socket_name = self.start()
        if type(command) == type(''):
            command = command.split()
        command_escape = [arg_escape(x) for x in command]
        command_string = ' '.join(command_escape)

        return local_run_with_timeout(
            ['/usr/bin/ssh', '-S', socket_name,
             self.target, command_string], timeout=timeout, check=check)
    def check_call(self, command, timeout=None):
        """Run command with timeout"""
        exitcode, stdout, stderr = self.call(command, timeout=timeout,
                                             check=True)
        return stdout, stderr

    def isdir(self, directory):
        """Return true if a directory exists"""
        return 'directory\n' in self.call(['stat', directory])[1]

    def write_file(self, content, filename):
        """Store content on filename"""
        handle, name = mkstemp()
        try:
            write(handle, content)
            close(handle)
            socket_name = self.start()
            exitcode, stdout, stderr = local_run_with_timeout(
                ['/usr/bin/scp', '-o', 'ControlPath='+socket_name,
                 '-o', 'ControlMaster=auto', name, self.target+':'+filename],
                timeout=self.process_timeout, check=True)
        finally:
            unlink(name)

def test_check_call():
    """Run through some test cases. """
    tep = Endpoint('localhost')
    assert 'dev' in tep.check_call('ls /')[0]
    assert tep.call('false')[0] != 0

def test_basic_timeout():
    """Ensure timeouts trigger"""
    import pytest # "easy_install pytest" FTW
    start = time()
    with pytest.raises(ProcessTimeoutError):
        Endpoint('localhost').call('sleep 5', timeout=0.2)
    assert not (time()-start > 3)

def test_timeout_output():
    """check timeouts embed stdout"""
    import pytest # "easy_install pytest" FTW
    with pytest.raises(ProcessTimeoutError):
        Endpoint('localhost').call('find /', timeout=0.2)

def test_non_zero_exit():
    """chek check_call raises an CalledProcessError on timeout"""
    import pytest # "easy_install pytest" FTW
    with pytest.raises(CalledProcessError):
        Endpoint('localhost').check_call('false')

def test_fs():
    """check filesystem operations"""
    tep = Endpoint('localhost')
    assert tep.isdir('/usr')
    assert not tep.isdir(str(time()))
    tep.write_file('hello world', '/tmp/test')
    tep.check_call(['grep','hello','/tmp/test'])
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...