Резервный механизм, когда Rabbitmq выходит из строя - PullRequest
0 голосов
/ 26 сентября 2019

Приложение My Spring Boot отправляет события в Timescale через RabbitMQ.Мне нужно знать, как сохранить мои события, если RabbitMQ выходит из строя.

Подробно:

Сообщения, публикуемые RabbitMQ, являются постоянными.Когда брокер сообщений выходит из строя, события не публикуются, и я планирую сохранить эти события в базе данных и снова опубликовать их в RabbitMQ, когда он будет запущен.Любое решение и предложение приветствуется.

1 Ответ

0 голосов
/ 27 сентября 2019

Чтобы немного расширить мой комментарий: на самом деле нет необходимости использовать базу данных для постоянной буферизации.

РЕДАКТИРОВАТЬ : Кроме того, сама причина невозможности отправки сообщенийк RabbitMQ вполне может быть потерянное сетевое соединение.В этом случае большинство СУБД будут бесполезны. Конец редактирования.

package io.mahlberg.stackoverflow.questions.objpersistencedemo;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;

public class PersistenceDemo {

  /*
   * A decent default number of objects to be buffered.
   */
  private static long COUNT = 10000L;

  /*
   * Default filename
   */
  private static final String OBJ_DAT = "./obj.dat";

  private static FileOutputStream   fos;
  private static ObjectOutputStream oos;

  private static FileInputStream   fis;
  private static ObjectInputStream ois;

  private static File dat;

  private static Object lock;

  public static void main(String[] args) throws InterruptedException, IOException {

    // Get the actual number of counts
    if (args[0] != null) {
      COUNT = Long.parseLong(args[0]);
    }

    // Initialize out lock
    lock = new Object();

    // Ensure the datafile exists.
    dat = new File(OBJ_DAT);
    dat.createNewFile();

    // Initialize our streams.
    try {
      fos = new FileOutputStream(dat);
    } catch (Exception e1) {
      e1.printStackTrace();
      System.exit(1);
    }

    oos = new ObjectOutputStream(fos);

    // Define the writer thread.
    Thread writer = new Thread(new Runnable() {

      public void run() {
        Data obj;

        // Make sure we have the behaviour of the queue.
        synchronized (lock) {

          for (int i = 0; i < COUNT; i++) {
            obj = new Data(String.format("Obj-%d", i), new Date());
            try {
              oos.writeObject(obj);
              oos.flush();
              fos.flush();

              // Notify the reader...
              lock.notify();
            } catch (IOException e1) {
              // TODO Auto-generated catch block
              e1.printStackTrace();
            }
            try {
              // ... and wait until the reader is finished.
              lock.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              break;
            }
          }
          // We need to notify the reader one last time for the last
          // Object we put into the stream.
          lock.notify();
        }

      }
    });

    // Initialize the streams used by reader.
    fis = new FileInputStream(dat);
    ois = new ObjectInputStream(fis);

    Thread reader = new Thread(new Runnable() {

      public void run() {
        Data obj;
        while (true) {
          synchronized (lock) {

            try {
              obj = (Data) ois.readObject();

              // Notify writer we are finished with reading the latest entry...
              lock.notify();
            } catch (ClassNotFoundException e1) {
              e1.printStackTrace();
            } catch (IOException e1) {
              break;
            }

            try {
              // ...and wait till writer is done writing.
              lock.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              break;
            }

          }
        }
      }
    });

    // For doing a rough performance measurement.
    Instant start = Instant.now();
    writer.start();
    reader.start();

    // Wait till both threads are done.
    writer.join();
    reader.join();

    Instant end = Instant.now();
    Duration timeElapsed = Duration.between(start, end);
    System.out.format("Took %sms for %d objects\n", timeElapsed.toMillis(), COUNT);
    System.out.format("Avg: %.3fms/object\n", ((double) timeElapsed.toMillis() / COUNT));

    // Cleanup
    oos.close();
    fos.close();
    ois.close();
    fis.close();
  }

}

Обычно мы используем synchronize, notify и wait для эмуляции буфера FIFO с файлом.

Пожалуйстаобратите внимание, что я взял несколько ярлыков для краткости и читабельности, но я думаю, вы поняли картину.Читатель должен время от времени проверять размер файла (как часто это зависит от размера ваших данных) и обрезать файл, а обработка ошибок практически не существует.Я создал банку из этого класса и класса данных, и вот несколько примеров результатов:

$ for i in {1..10}; do java -jar target/objpersistencedemo-0.0.1-SNAPSHOT.jar 20000; done
20000
Took 1470ms for 20000 objects
Avg: 0,074ms/object
20000
Took 1510ms for 20000 objects
Avg: 0,076ms/object
20000
Took 1614ms for 20000 objects
Avg: 0,081ms/object
20000
Took 1600ms for 20000 objects
Avg: 0,080ms/object
20000
Took 1626ms for 20000 objects
Avg: 0,081ms/object
20000
Took 1620ms for 20000 objects
Avg: 0,081ms/object
20000
Took 1489ms for 20000 objects
Avg: 0,074ms/object
20000
Took 1604ms for 20000 objects
Avg: 0,080ms/object
20000
Took 1632ms for 20000 objects
Avg: 0,082ms/object
20000
Took 1564ms for 20000 objects
Avg: 0,078ms/object

Обратите внимание, что эти значения предназначены для записи и чтения.Я предполагаю, что менее 0,1 мс / объект немного быстрее, чем запись и последующее чтение из СУБД.

Если вы позволите вашему читателю отправлять сообщения в экземпляр RabbitMQ и добавляете небольшое усечениеи логику отката, вы можете гарантировать, что все ваши события находятся либо в буферном файле, либо записаны в RabbitMQ.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...