Так как я не нашел что-то удобное в python 3 для своих нужд (почтовая часть twisted не работает в python 3), я сделал небольшой макет с asyncio, который вы можете улучшить, если хотите:
Я определил ImapProtocol, который расширяет asyncio.Protocol. Затем запустите сервер следующим образом:
factory = loop.create_server(lambda: ImapProtocol(mailbox_map), 'localhost', 1143)
server = loop.run_until_complete(factory)
mailbox_map - это карта карты: электронная почта -> карта почтовых ящиков -> набор сообщений. Таким образом, все сообщения / почтовые ящики находятся в памяти.
Каждый раз, когда клиент подключается, создается новый экземпляр ImapProtocol.
Затем ImapProtocol выполняется и отвечает для каждого клиента, реализуя возможность / login / fetch / select / search / store:
class ImapHandler(object):
def __init__(self, mailbox_map):
self.mailbox_map = mailbox_map
self.user_login = None
# ...
def connection_made(self, transport):
self.transport = transport
transport.write('* OK IMAP4rev1 MockIMAP Server ready\r\n'.encode())
def data_received(self, data):
command_array = data.decode().rstrip().split()
tag = command_array[0]
self.by_uid = False
self.exec_command(tag, command_array[1:])
def connection_lost(self, error):
if error:
log.error(error)
else:
log.debug('closing')
self.transport.close()
super().connection_lost(error)
def exec_command(self, tag, command_array):
command = command_array[0].lower()
if not hasattr(self, command):
return self.error(tag, 'Command "%s" not implemented' % command)
getattr(self, command)(tag, *command_array[1:])
def capability(self, tag, *args):
# code for it...
def login(self, tag, *args):
# code for it...
Затем в моих тестах я запускаю сервер во время установки с помощью:
self.loop = asyncio.get_event_loop()
self.server = self.loop.run_until_complete(self.loop.create_server(create_imap_protocol, 'localhost', 12345))
Когда я хочу смоделировать новое сообщение:
imap_receive(Mail(to='dest@fakemail.org', mail_from='exp@pouet.com', subject='hello'))
И останови его на разрыве:
self.server.close()
asyncio.wait_for(self.server.wait_closed(), 1)
ср https://github.com/bamthomas/aioimaplib/blob/master/aioimaplib/tests/imapserver.py
РЕДАКТИРОВАТЬ : у меня была остановка сервера с ошибкой, я переписал его с помощью asyncio.Protocol и изменил ответ, чтобы отразить изменения