amqplib: сокет внезапно закрылся при открытии рукопожатия - PullRequest
0 голосов
/ 18 февраля 2019

Что я пытаюсь сделать

Я пытаюсь создать rabbit-mq для издателя и подписчика.Он работает, как и ожидалось, пока я не попытаюсь перезапустить свой сервер rabbit-mq.

Что работает

Я использую rabbitmq:3-management образ докера, ampqlib 5.3 и Node.js 11.10.0, чтобы сделатьэта простая программа:

const q = 'tasks';

const { execSync } = require("child_process");
const amqplib = require("amqplib");

function createChannel() {
    return amqplib.connect("amqp://root:toor@0.0.0.0:5672/")
        .then((conn) => conn.createChannel());
}

Promise.all([createChannel(), createChannel()])

    .then(async (channels) => {
        const [publisherChannel, consumerChannel] = channels;

        // publisher
        await publisherChannel.assertQueue(q).then(function(ok) {
            return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
        });

        // consumer
        await consumerChannel.assertQueue(q).then(function(ok) {
            return consumerChannel.consume(q, function(msg) {
                if (msg !== null) {
                    console.log(msg.content.toString());
                    consumerChannel.ack(msg);
                }
            });
        });

    })

    .catch(console.warn);

Итак, во-первых, я сделал два канала.Один в качестве издателя, а другой в качестве потребителя.

Издатель отправляет something to do сообщение в очередь tasks.

Затем потребитель перехватывает сообщение и выводит его на экран, используя console.log.

Работает как положено.

Что не работает

Первая попытка

const q = 'tasks';

const { execSync } = require("child_process");
const amqplib = require("amqplib");

function createChannel() {
    return amqplib.connect("amqp://root:toor@0.0.0.0:5672/")
        .then((conn) => conn.createChannel());
}

Promise.all([createChannel(), createChannel()])

    .then((channels) => {

        // Let's say rabbitmq is down, and then up again
        execSync("docker stop rabbitmq");
        execSync("docker start rabbitmq");

        return channels;
    })

    .then(async (channels) => {
        const [publisherChannel, consumerChannel] = channels;

        // publisher
        await publisherChannel.assertQueue(q).then(function(ok) {
            return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
        });

        // consumer
        await consumerChannel.assertQueue(q).then(function(ok) {
            return consumerChannel.consume(q, function(msg) {
                if (msg !== null) {
                    console.log(msg.content.toString());
                    consumerChannel.ack(msg);
                }
            });
        });

    })

    .catch(console.warn);

Аналогично моей предыдущей попытке, но на этот разЯ пытаюсь остановить и запустить контейнер rabbit-mq (перезапустить сервер), прежде чем продолжить.

Не работает, вместо этого я получаю эту ошибку:

{ Error: Socket closed abruptly during opening handshake
    at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)                                                           
    at Socket.emit (events.js:202:15)
    at endReadableNT (_stream_readable.js:1129:12)
    at processTicksAndRejections (internal/process/next_tick.js:76:17)
  cause:
   Error: Socket closed abruptly during opening handshake
       at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)                                                        
       at Socket.emit (events.js:202:15)
       at endReadableNT (_stream_readable.js:1129:12)
       at processTicksAndRejections (internal/process/next_tick.js:76:17),
  isOperational: true }
[guldan@draenor labs]$ node --version
v11.10.0
[guldan@draenor labs]$ docker start rabbitmq && node test.js
rabbitmq
{ Error: Channel ended, no reply will be forthcoming
    at rej (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:195:7)                                                                                  
    at Channel.C._rejectPending (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:197:28)                                                            
    at Channel.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:165:8)                                                                   
    at Connection.C._closeChannels (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:394:18)                                                      
    at Connection.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:401:8)                                                             
    at Object.accept (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:96:18)                                                                     
    at Connection.mainAccept [as accept] (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:64:33)                                                 
    at Socket.go (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:478:48)                                                                        
    at Socket.emit (events.js:197:13)
    at emitReadable_ (_stream_readable.js:539:12)
    at processTicksAndRejections (internal/process/next_tick.js:76:17)
  cause:
   Error: Channel ended, no reply will be forthcoming
       at rej (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:195:7)                                                                               
       at Channel.C._rejectPending (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:197:28)                                                         
       at Channel.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:165:8)                                                                
       at Connection.C._closeChannels (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:394:18)                                                   
       at Connection.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:401:8)                                                          
       at Object.accept (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:96:18)                                                                  
       at Connection.mainAccept [as accept] (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:64:33)                                              
       at Socket.go (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:478:48)                                                                     
       at Socket.emit (events.js:197:13)
       at emitReadable_ (_stream_readable.js:539:12)
       at processTicksAndRejections (internal/process/next_tick.js:76:17),
  isOperational: true }

