Трассировка контекста утратила опертакинг и смелость для кафки - PullRequest
0 голосов
/ 27 февраля 2020

У меня есть несколько заданий Flink, которые используют kafka в качестве источника и приемника, и я хочу добавить к нему трассировку, чтобы любое сообщение, полученное / произведенное из / в Kafka, было хорошо прослежено, для этого я использую перехватчики kafka для перехвата сообщения и журнал трассировки, span и parent traceId, для этого я использую opentracing-kafka-client (v0.1.11) в сочетании с brave-opentracing (v0.35.1), поэтому я использую пользовательские перехватчики, потому что мне нужно регистрировать сообщения в указанном формате.

После настройки перехватчиков они запускаются, и он использует информацию трассировки (из заголовков), поступающую из вышестоящей системы, и регистрирует ее, но когда дело доходит до повторного создания сообщения для kafka, затем контекст трассировки Например, потерян, рассмотрите следующий сценарий

1) Сообщение, помещенное на Kafka какой-либо службой отдыха 2) Сообщение, принятое заданием flink и перехватчиками, запускает и использует информацию трассировки из заголовка и регистрирует ее 3) После обработки сообщения генерируется от работы Flink к Кафке

Хорошо работает il шаг # 2, но когда дело доходит до создания сообщения, тогда информация трассировки из предыдущего шага не используется, потому что у него нет никакой информации заголовков, и, следовательно, он производит совершенно новую трассировку.

Я регистрирую трассировщик, как показано ниже: -

public class MyTracer {

  private static final Tracer INSTANCE = BraveTracer.create(Tracing.newBuilder().build());

  public static void registerTracer() {
    GlobalTracer.registerIfAbsent(INSTANCE);
  }

  public static Tracer getTracer() {
    return INSTANCE;
  }
}

А я пользуюсь TracingConsumerInterceptor и TracingProducerInterceptor от открытой кафки.

...