Канал kafka не работает должным образом, когда перехватчик фильтрует пустую очередь событий - PullRequest
0 голосов
/ 04 марта 2020

Сценарий:

У меня есть требование для потоковой передачи и фильтрации kafka тем в соответствии с бизнесом. Поскольку фильтр приведет к тому, что список будет пустым, весь конвейер не будет работать должным образом

logi c следующим образом:

    String key = JsonPath.read(message, "$.key");
    switch (key)
        { case "test1": return process(key, event); 
case "test2": return process(key, event); 
case "test3": return process(key, event); 
default: return null; }

Когда все данные очереди будут отфильтрованы, это конвейер (kafka->hdfs) останется в ненормальном состоянии.

Это моя конфигурация:

Flume устройства

Test.sources = r1
Test.sinks = k1
Test.channels = c1
Test.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
Test.sources.r1.kafka.bootstrap.servers = xxxx
Test.sources.r1.topic = xxx
Test.sources.r1.groupId = test_lab_1
Test.sources.r1.kafka.consumer.timeout.ms = 100
Test.sources.r1.interceptors = i1
Test.sources.r1.interceptors.i1.type = com.goe.DeviceUsageDeserializerInterceptor$Builder. — This is my custom interceptor

Describe the sink
Test.sinks.k1.type = hdfs
Test.sinks.k1.hdfs.path = /user/naming/%{DeviceDir}
Test.sinks.k1.hdfs.filePrefix = device-
Test.sinks.k1.hdfs.fileSuffix = .csv
Test.sinks.k1.hdfs.inUseSuffix = .tmp
Test.sinks.k1.hdfs.idleTimeout = 120
Test.sinks.k1.hdfs.writeFormat = Text
Test.sinks.k1.hdfs.batchSize = 100
Test.sinks.k1.hdfs.threadsPoolSize = 10
Test.sinks.k1.hdfs.rollSize = 0
Test.sinks.k1.hdfs.rollCount = 0
Use a channel which buffers events in memory
Test.channels.c1.type = memory
Test.channels.c1.capacity = 10000
Test.channels.c1.transactionCapacity = 1000
Bind the source and sink to the channel
Test.sources.r1.channels = c1
Test.sinks.k1.channel = c1
...