Управляемый канал является поточно-ориентированным в Transformer Kafka - PullRequest
0 голосов
/ 06 марта 2020

Вот мой преобразователь:

public class DataEnricher implements Transformer < byte[], EnrichedData, KeyValue < byte[], EnrichedData >> {

    private ManagedChannel channel;
    private InfoClient infoclient;
    private LRUCacheCollector < String,
    InfoResponse > cache;


    public DataEnricher() {}

    @Override
    public void init(ProcessorContext context) {
        channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
        infoclient = new InfoClient(channel);
    }

    @Override
    public KeyValue < byte[],
    EnrichedData > transform(byte[] key, EnrichedData request) {
        InfoResponse infoResponse = null;
        String someInfo = request.getSomeInfo();
        try {
            infoResponse = infoclient.getMoreInfo(someInfo);
        } catch (Exception e) {
            logger.warn("An exception has occurred during retrieval.", e.getMessage());
        }
        EnrichedData enrichedData = EnrichedDataBuilder.addExtraInfo(request, infoResponse);
        return new KeyValue < > (key, enrichedData);
    }

    @Override
    public KeyValue < byte[],
    DataEnricher > punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {
        client.shutdown();
    }
}

В потоках Kafka каждый поток потока инициализирует свою собственную копию топологии потока, а затем создает экземпляр этой топологии для ProcessorContext, то есть для каждой задачи, то есть для каждого раздела. Так не будет ли init() вызываться и перезаписывать / пропускать канал для каждого раздела, а поскольку у нас есть несколько потоков, даже если мы создадим channel/client? Есть ли способ предотвратить это?

это вызывается в методе run():

public KafkaStreams createStreams() {
    final Properties streamsConfiguration = new Properties();
    //other configuration is setup here
    streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    streamsConfiguration.put(
        StreamsConfig.NUM_STREAM_THREADS_CONFIG,
        3);

    StreamsBuilder streamsBuilder = new StreamsBuilder();

    RequestJsonSerde requestSerde = new RequestJsonSerde();
    DataEnricher dataEnricher = new DataEnricher();
    // Get the stream of requests
    final KStream < byte[], EnrichedData > requestsStream = streamsBuilder
        .stream(requestsTopic, Consumed.with(Serdes.ByteArray(), requestSerde));
    final KStream < byte[], EnrichedData > enrichedRequestsStream = requestsStream
        .filter((key, request) - > {
            return Objects.nonNull(request);
        }
        .transform(() - > dataEnricher);

    enrichedRequestsStream.to(enrichedRequestsTopic, Produced.with(Serdes.ByteArray()));

    return new KafkaStreams(streamsBuilder.build(), new StreamsConfig(streamsConfiguration));
}

Ответы [ 2 ]

1 голос
/ 06 марта 2020

Не относится к ManagedChannel, но вы должны предоставить новый момент DataEnricher на ProcessContext в TransformerSupplier.

KStream.transform(DataEnricher::new);

Как только я столкнусь с некоторыми исключениями потока Кафки, связанными с этим, произойдет попытайтесь воссоздать его.

И IMO, если вы не используете пунктуацию для отправки большего количества записей в нисходящий поток, а новый ключ совпадает с входной записью, которую вы должны использовать transformValues(), так как transform() может привести к разбиение при использовании операции на основе ключей, такой как агрегирование, применяется объединение.

1 голос
/ 06 марта 2020

Я предполагаю, что TransformerSupplier создает один Transformer экземпляр для топологии (или ProcessorContext) и, следовательно, один channel для топологии. В этом случае нет опасности перезаписать channel. Также я предполагаю, что ваш client.shutdown() также закрывает свой канал.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...