Я наконец решил использовать Apache Flume для этой цели.Я изменил его источник netcat так, чтобы незафиксированные сообщения находились в куче flume, и как только будет получено сообщение о подтверждении транзакции, все сообщения будут отправлены в приемник kafka.
Я собираюсь изменить сообщениехранение местоположения из кучи потока во внешний кэш, так что я смогу истечь хранимые сообщения, если транзакция прерывается или откатывается.
Ниже приведен мой фрагмент кода для логики транзакций:
String eventMessage = new String(body);
int indexOfTrxIdSeparator = eventMessage.indexOf("-");
if (indexOfTrxIdSeparator != -1) {
String txnId = eventMessage.substring(0, indexOfTrxIdSeparator).trim();
String message = eventMessage.substring(indexOfTrxIdSeparator + 1).trim();
ArrayList<Event> events = cachedEvents.get(txnId);
if (message.equals("COMMIT")) {
System.out.println("@@@@@ COMMIT RECEIVED");
if (events != null) {
for (Event eventItem : events) {
ChannelException ex = null;
try {
source.getChannelProcessor().processEvent(eventItem);
} catch (ChannelException chEx) {
ex = chEx;
}
if (ex == null) {
counterGroup.incrementAndGet("events.processed");
} else {
counterGroup.incrementAndGet("events.failed");
logger.warn("Error processing event. Exception follows.", ex);
}
}
cachedEvents.remove(txnId);
}
} else {
System.out.println("@@@@@ MESSAGE RECEIVED: " + message);
if (events == null) {
events = new ArrayList<Event>();
}
events.add(EventBuilder.withBody(message.getBytes()));
cachedEvents.put(txnId, events);
}
}
Я добавил этот код в processEvents
метод источника Flucat для netcat.Я не хотел работать с кодом Ruby, поэтому решил перейти на Flume.Однако то же самое можно сделать и в logstash.
Спасибо