Не могу прочитать с кафки KafkaIO в луче - PullRequest
0 голосов
/ 27 декабря 2018

Я написал очень простой конвейер в Apchea Beam, как показано ниже, для чтения данных из моего кластера kafka о слиянии следующим образом:

        Pipeline pipeline = Pipeline.create(options);

        Map<String, Object> propertyBuilder = new HashMap();
        propertyBuilder.put("ssl.endpoint.identification.algorithm", "https");
        propertyBuilder.put("sasl.mechanism","PLAIN");
        propertyBuilder.put("request.timeout.ms","20000");
        propertyBuilder.put("retry.backoff.ms","500");

        pipeline
            .apply(KafkaIO.<byte[], byte[]>readBytes()
               .withBootstrapServers("pkc-epgnk.us-central1.gcp.confluent.cloud:9092")
               .withTopic("gcp-ingestion-1")  
               .withKeyDeserializer(ByteArrayDeserializer.class)
               .withValueDeserializer(ByteArrayDeserializer.class)
               .updateConsumerProperties(propertyBuilder)             
               .withoutMetadata() // PCollection<KV<Long, String>>
            ) .apply(Values.<byte[]>create());

Тем не менее, я получаю меньше исключений при работе над кодами для чтения данных измой кластер кафки

Я запускаю выше на прямом Java Runner, я использую луч 2.8,

Я могу читать и создавать сообщения в мой конфлюентный кластер Кафка, но не с помощью вышеуказанных кодов.

1 Ответ

0 голосов
/ 27 декабря 2018

Если вы следуете трассировке стека, кажется, что код пытается преобразовать свойство конфигурации тайм-аута в Integer: https://github.com/apache/beam/blob/2e759fecf63d62d110f29265f9438128e3bdc8ab/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L112

Но вместо этого он получает строку.Я предполагаю, что это потому, что вы установили здесь строку: propertyBuilder.put("request.timeout.ms","20000").Я предполагаю, что правильнее было бы установить его как Integer, например, как propertyBuilder.put("request.timeout.ms", 20000) (без кавычек вокруг значения тайм-аута).

У вас также могут быть похожие проблемы с другими свойствами конфигурации (например, откат повторной попытки), вам необходимо перепроверить типы свойств.

...