Проверка электронной почты: 550 Ответ через прокси socks5 (заблокирован почтовым сервером) - PullRequest
0 голосов
/ 17 января 2020

Я разрабатываю сценарий проверки электронной почты в python, и ошибка, с которой я сталкиваюсь, заключается в том, что всякий раз, когда я пытаюсь проверить деловую электронную почту, т.е. (xxx@ytpak.edu), он блокируется почтовым сервером, возвращающим ответ как код ошибки 550 т.е. запрос заблокирован почтовым сервером. Я использую этот скрипт. https://github.com/Gardener-gg/email-verifier

Это мой класс smtp для носка

'' 'Импорт сокетов для импорта сокетов

из smtplib import SMTP

Класс SocksSMTP (SMTP):

def __init__(self,
        host='',
        port=0,
        local_hostname=None,
        timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
        source_address=None,
        proxy_type=None,
        proxy_addr=None,
        proxy_port=None,
        proxy_rdns=True,
        proxy_username=None,
        proxy_password=None,
        socket_options=None):

    self.proxy_type=proxy_type
    self.proxy_addr=proxy_addr
    self.proxy_port=proxy_port
    self.proxy_rdns=proxy_rdns
    self.proxy_username=proxy_username
    self.proxy_password=proxy_password
    self.socket_options=socket_options
    # if proxy_type is provided then change the socket to socksocket
    # behave like a normal SMTP class.
    if self.proxy_type:
        self._get_socket = self.socks_get_socket

    super(SocksSMTP, self).__init__(host, port, local_hostname, timeout, source_address)

def socks_get_socket(self, host, port, timeout):
    if self.debuglevel>0:
        self._print_debug('connect: to', (host, port), self.source_address)
        print(host,port)
    return socks.create_connection((host, port),
            timeout=timeout,
            source_address=self.source_address,
            proxy_type=self.proxy_type,
            proxy_addr=self.proxy_addr,
            proxy_port=self.proxy_port,
            proxy_rdns=self.proxy_rdns,
            proxy_username=self.proxy_username,
            proxy_password=self.proxy_password,
            socket_options=self.socket_options)

Это мой верифицируемый класс:

Проверка класса:

def __init__(self,
             source_addr,
             proxy_type = None,
             proxy_addr = None,
             proxy_port = None,
             proxy_username = None,
             proxy_password = None):
    """
    Initializes the Verifier object with proxy settings.
    :param proxy_type: One of `SOCKS4`, `SOCKS5` or `HTTP`.
    :param proxy_addr: Address of the proxy.
    :param proxy_port: Port of the proxy.
    :param proxy_username: Username to authenticate with.
    :param proxy_password: Password for the user. (Only when username is provided)
    """
    if proxy_type:
        try:
            self.proxy_type = proxy[proxy_type.lower()]
        except KeyError as e:
            raise UnknownProxyError(proxy_type)
    else:
        self.proxy_type = None
    self.source_addr = source_addr
    self.proxy_addr = proxy_addr
    self.proxy_port = proxy_port
    self.proxy_username = proxy_username
    self.proxy_password = proxy_password

def _parse_address(self, email) -> Address:
    """
    Parses the email address provided and splits it 
    into username and domain.

    Returns a named tuple Address
    """
    name, addr = parseaddr(email)
    if not addr:
        raise EmailFormatError(f"email does not contain address: {email}")
    try:
        domain = addr.split('@')[-1]
        username = addr.split('@')[:-1][0]
    except IndexError:
        raise EmailFormatError(f"address provided is invalid: {email}")
    return Address(name, addr, username, domain)

def _random_email(self, domain):
    """
    This method generates a random email by using the os.urandom
    for the domain provided in the parameter.
    """
    return f'{binascii.hexlify(os.urandom(20)).decode()}@{domain}'