Вторая попытка

Моя первая попытка не сработала.Итак, я пытаюсь создать новый канал после перезапуска сервера:

const q = 'tasks';

const { execSync } = require("child_process");
const amqplib = require("amqplib");

function createChannel() {
    return amqplib.connect("amqp://root:toor@0.0.0.0:5672/")
        .then((conn) => conn.createChannel());
}

Promise.all([createChannel(), createChannel()])

    .then((channels) => {

        // Let's say rabbitmq is down, and then up again
        execSync("docker stop rabbitmq");
        execSync("docker start rabbitmq");

        return Promise.all([createChannel(), createChannel()]);
        // return channels;
    })

    .then(async (channels) => {
        const [publisherChannel, consumerChannel] = channels;

        // publisher
        await publisherChannel.assertQueue(q).then(function(ok) {
            return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
        });

        // consumer
        await consumerChannel.assertQueue(q).then(function(ok) {
            return consumerChannel.consume(q, function(msg) {
                if (msg !== null) {
                    console.log(msg.content.toString());
                    consumerChannel.ack(msg);
                }
            });
        });

    })

    .catch(console.warn);

И на этот раз я получил эту ошибку вместо:

{ Error: Socket closed abruptly during opening handshake
    at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)                                                           
    at Socket.emit (events.js:202:15)
    at endReadableNT (_stream_readable.js:1129:12)
    at processTicksAndRejections (internal/process/next_tick.js:76:17)
  cause:
   Error: Socket closed abruptly during opening handshake
       at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)                                                        
       at Socket.emit (events.js:202:15)
       at endReadableNT (_stream_readable.js:1129:12)
       at processTicksAndRejections (internal/process/next_tick.js:76:17),
  isOperational: true }

Я не совсем уверен, но яЯ думаю, что ошибка связана с Она может быть связана с https://github.com/squaremo/amqp.node/issues/101.

Что я хочу

Я хочу, чтобы обходной путь / решение переподключилось к rabbitmq после перезапуска сервера.Любое объяснение / предложение также приветствуется.

Редактировать

Я пытаюсь пойти глубже и немного изменить свой код:

const q = 'tasks';

const { execSync } = require("child_process");
const amqplib = require("amqplib");

async function createConnection() {
    console.log("connect");
    const conn = amqplib.connect("amqp://root:toor@0.0.0.0:5672/");
    console.log("connected");
    return conn;
}

async function createChannel(conn) {
    console.log("create channel");
    const channel = conn.createChannel({durable: false});
    console.log("channel created");
    return channel;
}

async function createConnectionAndChannel() {
    const conn = await createConnection();
    const channel = await createChannel(conn);
    return channel;
}

Promise.all([createConnectionAndChannel(), createConnectionAndChannel()])

    .then((channels) => {

        // Let's say rabbitmq is down, and then up again
        console.log("restart server");
        execSync("docker stop rabbitmq");
        execSync("docker start rabbitmq");
        console.log("server restarted");

        return Promise.all([createConnectionAndChannel(), createConnectionAndChannel()]);
        // return channels;
    })

    .then(async (channels) => {
        console.log("channels created");
        const [publisherChannel, consumerChannel] = channels;

        // publisher
        console.log("publish");
        await publisherChannel.assertQueue(q).then(function(ok) {
            console.log("published");
            return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
        });

        // consumer
        console.log("consume");
        await consumerChannel.assertQueue(q).then(function(ok) {
            console.log("consumed");
            return consumerChannel.consume(q, function(msg) {
                if (msg !== null) {
                    console.log(msg.content.toString());
                    consumerChannel.ack(msg);
                }
            });
        });

    })

    .catch(console.warn);

