Flink Генерация потока Dynami c из потока GenericRecord - PullRequest
1 голос
/ 20 февраля 2020

У меня есть случай использования, когда несколько записей Avro поступают в одной топике Kafka c, когда мы подаем в суд на TopicRecordNameStrategy за предмет в реестре схемы.

Теперь я написал потребителя для чтения то топи c и создайте поток данных GenericRecord. Теперь я не могу погрузить этот поток в hdfs / s3 в формате паркета, так как этот поток содержит различные типы записей схемы. Поэтому я фильтрую разные записи для каждого типа, применяя фильтр и создавая разные потоки, а затем погружая каждый поток отдельно.

Ниже приведен код, который я использую --- ``

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.util.List;
import java.util.Properties;

public class EventStreamProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventStreamProcessor.class);
    private static final String KAFKA_TOPICS = "events";
    private static Properties properties = new Properties();
    private static String schemaRegistryUrl = "";

    private static CachedSchemaRegistryClient registryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);

    public static void main(String args[]) throws Exception {

        ParameterTool para = ParameterTool.fromArgs(args);
        InputStream inputStreamProperties = EventStreamProcessor.class.getClassLoader().getResourceAsStream(para.get("properties"));
        properties.load(inputStreamProperties);
        int numSlots = para.getInt("numslots", 1);
        int parallelism = para.getInt("parallelism");
        String outputPath = para.get("output");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.getConfig().enableForceAvro();

        env.enableCheckpointing(60000);

        ExecutionConfig executionConfig = env.getConfig();
        executionConfig.disableForceKryo();
        executionConfig.enableForceAvro();

        FlinkKafkaConsumer kafkaConsumer010 = new FlinkKafkaConsumer(KAFKA_TOPICS,
                new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                properties);

        Path path = new Path(outputPath);

        DataStream<GenericRecord> dataStream = env.addSource(kafkaConsumer010).name("bike_flow_source");

        try {
            final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat
                    (path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema("events-com.events.search_list")))
                    .withBucketAssigner(new EventTimeBucketAssigner())
                    .build();

            dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
                if (genericRecord.get(Constants.EVENT_NAME).toString().equals("search_list")) {
                    return true;
                }
                return false;
            }).addSink(sink).name("search_list_sink").setParallelism(parallelism);


            final StreamingFileSink<GenericRecord> sink_search_details = StreamingFileSink.forBulkFormat
                    (path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema("events-com.events.search_details")))
                    .withBucketAssigner(new EventTimeBucketAssigner())
                    .build();

            dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
                if (genericRecord.get(Constants.EVENT_NAME).toString().equals("search_details")) {
                    return true;
                }
                return false;
            }).addSink(sink_search_details).name("search_details_sink").setParallelism(parallelism);



            final StreamingFileSink<GenericRecord> search_list = StreamingFileSink.forBulkFormat
                    (path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema("events-com.events.search_list")))
                    .withBucketAssigner(new EventTimeBucketAssigner())
                    .build();

            dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
                if (genericRecord.get(Constants.EVENT_NAME).toString().equals("search_list")) {
                    return true;
                }
                return false;
            }).addSink(search_list).name("search_list_sink").setParallelism(parallelism);


        } catch (Exception e) {
            LOGGER.info("exception in sinking event");
        }
        env.execute("event_stream_processor");
    }
}
``

Так что это выглядит очень неэффективно для меня, как 1. Каждый раз, когда добавляется новое событие, я должен сделать код изменение. 2. Я должен создать несколько потоков с помощью фильтра и все.

Поэтому, пожалуйста, предложите мне, можно ли написать поток GenericRecord без создания нескольких потоков. Если нет, то как я могу сделать этот код более динамичным c, используя какой-нибудь конфигурационный файл, чтобы каждый раз мне не приходилось снова писать тот же код для нового события?

Пожалуйста, предложите лучший способ решения этой проблемы. .

Ответы [ 2 ]

2 голосов
/ 21 февраля 2020

Ну, вы можете просто передать список возможных типов сообщений в качестве параметра конфигурации, а затем просто перебрать это. У вас будет что-то вроде этого:

messageTypes.foreach( msgType => {
    final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat
                    (path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(msgType)))
                    .withBucketAssigner(new EventTimeBucketAssigner())
                    .build();

            dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
                if (genericRecord.get(Constants.EVENT_NAME).toString().equals(msgType)) {
                    return true;
                }
                return false;
            }).addSink(sink).name(msgType+"_sink").setParallelism(parallelism);

}})

Это будет означать, что вам нужно только перезапустить задание с измененным конфигом, когда придет новый тип сообщения.

0 голосов
/ 24 февраля 2020

Я пытаюсь это нравится, но это не работает ....


        for (EventConfig eventConfig : eventTypesList) {
            LOGGER.info("creating a stream for ", eventConfig.getEvent_name());
            String key = eventConfig.getEvent_name();
            final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat
                    (path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject())))
                    .withBucketAssigner(new EventTimeBucketAssigner())
                    .build();

            DataStream<GenericRecord> stream = dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
                if (genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name())) {
                    return true;
                }
                return false;
            });

            Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>> tuple2 = new Tuple2<>(stream, sink);
            streamMap.put(key, tuple2);
        }


        DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
        searchStream.map(new Enricher()).addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));

Пожалуйста, предоставьте правильный способ добиться этого.

Спасибо.

...