JSON Кафка носик в Apache Буря - PullRequest
0 голосов
/ 14 января 2020

Я строю топологию шторма с носиком Кафки. Я использую Kafka (без Zookeeper) в формате JSON, и Storm должен вывести его.
Как определить правильную схему для типа данных JSON? В настоящее время у меня есть такая кодовая база с реализацией базового c spout:

val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val topologyBuilder = new TopologyBuilder()

val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "test").build()

topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)

val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())

cluster.shutdown()

Я новичок в Apache Storm, поэтому буду рад любым советам.

1 Ответ

1 голос
/ 16 января 2020

Вы можете сделать пару вещей:

Вы можете определить RecordTranslator. Этот интерфейс позволяет вам определить, как носик будет создавать кортеж, основываясь на ConsumerRecord, который он прочитал из Kafka.

Реализация по умолчанию выглядит следующим образом:

public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");

    @Override
    public List<Object> apply(ConsumerRecord<K, V> record) {
        return new Values(record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return FIELDS;
    }

Как видите, вы получите ConsumerRecord, который является типом, встроенным в базовую клиентскую библиотеку Kafka, а затем должны превратить его в List<Object>, который будет значениями кортежа. Если вы хотите сделать что-то сложное с записью перед отправкой данных, это будет то, как вы это сделаете. Например, если вы хотите вставить ключ, значение и смещение в структуру данных, которую он затем отправил, вы можете сделать это здесь. Вы используете переводчик как KafkaSpoutConfig.builder(bootstrapServers, "test").setRecordTranslator(myTranslator).build()

Лучшая альтернатива, если вы хотите десериализовать ключ / значение только в один из ваших собственных классов данных, это реализовать Deserializer. Это позволит вам определить, как десериализовать ключ / значение, которое вы получаете от Kafka. Если вы хотите десериализовать, например, значение в свой собственный класс данных, вы можете сделать это с помощью этого интерфейса.

По умолчанию StringDeserializer делает это:

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

После того, как вы создали ваш собственный Deserializer, вы используете его, делая что-то вроде KafkaSpoutConfig.builder(bootstrapServers, "test").setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, YourDeserializer.class).build(). Для настройки десериализатора значения существует аналогичное свойство потребителя.

...