Реестр Kafka Schema - реклама динамического порта в контейнере - PullRequest
0 голосов
/ 24 октября 2019

Как я понимаю, используя образ док-станции слияния реестра Schema (не с упругостью zookeeper, но с упругостью kafka), мы можем объявить имя хоста контейнера для Kafka с помощью переменной SCHEMA_REGISTRY_HOST_NAME env.

Если я пытаюсь использовать SCHEMA_REGISTRY_PORT, я получаю следующую ошибку:

PORT is deprecated. Please use SCHEMA_REGISTRY_LISTENERS instead.

Почему мы не можем установить связанный порт? Я могу получить этот динамический порт (динамический порт, который хост-машина динамически отображал в моем контейнере), но как мне поделиться им с Kafka?

РЕДАКТИРОВАТЬ 1:

Чтобы добавить больше деталей,Вот пример назначения, сделанного координатором реестра схемы:

[2019-10-25 11:55:47,813] INFO Finished rebalance with master election result: Assignment{version=1, error=0, master='sr-1-7a9a403a-63cc-4fed-b548-10ea440863d5', masterIdentity=version=1,host=10.135.124.179,port=29932,scheme=http,masterEligibility=true} (io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector)

Как вы можете видеть, есть имя хоста и порт (выделено жирным шрифтом). имя хоста происходит из переменной SCHEMA_REGISTRY_HOST_NAME, но согласно коду отсюда port :

/**
   * A Schema Registry instance's identity is in part the port it listens on. Currently the port can
   * either be configured via the deprecated `port` configuration, or via the `listeners`
   * configuration.
   *
   * <p>This method uses `Application.parseListeners()` from `rest-utils` to get a list of
   * listeners, and returns the port of the first listener to be used for the instance's identity.
   *
   * <p></p>In theory, any port from any listener would be sufficient. Choosing the first, instead
   * of say the last, is arbitrary.
   */
  // TODO: once RestConfig.PORT_CONFIG is deprecated, remove the port parameter.
  static SchemeAndPort getSchemeAndPortForIdentity(int port, List<String> configuredListeners,
                                                   String requestedScheme)

(https://github.com/confluentinc/schema-registry/blob/5af0ca3be1138fe483d0f90f4ccfd4f02f158334/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java#L211-L223)

Таким образом, единственный способ объявить порт - настроить его с помощью прослушивателей, что может раздражать (но все же возможно в качестве обходного пути).

Ответы [ 2 ]

0 голосов
/ 28 октября 2019

Хорошо, я сделал рабочий файл шаблона Nomad, возможно, это может помочь. Обходной путь должен использовать тот же динамический порт, который кочевник использует для отображения портов внутри контейнера, следующим образом:

job "schema-registry" {
  datacenters = ["YOURDC"]
  type = "service"

  # Update strategy
  update {
    # Max instances (task groups) to be updated in parallel.
    max_parallel = 1

    # Once an allocation finishes, wait min_healthy_time until starting next one.
    min_healthy_time = "10s"

    # If allocation not healthy after healthy_deadline, mark as unhealthy.
    healthy_deadline = "3m"

    # If allocation unhealthy after progress_deadline, fail the deployment.
    progress_deadline = "10m"

    # Should auto revert to previous version if the deployment fails?
    auto_revert = false

    # Create n canaries.
    canary = 0
  }

  spread {
    attribute = "${attr.unique.hostname}"
    weight    = 100
  }

  migrate {
    # As in update stanza.
    max_parallel = 1

    # "checks" for health checks or "task_state" for task state.
    health_check = "checks"

    # As in update stanza.
    min_healthy_time = "10s"

    # As in update stanza.
    healthy_deadline = "5m"
  }

  # The "group" stanza defines a series of tasks that should be co-located on
  # the same Nomad client.
  group "schema-registry-group" {

    # count of instances of the "schema-registry" task group
    count = 3

    restart {
      # The number of attempts to run the job within the specified interval.
      attempts = 2
      interval = "30m"

      # The "delay" parameter specifies the duration to wait before restarting
      # a task after it has failed.
      delay = "5s"

      # What if after a few restats within `interval` the `attempts` limit is meet?
      # - "delay" mode delays the next restart until the next interval,
      # - "fail" mode does not restart the task.
      mode = "fail"
    }

    # Use ephemeral disk shared between tasks instead of HDD
    ephemeral_disk {
      size = 300
    }

    task "schema-registry" {
      driver = "docker"

      # Driver (docker) settings.
      config {
        image = "confluentinc/cp-schema-registry"

        # We cannot use this as schema registry will use the LISTENERS config to advertize its port..
        #port_map {
        #  schema_registry_port = 8081
        #}
      }

      # Time to shut down after SIGINT.
      # Caution! If want to set higher than 30s,
      # make sure max_kill_timeout allows that.
      kill_timeout = "30s"
      kill_signal = "SIGINT"
      shutdown_delay = "2s"

      env {
        SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="YOURBOOTSTRAPSERVER"
        SCHEMA_REGISTRY_HOST_NAME="${NOMAD_IP_schema_registry_port}"
        SCHEMA_REGISTRY_LISTENERS="http://0.0.0.0:${NOMAD_HOST_PORT_schema_registry_port}"
      }

      # Max required storage = (max_files * 2) * max_file_size
      # *2 because there's a log file for stderr and stdout
      logs {
         max_files     = 10
         max_file_size = 15
       }

      # Required resources.
      resources {
        cpu    = 500 # 500 MHz
        memory = 512
        network {
          port "schema_registry_port" {} # defined in port_map
        }
      }

      service {
        name = "schema-registry"
        tags = ["schema-registry"]
        port = "schema_registry_port"
        check {
          name     = "alive"
          type     = "tcp"
          interval = "60s"
          timeout  = "4s"
        }
      }
    }
  }
}
0 голосов
/ 24 октября 2019

listeners принимает адрес привязки и порт (ы) , и, как вы обнаружили, объявленный адрес равен the port of the first listener to be used for the instance's identity

Список слушателей, разделенных запятыми, которыепрослушивать запросы API через HTTP или HTTPS. Если прослушиватель использует HTTPS, необходимо также настроить соответствующие параметры конфигурации SSL.

Схемы Идентификаторы реестра хранятся в ZooKeeper и состоят из имени хоста и порта. Если настроено несколько прослушивателей, для идентификации используется первый порт прослушивателя.

Тип: список
По умолчанию: "http://0.0.0.0:8081"

https://docs.confluent.io/current/schema-registry/installation/config.html#listeners

...