Чтобы немного расширить мой комментарий: на самом деле нет необходимости использовать базу данных для постоянной буферизации.
РЕДАКТИРОВАТЬ : Кроме того, сама причина невозможности отправки сообщенийк RabbitMQ вполне может быть потерянное сетевое соединение.В этом случае большинство СУБД будут бесполезны. Конец редактирования.
package io.mahlberg.stackoverflow.questions.objpersistencedemo;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
public class PersistenceDemo {
/*
* A decent default number of objects to be buffered.
*/
private static long COUNT = 10000L;
/*
* Default filename
*/
private static final String OBJ_DAT = "./obj.dat";
private static FileOutputStream fos;
private static ObjectOutputStream oos;
private static FileInputStream fis;
private static ObjectInputStream ois;
private static File dat;
private static Object lock;
public static void main(String[] args) throws InterruptedException, IOException {
// Get the actual number of counts
if (args[0] != null) {
COUNT = Long.parseLong(args[0]);
}
// Initialize out lock
lock = new Object();
// Ensure the datafile exists.
dat = new File(OBJ_DAT);
dat.createNewFile();
// Initialize our streams.
try {
fos = new FileOutputStream(dat);
} catch (Exception e1) {
e1.printStackTrace();
System.exit(1);
}
oos = new ObjectOutputStream(fos);
// Define the writer thread.
Thread writer = new Thread(new Runnable() {
public void run() {
Data obj;
// Make sure we have the behaviour of the queue.
synchronized (lock) {
for (int i = 0; i < COUNT; i++) {
obj = new Data(String.format("Obj-%d", i), new Date());
try {
oos.writeObject(obj);
oos.flush();
fos.flush();
// Notify the reader...
lock.notify();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
// ... and wait until the reader is finished.
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
// We need to notify the reader one last time for the last
// Object we put into the stream.
lock.notify();
}
}
});
// Initialize the streams used by reader.
fis = new FileInputStream(dat);
ois = new ObjectInputStream(fis);
Thread reader = new Thread(new Runnable() {
public void run() {
Data obj;
while (true) {
synchronized (lock) {
try {
obj = (Data) ois.readObject();
// Notify writer we are finished with reading the latest entry...
lock.notify();
} catch (ClassNotFoundException e1) {
e1.printStackTrace();
} catch (IOException e1) {
break;
}
try {
// ...and wait till writer is done writing.
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
});
// For doing a rough performance measurement.
Instant start = Instant.now();
writer.start();
reader.start();
// Wait till both threads are done.
writer.join();
reader.join();
Instant end = Instant.now();
Duration timeElapsed = Duration.between(start, end);
System.out.format("Took %sms for %d objects\n", timeElapsed.toMillis(), COUNT);
System.out.format("Avg: %.3fms/object\n", ((double) timeElapsed.toMillis() / COUNT));
// Cleanup
oos.close();
fos.close();
ois.close();
fis.close();
}
}
Обычно мы используем synchronize
, notify
и wait
для эмуляции буфера FIFO с файлом.
Пожалуйстаобратите внимание, что я взял несколько ярлыков для краткости и читабельности, но я думаю, вы поняли картину.Читатель должен время от времени проверять размер файла (как часто это зависит от размера ваших данных) и обрезать файл, а обработка ошибок практически не существует.Я создал банку из этого класса и класса данных, и вот несколько примеров результатов:
$ for i in {1..10}; do java -jar target/objpersistencedemo-0.0.1-SNAPSHOT.jar 20000; done
20000
Took 1470ms for 20000 objects
Avg: 0,074ms/object
20000
Took 1510ms for 20000 objects
Avg: 0,076ms/object
20000
Took 1614ms for 20000 objects
Avg: 0,081ms/object
20000
Took 1600ms for 20000 objects
Avg: 0,080ms/object
20000
Took 1626ms for 20000 objects
Avg: 0,081ms/object
20000
Took 1620ms for 20000 objects
Avg: 0,081ms/object
20000
Took 1489ms for 20000 objects
Avg: 0,074ms/object
20000
Took 1604ms for 20000 objects
Avg: 0,080ms/object
20000
Took 1632ms for 20000 objects
Avg: 0,082ms/object
20000
Took 1564ms for 20000 objects
Avg: 0,078ms/object
Обратите внимание, что эти значения предназначены для записи и чтения.Я предполагаю, что менее 0,1 мс / объект немного быстрее, чем запись и последующее чтение из СУБД.
Если вы позволите вашему читателю отправлять сообщения в экземпляр RabbitMQ и добавляете небольшое усечениеи логику отката, вы можете гарантировать, что все ваши события находятся либо в буферном файле, либо записаны в RabbitMQ.