Запись записей в снежинку от Кафки - PullRequest
0 голосов
/ 15 января 2020

Я использую соединитель Kafka Connect для Snowflake, где я получаю небольшие партии записей в своей топике c, а затем записываю их в сценические таблицы в Snowflake. Это прекрасно работает.

Затем я хотел бы обработать записи для каждой микробатки сразу после , когда они записываются в Snowflake управляемым событиями способом. То есть, не запуская код по расписанию. В идеале, разъем Kafka должен запускать обработку, передавая имя topi c, а также начальное и конечное смещения для микробакта.

Есть ли лучший способ добиться этого? Я не думаю, что это изначально поддерживается в коннекторе Kafka, но я подумал, что могу прослушать _consumer_offsets topi c, и когда коннектор передает смещения брокеру, это будет сигнализировать мой код начать обработку.

/ Джо

1 Ответ

0 голосов
/ 19 января 2020

Потребление и последующие записи из _consumer_offsets могут плохо масштабироваться, так как количество шума (от несвязанных потребителей и тем) увеличивается в использовании вашей организацией Kafka.

Вероятно, есть несколько способов добиться этого, но самое близкое к событию, которое указывает на то, что запись в Snowflake завершена и готова к фиксации , можно получить, подключившись непосредственно к интерфейсу Kafka Connect.

Per Kafka Connect архитектура и реализация Snowflake API-интерфейсов Connect, легкого класса-обертки, который выполняет дополнительные логи c после каждого завершенного события flu sh.

Вызов preCommit в каждом выполняемом объекте задачи вызывается как часть непрерывного l oop, обычно один раз в минуту (или длины "" offset.flu sh .interval.ms "). A postCommit было бы самым идеальным, но его пока нет в API.

Очень базовый c пример, который подключает ваши логи c к preCommit API-интерфейсу. Будет выглядеть так:

class WrappedSnowflakeSinkTask extends com.snowflake.kafka.connector.SnowflakeSinkTask {

  @Override
  public Map<TopicPartition, OffsetAndMetadata> preCommit(
    Map<TopicPartition, OffsetAndMetadata> offsets)
    throws RetriableException {

    Map<TopicPartition, OffsetAndMetadata> offsets = super.preCommit();

    // Invoke Snowflake processing logic here using the offsets object,
    // preferably in an asynchronous manner (submit a callable to a threadpool, etc.) and
    // with an initial submit/sleep delay of over 1-2 minutes or "offset.flush.timeout.ms",
    // whichever is higher, to account for potential commit timeouts after this returns.
    doOwnLogic(offsets, …);

    return offsets;
  }

}

Также напишите класс соединителя, очень простую обертку c, которая переопределяет только метод класса SinkTask для реализации созданного выше класса:

class WrappedSnowflakeConnector extends com.snowflake.kafka.connector.SnowflakeSinkConnector {
  @Override
  public Class<? extends Task> taskClass() {
    return WrappedSnowflakeSinkTask.class;
  }
}

Создайте и разверните упакованный соединитель (WrappedSnowflakeConnector) вместо оригинального Snowflake Connector.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...