Я написал очень простой конвейер в Apchea Beam, как показано ниже, для чтения данных из моего кластера kafka о слиянии следующим образом:
Pipeline pipeline = Pipeline.create(options);
Map<String, Object> propertyBuilder = new HashMap();
propertyBuilder.put("ssl.endpoint.identification.algorithm", "https");
propertyBuilder.put("sasl.mechanism","PLAIN");
propertyBuilder.put("request.timeout.ms","20000");
propertyBuilder.put("retry.backoff.ms","500");
pipeline
.apply(KafkaIO.<byte[], byte[]>readBytes()
.withBootstrapServers("pkc-epgnk.us-central1.gcp.confluent.cloud:9092")
.withTopic("gcp-ingestion-1")
.withKeyDeserializer(ByteArrayDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class)
.updateConsumerProperties(propertyBuilder)
.withoutMetadata() // PCollection<KV<Long, String>>
) .apply(Values.<byte[]>create());
Тем не менее, я получаю меньше исключений при работе над кодами для чтения данных измой кластер кафки
Я запускаю выше на прямом Java Runner, я использую луч 2.8,
Я могу читать и создавать сообщения в мой конфлюентный кластер Кафка, но не с помощью вышеуказанных кодов.