Получить схему из сообщений parquet-avro, извлеченных из kafka - PullRequest
1 голос
/ 26 сентября 2019

Используя образцы из разных источников, я написал этот метод (соответствующий раздел показан ниже), где я извлекаю сообщения parquet-avro от kafka для тестового приложения.Основываясь на коде, который я смог найти, чтобы заставить это работать (некоторые из которых http://aseigneurin.github.io/2016/03/04/kafka-spark-avro-producing-and-consuming-avro-messages.html),, я использую переданную схему вместо схемы, извлеченной из самих сообщений. Я что-то пропустил или я мог извлечьсхема из каждого сообщения вместо необходимости передавать его. Я новичок во всем этом, поэтому я хотел быть уверен, что я делаю это наилучшим образом.


import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

....

  public List awaitAndConsumeParquet(String topic, List fieldValues, Schema avroSchema, String field, int minutesTimeout)
            throws InterruptedException {

        KafkaConsumer consumer = new KafkaConsumer(props);
        consumer.subscribe(Collections.singletonList(topic));

        List<String> foundValues = new ArrayList<>();

        long startTime = System.currentTimeMillis();
        long elapsedTime;
        while (true) {

            ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1000));

            for (ConsumerRecord<String, byte[]> record : consumerRecords) {

                Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema);
                GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                String k = genericRecord.get(field).toString();
                if (fieldValues.contains(k)) {
                    foundValues.add(k);
                }
            }

            consumer.commitAsync();

...

Звонящий:


....
        TestConsumer tc = new TestConsumer();
        tc.setBootstrapServers("localhost:9092");
        tc.setKeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer");
        tc.setValueDeserializer("org.apache.kafka.common.serialization.ByteArrayDeserializer");
        tc.setGroupId("consumerGroup1");


        // await expected data - provide jsonpath to use to query expected strings from json
        List<String> notFoundMessages = tc.awaitAndConsumeParquet("demo", known_items_list, avroSchema, "known_item_key", 1);
...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...