Сброс соединения RabbitMQ Pika, (-1, ConnectionResetError (104, «Сброс соединения по пиру») - PullRequest
0 голосов
/ 02 января 2019

выполнил поиск в stackoverflow и разместил этот вопрос, потому что у меня не сработало решение, и мой вопрос может отличаться от другого вопроса.

Я пишу сценарий, который получает статью из очереди rabbitMQ и обрабатывает ее, чтобы подсчитать количество слов, извлечь из нее ключевые слова и вывести ее в базу данных. мой скрипт работает нормально, но через некоторое время я получаю это исключение
(-1, "ConnectionResetError(104, 'Connection reset by peer')")

Понятия не имею, почему я это получаю. Я перепробовал множество решений, доступных на стековом потоке. Я написал свой сценарий и попробовал его двумя разными способами. оба работают нормально, но через некоторое время возникает одно и то же исключение.

вот мой первый код:

def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    # Edit 4
    def pika_connect():
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=Config.AMQ_DAEMONS['base']['amq-host']))
        channel = connection.channel()
        print ("In pika connect")
        Logger.log_message('Setting up input queue consumer')
        channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
        channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

        Logger.log_message('Starting loop')
        channel.start_consuming()

    #########

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()


    # Edit 5 starting 10 threads to listen to pika 

    for th in range(qthreads):
        Logger.log_message('Starting thread: '+str(th))
        try:
            t = Thread(target=pika_connect, args=())
            t.start()
        except Exception as e:
            Logger.error_message("Exception in starting threads " + str(e))



try:
    app_main()
except Exception as e:
    Logger.error_message("Exception in APP MAIN " + str(e))

вот мой второй код:

def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    connection = pika.BlockingConnection(pika.ConnectionParameters(
             host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()
    print ("In app main")
    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
    channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

    Logger.log_message('Starting loop')

    try:
        channel.start_consuming()
    except Exception as e:
        Logger.error_message("Exception in start_consuming in main " + str(e))
        raise e


try:
    app_main()
except Exception as e:
Logger.error_message("Exception in APP MAIN " + str(e))


В своем первом коде я использовал многопоточность, потому что хочу ускорить процесс обработки статей.
это моя функция обратного вызова
def on_message(ch, method, properties, message): Logger.log_message("Starting parsing new msg ") handle_message(message)

РЕДАКТИРОВАТЬ: Полный код

import os
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)

from Modules import Logger
import pika
import Config
import json
import pickle
import Pipeline
import sys
import time
import datetime
import threading
import queue
import functools

from pid.decorator import pidfile

Logger.log_init(Config.AMQ_DAEMONS['consumer']['log-ident'])
#qthreads = Config.AMQ_DAEMONS['consumer']['threads']
results = queue.Queue()
channel = None
speedvars = None

SPD_RECEIVED = 'received'
SPD_DISCARDED = 'discarded'
SPD_SENT = 'sent'

class SpeedVars(object):
    vars = {}
    lock = None

    def __init__(self):
        self.lock = threading.Lock()

    def inc(self, var):

        self.lock.acquire()
        try:
            if var in self.vars:
                self.vars[var] += 1
            else:
                self.vars[var] = 1
        finally:
            self.lock.release()


    def dec(self, var):

        self.lock.acquire()
        try:
            if var in self.vars:
                self.vars[var] -= 1
            else:
                Logger.error_message('Cannot decrement ' + var + ', not tracked')
        finally:
            self.lock.release()

    def get(self, var):

        out = None
        self.lock.acquire()
        try:
            if var in self.vars:
                out = self.vars[var]
            else:
                Logger.error_message('Cannot get ' + var + ', not tracked')
        finally:
            self.lock.release()


        return out

    def get_all(self):

        out = None
        self.lock.acquire()
        try:
            out = self.vars.copy()
        finally:
            self.lock.release()


        return out


class SpeedTracker(threading.Thread):
    speedvars = None
    start_ts = None
    last_vars = {}

    def __init__(self, speedvars):
        super(SpeedTracker, self).__init__()
        self.start_ts = time.time()
        self.speedvars = speedvars
        Logger.log_message('Setting up speed tracker')

    def run(self):
        while True:
            time.sleep(Config.AMQ_DAEMONS['consumer']['speed-tracking-interval'])
            prev = self.last_vars
            cur = self.speedvars.get_all()
            now = time.time()
            if len(prev) > 0:
                q = {}
                for key in cur:
                    qty = cur[key] - prev[key]
                    avg = qty / Config.AMQ_DAEMONS['consumer']['speed-tracking-interval']
                    overall_avg = cur[key] / (now - self.start_ts)
                    Logger.log_message('Speed-tracking (' + key + '): total ' + str(cur[key])
                                       + ', delta ' + str(qty) + ', speed ' + '%0.2f' % avg + '/sec, '
                                       + ', overall speed ' + '%0.2f' % overall_avg + '/sec')
                pending = cur[SPD_RECEIVED] - cur[SPD_DISCARDED] - cur[SPD_SENT]
                pending_avg = pending / (now - self.start_ts)
                Logger.log_message('Speed-tracking (pending): total ' + str(pending)
                                   + ', overall speed ' + '%0.2f' % pending_avg + '/sec')
            self.last_vars = cur


class ResultsSender(threading.Thread):
    channel = None
    results = None
    speedvars = None

    def __init__(self, results, speedvars):
        super(ResultsSender, self).__init__()
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=Config.AMQ_DAEMONS['base']['amq-host']))
        self.channel = connection.channel()
        Logger.log_message('Setting up output exchange')
        self.channel.exchange_declare(exchange=Config.AMQ_DAEMONS['consumer']['output'], exchange_type='direct')
        self.results = results
        self.speedvars = speedvars

    def run(self):
        while True:
            item = self.results.get()
            self.channel.basic_publish(
                exchange=Config.AMQ_DAEMONS['consumer']['output'],
                routing_key='',
                body=item)
            self.speedvars.inc(SPD_SENT)

