Отправка событий транзакционно в logstash - PullRequest
0 голосов
/ 28 декабря 2018

Я пытаюсь использовать logstash для получения событий от сокета TCP и вывода их в тему Kafka.Моя текущая конфигурация способна сделать это отлично, но я хочу иметь возможность проводить события для Кафки транзакционным способом.Я имею в виду, что система не должна отправлять события в kafka, пока не будет получено сообщение о коммите:

START TXN 123         --No message sent to Kafka
123 - Event1 Message  --No message sent to Kafka
123 - Event2 Message  --No message sent to Kafka
123 - Event3 Message  --No message sent to Kafka
COMMIT TXN 123           --Event1, Event2, Event3 messages sent to Kafka

Есть ли возможность достичь этого только с помощью logstash или я должен ввести какой-либо другой координатор транзакций между источником иlogstash?Вот мой текущий конфиг:

input {
  tcp {
    port => 9000
  }
}

output {
  kafka { 
    bootstrap_servers => "localhost:9092"
    topic_id =>  "alpayk"
  }
}

Я пытался использовать для этой цели агрегатный фильтр logstash, но у меня не получилось что-то сработать.

Большое спасибозаранее

1 Ответ

0 голосов
/ 08 января 2019

Я наконец решил использовать Apache Flume для этой цели.Я изменил его источник netcat так, чтобы незафиксированные сообщения находились в куче flume, и как только будет получено сообщение о подтверждении транзакции, все сообщения будут отправлены в приемник kafka.

Я собираюсь изменить сообщениехранение местоположения из кучи потока во внешний кэш, так что я смогу истечь хранимые сообщения, если транзакция прерывается или откатывается.

Ниже приведен мой фрагмент кода для логики транзакций:

String eventMessage = new String(body);
int indexOfTrxIdSeparator = eventMessage.indexOf("-");
if (indexOfTrxIdSeparator != -1) {
    String txnId = eventMessage.substring(0, indexOfTrxIdSeparator).trim();
    String message = eventMessage.substring(indexOfTrxIdSeparator + 1).trim();
    ArrayList<Event> events = cachedEvents.get(txnId);

    if (message.equals("COMMIT")) {

        System.out.println("@@@@@ COMMIT RECEIVED");

        if (events != null) {
            for (Event eventItem : events) {
                ChannelException ex = null;
                try {
                    source.getChannelProcessor().processEvent(eventItem);
                } catch (ChannelException chEx) {
                    ex = chEx;
                }

                if (ex == null) {
                    counterGroup.incrementAndGet("events.processed");
                } else {
                    counterGroup.incrementAndGet("events.failed");
                    logger.warn("Error processing event. Exception follows.", ex);
                }
            }

            cachedEvents.remove(txnId);
        }
    } else {
        System.out.println("@@@@@ MESSAGE RECEIVED: " + message);
        if (events == null) {
            events = new ArrayList<Event>();
        }
        events.add(EventBuilder.withBody(message.getBytes()));
        cachedEvents.put(txnId, events);
    }
}

Я добавил этот код в processEvents метод источника Flucat для netcat.Я не хотел работать с кодом Ruby, поэтому решил перейти на Flume.Однако то же самое можно сделать и в logstash.

Спасибо

...