Storm-kafka: установите startOffsetTime в kafka.api.OffsetRequest.LatestTime в apache топологии Flux Yaml - PullRequest
0 голосов
/ 27 мая 2018

Я работаю над топологией, используя Apache Flux.В настоящее время strom получает сообщения с самого начала, но я хочу, чтобы он получал только последние сообщения от kafka.

Я пишу топологию в файле YAML.

Вот так выглядит мой spoutConfig:

  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "172.25.33.191:2181"

  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"
      - "blockdata"
      - ""
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"

      - name: "ignoreZkOffsets"
        value: true

      - name: "startOffsetTime"
        ref: "XXXXXXXXX"

Теперь я застрял.Как мне установить startOffsetTime на правильную функцию, чтобы получать только последние сообщения от kafka?

Я пробовал ref: "LatestTime", но что бы я там ни указывал, он выдает ошибку:

java.lang.IllegalArgumentException: Can not set long field org.apache.storm.kafka.KafkaConfig.startOffsetTime to null value

1 Ответ

0 голосов
/ 27 мая 2018

Я считаю, что Flux может обрабатывать вызовы статических фабричных методов.

- id: "startingOffsetTime"
  className: "kafka.api.OffsetRequest"
  factory: "LatestTime"

, а затем использовать его в своем определении SpoutConfig, например

properties:
  - name: "startOffsetTime"
    ref: "startingOffsetTime"

Я не проверял это, но думаю,он должен работать.Возможность вызывать статические фабричные методы была объединена некоторое время назад https://issues.apache.org/jira/browse/STORM-2796,, но, похоже, отсутствует в документации.Я поднял вопрос об обновлении документов https://issues.apache.org/jira/browse/STORM-3086.

Если вы хотите увидеть пример этой функции, взгляните на https://github.com/apache/storm/blob/master/flux/flux-core/src/test/resources/configs/config-methods-test.yaml#L38

...