И я получаю этот вывод:

connect
connected
connect
connected
create channel
channel created
create channel
channel created
restart server
server restarted
connect
connected
connect
connected
{ Error: Socket closed abruptly during opening handshake
    at Socket.endWhileOpening (/home/guldan/Projects/kata/merapi-plugin-service-rabbit/node_modules/amqplib/lib/connection.js:260:17)                                                                                             
    at Socket.emit (events.js:202:15)
    at endReadableNT (_stream_readable.js:1129:12)
    at processTicksAndRejections (internal/process/next_tick.js:76:17)
  cause:
   Error: Socket closed abruptly during opening handshake
       at Socket.endWhileOpening (/home/guldan/Projects/kata/merapi-plugin-service-rabbit/node_modules/amqplib/lib/connection.js:260:17)                                                                                          
       at Socket.emit (events.js:202:15)
       at endReadableNT (_stream_readable.js:1129:12)
       at processTicksAndRejections (internal/process/next_tick.js:76:17),
  isOperational: true }

Итак, я полагаю, что amqplib может повторно подключиться , но не может создать канал .

1 Ответ

0 голосов
/ 22 февраля 2019

Наконец мне удается найти ответ:

const { execSync } = require("child_process");
const amqp = require("amqplib");

async function sleep(delay) {
    return new Promise((resolve, reject) => {
        setTimeout(resolve, delay);
    });
}

async function createChannel(config) {
    const { url, publishers, listeners } = Object.assign({url: "", publishers: {}, listeners: {}}, config);
    try {
        // create connection
        const connection = await amqp.connect(url);
        let channel = null;
        connection._channels = [];
        connection.on("error", (error) => {
            console.error("Connection error : ", config, error);
        });
        connection.on("close", async (error) => {
            if (channel) {
                channel.close();
            }
            console.error("Connection close : ", config, error);
            await sleep(1000);
            createChannel(config);
        });
        // create channel
        channel = await connection.createConfirmChannel();
        channel.on("error", (error) => {
            console.error("Channel error : ", config, error);
        });
        channel.on("close", (error) => {
            console.error("Channel close : ", config, error);
        });
        // register listeners
        for (queue in listeners) {
            const callback = listeners[queue];
            channel.assertQueue(queue, { durable: false });
            channel.consume(queue, callback);
        }
        // publish
        for (queue in publishers) {
            const message = publishers[queue];
            channel.assertQueue(queue, { durable: false });
            channel.sendToQueue(queue, message);
        }
        return channel;
    } catch (error) {
        console.error("Create connection error : ", error);
        await sleep(1000);
        createChannel(config);
    }
}

async function main() {
    // publish "hello" message to queue
    const channelPublish = await createChannel({
        url: "amqp://root:toor@0.0.0.0:5672",
        publishers: {
            "queue": Buffer.from("hello"),
        }
    });

    // restart rabbitmq
    execSync("docker stop rabbitmq");
    execSync("docker start rabbitmq");

    // consume message from queue
    const channelConsume = await createChannel({
        url: "amqp://root:toor@0.0.0.0:5672",
        listeners: {
            "queue": (message) => {
                console.log("Receive message ", message.content.toString());
            },
        }
    });

    return true;
}

main().catch((error) => console.error(error));

По сути, я присоединяю канал к соединению.Поэтому всякий раз, когда возникает ошибка соединения (например, сервер rabbitmq выключен), программа будет ждать секунду и пытаться создать новое соединение.

Недостаток: я потеряю ссылку на старое соединениеи это канал.

Чтобы преодолеть эту проблему, я должен хранить информацию об очереди, издателях и потребителях где-то еще (в данном случае я помещаю ее как параметр createChannel).

Наконец, каждый раз, когда я воссоединяюсь, я также создаю канал и создаю каждого издателя и потребителя.

Не совсем удобно, но, по крайней мере, он работает так, как задумано.

...