def parse_message(message):
    try:
        bodytxt = message.decode('UTF-8')
        body = json.loads(bodytxt)
        return body
    except Exception as e:
        Logger.error_message("Cannot parse message - " + str(e))
        raise e

def get_body_elements(body):
    try:
        artid = str(body.get('article_id'))
        article_dt = datetime.datetime.fromtimestamp(body.get('pubTime'))
        date = article_dt.strftime(Config.DATE_FORMAT)
        article = "\n".join([body.get('title', ''), body.get('subheading', ''), body.get('content', '')])
        return (artid, date, article)
    except Exception as e:
        Logger.error_message("Cannot retrieve article attributes " + str(e))
        raise e

def process_article(id, date, text):
    global results, speedvars
    try:
        Logger.log_message('Processing article ' + id)
        keywords = Pipeline.extract_keywords(text)
        send_data = {"id": id, "date": date, "keywords": keywords}
        results.put(pickle.dumps(send_data))
        # print('Queue Size:',results.qsize())
    except Exception as e:
        Logger.error_message("Problem processing article " + str(e))
        raise e

def ack_message(ch, delivery_tag):
    """Note that `channel` must be the same pika channel instance via which
    the message being ACKed was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        Logger.error_message("Channel is already closed, so we can't ACK this message" + str(e))
        # Channel is already closed, so we can't ACK this message;
        # log and/or do something that makes sense for your app in this case.
        #pass

def handle_message(connection, ch, delivery_tag, message):
    global speedvars
    start = time.time()
    thread_id = threading.get_ident()

    try:
        speedvars.inc(SPD_RECEIVED)
        body = parse_message(message)
        (id, date, text) = get_body_elements(body)
        words = len(text.split())
        if words <= Config.AMQ_DAEMONS['consumer']['word-count-limit']:
            process_article(id, date, text)
        else:
            Logger.log_message('Ignoring article, over word count limit')
            speedvars.inc(SPD_DISCARDED)

    except Exception as e:
        Logger.error_message("Could not process message - " + str(e))

    cb = functools.partial(ack_message, ch, delivery_tag)
    connection.add_callback_threadsafe(cb)

    Logger.log_message("Thread id: "+str(thread_id)+" Delivery tag: "+str(delivery_tag)) 
    Logger.log_message("TOtal time taken to handle message : "+ str(time.time()-start))

# CALL BACK    
## def on_message(ch, method, properties, message):
##    global executor
##    executor.submit(handle_message, message)

def on_message(ch, method, header_frame, message, args):
    (connection, threads) = args
    delivery_tag = method.delivery_tag
    t = threading.Thread(target=handle_message, args=(connection, ch, delivery_tag, message))
    t.start()
    threads.append(t)


####################################################
@pidfile(piddir=Config.AMQ_DAEMONS['base']['pid-dir'], pidname=Config.AMQ_DAEMONS['consumer']['pid-file'])
def app_main():
    global channel, results, speedvars

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()


    # Pika Connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()

    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)

    #channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)
    channel.basic_qos(prefetch_count=1)
    threads = []
    on_message_callback = functools.partial(on_message, args=(connection, threads))
    channel.basic_consume(on_message_callback, Config.AMQ_DAEMONS['consumer']['input'])

    Logger.log_message('Starting loop')
    ## channel.start_consuming()
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    Wait for all to complete
    for thread in threads:
        thread.join()

    connection.close()


app_main()  

Пика не тратит много времени на обработку сообщения, но я все еще сталкиваюсь с проблемой сброса соединения.
** Общее время обработки сообщения: 0.0005991458892822266 **

1 Ответ

0 голосов
/ 03 января 2019

Ваш метод handle_message блокирует тактовые импульсы, поскольку весь ваш код, включая цикл ввода / вывода Pika, выполняется в одном потоке. Посмотрите этот пример о том, как запустить вашу работу (handle_message) в отдельном потоке от цикла ввода / вывода Pikas, а затем правильно подтвердите сообщения.


ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает список рассылки rabbitmq-users и только иногда отвечает на вопросы о StackOverflow.

...