Как указать идентификатор группы потребителя kafka для структурированной потоковой передачи искры? - PullRequest
3 голосов
/ 01 августа 2020

Я хотел бы запустить 2 задания структурированной потоковой передачи искр в одном кластере emr для использования одного и того же kafka topi c. Оба задания находятся в рабочем состоянии. Однако только одно задание может получить данные кафки. Моя конфигурация для части kafka следующая:

        .format("kafka")
        .option("kafka.bootstrap.servers", "xxx")
        .option("subscribe", "sametopic")
        .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.ssl.truststore.location", "./cacerts")
          .option("kafka.ssl.truststore.password", "changeit")
          .option("kafka.ssl.truststore.type", "JKS")
          .option("kafka.sasl.kerberos.service.name", "kafka")
          .option("kafka.sasl.mechanism", "GSSAPI")
        .load()

Я не устанавливал group.id. Я предполагаю, что один и тот же идентификатор группы в двух заданиях используется для возникновения этой проблемы. Однако, когда я устанавливаю group.id, он жалуется, что «указанные пользователем группы потребителей не используются для отслеживания смещений». Как правильно решить эту проблему? Спасибо!

1 Ответ

1 голос
/ 03 августа 2020

Вам необходимо запустить Spark v3.

From https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

kafka.group.id

Идентификатор группы Kafka для использования в потребителе Kafka при чтении из Kafka. Используйте это с осторожностью. По умолчанию каждый запрос генерирует уникальный идентификатор группы для чтения данных. Это гарантирует, что каждый источник Kafka имеет свою собственную группу потребителей, которая не сталкивается с помехами со стороны других потребителей и, следовательно, может читать все разделы своих подписанных тем. В некоторых сценариях ios (например, авторизация на основе групп Kafka) вы можете захотеть использовать указанный c идентификатор авторизованной группы для чтения данных. При желании вы можете установить идентификатор группы. Однако делайте это с особой осторожностью, поскольку это может вызвать неожиданное поведение. Одновременно выполняющиеся запросы (как пакетные, так и потоковые) или источники с одним и тем же идентификатором группы, вероятно, мешают друг другу, заставляя каждый запрос читать только часть данных. Это также может произойти, когда запросы запускаются / перезапускаются в быстрой последовательности. Чтобы свести к минимуму такие проблемы, установите время ожидания сеанса потребителя Kafka (задав для параметра «kafka.session.timeout.ms») очень маленькое значение. Когда это установлено, опция groupIdPrefix игнорируется.

...