Потребление и последующие записи из _consumer_offsets
могут плохо масштабироваться, так как количество шума (от несвязанных потребителей и тем) увеличивается в использовании вашей организацией Kafka.
Вероятно, есть несколько способов добиться этого, но самое близкое к событию, которое указывает на то, что запись в Snowflake завершена и готова к фиксации , можно получить, подключившись непосредственно к интерфейсу Kafka Connect.
Per Kafka Connect архитектура и реализация Snowflake API-интерфейсов Connect, легкого класса-обертки, который выполняет дополнительные логи c после каждого завершенного события flu sh.
Вызов preCommit
в каждом выполняемом объекте задачи вызывается как часть непрерывного l oop, обычно один раз в минуту (или длины "" offset.flu sh .interval.ms "). A postCommit
было бы самым идеальным, но его пока нет в API.
Очень базовый c пример, который подключает ваши логи c к preCommit
API-интерфейсу. Будет выглядеть так:
class WrappedSnowflakeSinkTask extends com.snowflake.kafka.connector.SnowflakeSinkTask {
@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(
Map<TopicPartition, OffsetAndMetadata> offsets)
throws RetriableException {
Map<TopicPartition, OffsetAndMetadata> offsets = super.preCommit();
// Invoke Snowflake processing logic here using the offsets object,
// preferably in an asynchronous manner (submit a callable to a threadpool, etc.) and
// with an initial submit/sleep delay of over 1-2 minutes or "offset.flush.timeout.ms",
// whichever is higher, to account for potential commit timeouts after this returns.
doOwnLogic(offsets, …);
return offsets;
}
}
Также напишите класс соединителя, очень простую обертку c, которая переопределяет только метод класса SinkTask для реализации созданного выше класса:
class WrappedSnowflakeConnector extends com.snowflake.kafka.connector.SnowflakeSinkConnector {
@Override
public Class<? extends Task> taskClass() {
return WrappedSnowflakeSinkTask.class;
}
}
Создайте и разверните упакованный соединитель (WrappedSnowflakeConnector) вместо оригинального Snowflake Connector.