У меня есть несколько заданий Flink, которые используют kafka в качестве источника и приемника, и я хочу добавить к нему трассировку, чтобы любое сообщение, полученное / произведенное из / в Kafka, было хорошо прослежено, для этого я использую перехватчики kafka для перехвата сообщения и журнал трассировки, span и parent traceId, для этого я использую opentracing-kafka-client (v0.1.11) в сочетании с brave-opentracing (v0.35.1), поэтому я использую пользовательские перехватчики, потому что мне нужно регистрировать сообщения в указанном формате.
После настройки перехватчиков они запускаются, и он использует информацию трассировки (из заголовков), поступающую из вышестоящей системы, и регистрирует ее, но когда дело доходит до повторного создания сообщения для kafka, затем контекст трассировки Например, потерян, рассмотрите следующий сценарий
1) Сообщение, помещенное на Kafka какой-либо службой отдыха 2) Сообщение, принятое заданием flink и перехватчиками, запускает и использует информацию трассировки из заголовка и регистрирует ее 3) После обработки сообщения генерируется от работы Flink к Кафке
Хорошо работает il шаг # 2, но когда дело доходит до создания сообщения, тогда информация трассировки из предыдущего шага не используется, потому что у него нет никакой информации заголовков, и, следовательно, он производит совершенно новую трассировку.
Я регистрирую трассировщик, как показано ниже: -
public class MyTracer {
private static final Tracer INSTANCE = BraveTracer.create(Tracing.newBuilder().build());
public static void registerTracer() {
GlobalTracer.registerIfAbsent(INSTANCE);
}
public static Tracer getTracer() {
return INSTANCE;
}
}
А я пользуюсь TracingConsumerInterceptor
и TracingProducerInterceptor
от открытой кафки.