Исправлено окно Kafka Stream, которое не группировалось по ключу - PullRequest
1 голос
/ 05 мая 2020

Я получаю один поток Кафки. Как я могу накапливать сообщения для определенного c временного окна независимо от ключа?

Мой вариант использования - записывать файл каждые 10 минут из потока без учета ключа.

1 Ответ

1 голос
/ 05 мая 2020

Вам нужно будет использовать Transformer с хранилищем состояний и запланировать вызовы пунктуации на go через хранилище каждые 10 минут и передавать записи. Преобразователь должен возвращать null, когда вы собираете записи в хранилище состояний, поэтому вам также понадобится фильтр после преобразователя, чтобы игнорировать любые записи null.

Вот небольшой пример того, что, на мой взгляд, близко к тому, о чем вы просите. Дайте мне знать, как это происходит.

class WindowedTransformerExample {

  public static void main(String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();

    final String stateStoreName = "stateStore";
    final StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(stateStoreName),
            Serdes.String(),
            Serdes.String());

    builder.addStateStore(keyValueStoreBuilder);

       builder.<String, String>stream("topic").transform(new WindowedTransformer(stateStoreName), stateStoreName)
        .filter((k, v) -> k != null && v != null)
           // Here's where you do something with records emitted after 10 minutes
        .foreach((k, v)-> System.out.println());
  }


  static final class WindowedTransformer implements TransformerSupplier<String, String, KeyValue<String, String>> {

    private final String storeName;

    public WindowedTransformer(final String storeName) {
         this.storeName = storeName;
    }

    @Override
    public Transformer<String, String, KeyValue<String, String>> get() {
      return new Transformer<String, String, KeyValue<String, String>>() {
        private KeyValueStore<String, String> keyValueStore;
        private ProcessorContext processorContext;

        @Override
        public void init(final ProcessorContext context) {
          processorContext = context;
          keyValueStore = (KeyValueStore<String, String>) context.getStateStore(storeName);
          // could change this to PunctuationType.STREAM_TIME if needed
          context.schedule(Duration.ofMinutes(10), PunctuationType.WALL_CLOCK_TIME, (ts) -> {
            try(final KeyValueIterator<String, String> iterator = keyValueStore.all()) {
                while (iterator.hasNext()) {
                  final KeyValue<String, String> keyValue = iterator.next();
                  processorContext.forward(keyValue.key, keyValue.value);
                }
            }
          });
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
          if (key != null) {
            keyValueStore.put(key, value);
          }
          return null;
        }

        @Override
        public void close() {

        }
      };
    }
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...