def _can_deliver(self,
                 exchange : str,
                 address : str):
    """
    Checks the deliverablity of an email to the given mail_exchange.
    Creates a connection using the SMTP and tries to add the email to 
    a recipients.

    :param exchange: The exchange url for the domain of email
    :param address: The email address to check for deliverablity

    :returns: A 3-tuple of host_exists, deliverable and catch_all
    """
    host_exists = False
    with SMTP(exchange[1],
            proxy_type=self.proxy_type,
            proxy_addr=self.proxy_addr,
            proxy_port=self.proxy_port,
            proxy_username=self.proxy_username,
            proxy_password=self.proxy_password) as smtp:
        host_exists = True
        smtp.helo()
        smtp.mail(self.source_addr)
        test_resp = smtp.rcpt(address.addr)
        catch_all_resp = smtp.rcpt(self._random_email(address.addr))
        if test_resp[0] == 250:
            deliverable = True
            if catch_all_resp[0] == 250:
                catch_all = True
            else:
                catch_all = False
        elif test_resp[0] >= 400:
            raise SMTPRecepientException(*test_resp)
    return host_exists, deliverable, catch_all

def verify(self, email):
    """
    method that performs the verification on the passed
    email address.
    """
    lookup = {
        'address': None,
        'valid_format': False,
        'deliverable': False,
        'full_inbox': False,
        'host_exists': False,
        'catch_all': False,
    }
    try:
        lookup['address'] = self._parse_address(email)
        lookup['valid_format'] = True
    except EmailFormatError:
        lookup['address'] = f"{email}"
        return lookup

    # look for mx record and create a list of mail exchanges
    try:
        mx_record = resolver.query(lookup['address'].domain, 'MX')
        mail_exchangers = [exchange.to_text().split() for exchange in mx_record]
        lookup['host_exists'] = True
    except (resolver.NoAnswer, resolver.NXDOMAIN, resolver.NoNameservers):
        lookup['host_exists'] = False
        return lookup

    for exchange in mail_exchangers:
        try:
            host_exists, deliverable, catch_all = self._can_deliver(exchange, lookup['address'])
            if deliverable:
                lookup['host_exists'] = host_exists
                lookup['deliverable'] = deliverable
                lookup['catch_all'] = catch_all
                break
        except SMTPRecepientException as err:
            # Error handlers return a dict that is then merged with 'lookup'
            kwargs = handle_error.get(err.code, handle_unrecognised)(err.response)
            # This expression merges the lookup dict with kwargs
            lookup = {**lookup, **kwargs}

        except smtplib.SMTPServerDisconnected as err:
            lookup['message'] = "Internal Error"
        except smtplib.SMTPConnectError as err:
            lookup['message'] = "Internal Error. Maybe blacklisted"

    return lookup

Это моя функция проверки электронной почты, в которой я отправка запроса smtp через прокси socks5 на почтовый сервер:

def Email_Verification():
    global client_connection
    t1 = time.perf_counter()
    with open('email.json', 'r') as f:
        data = json.load(f)
        sort = sorted(data.keys())
        count = sort[-1]
    emails = []
    for key, value in data[count][0].items():
        emails.append(value)
    for email in emails:
        domain = email.split('@')[-1]
        socks_verifier = Verifier(
            source_addr='gulzi.61930@gmail.com',
            proxy_type='socks5',
            proxy_addr='159.89.49.106',
            proxy_port=1110,
            proxy_username='funky-username',
            proxy_password='crazy-password'
        )
        results = socks_verifier.verify(email)
        print(results)
        response = pickle.dumps(results)
        response = bytes(f'{len(response):<{HEADERSIZE}}','utf-8')+response
        client_connection.send(response)
        time.sleep(1)
    t2 = time.perf_counter()
    print(f'Finished time is {t2-t1} secs')

Пожалуйста, помогите! Этот скрипт хорошо работает для Gmail, Hotmail и других веб-писем. Не работает для бизнес-доменов.

...