Настройка Flink Basi c Кафка производитель потребительская с Custom Class в Java - PullRequest
0 голосов
/ 09 марта 2020

Я хотел настроить базового c производителя-потребителя с Flink на Kafka, но у меня проблемы с выдачей данных существующему потребителю через Java.

CLI решение

  1. Я настраиваю Kafka broker, используя kafka_2.11-2.4.0 zip из https://kafka.apache.org/downloads с командами

    bin/zookeeper-server-start.sh config/zookeeper.properties

    и bin/kafka-server-start.sh config/server.properties

  2. Я создаю топи c, называемые транзакциями1, используя

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions1

    Теперь я могу использовать производитель и потребитель в командной строке, чтобы увидеть, что topi c был создан и работает.

  3. Для настройки потребителя я запускаю

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic transactions1 --from-beginning

    Теперь, если какой-либо производитель отправит данные в topi c transactions1, я увижу это в консоли потребителя.

    Я проверяю, что потребитель работает, запустив

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic transactions1

    и вводя в строке производителя следующие строки данных, которые также показывают в потребительском клиенте.

{"txnID": 1, "amt": 100.0, "account": "AC1"}

{"txnID ": 2," amt ": 10.0," account ":" AC2 "}

{" txnID ": 3," amt ": 20.0," account ":" AC3 "}

Теперь я хочу повторить шаг 3, т. Е. Производителя и потребителя, в Java коде, который является основной проблемой этого вопроса.

  1. Поэтому я настроил gradle java8 проект с build.gradle
...
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
    compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
    // https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
    compile group: 'com.twitter', name: 'chill-thrift', version: '0.7.6'
    compile group: 'org.apache.thrift', name: 'libthrift', version: '0.11.0'
    compile group: 'com.twitter', name: 'chill-protobuf', version: '0.7.6'
    compile group: 'org.apache.thrift', name: 'protobuf-java', version: '3.7.0'
}
...
Я установил пользовательский класс Transactions.class, где вы можете предлагать изменения в Serialization Logi c с использованием Kryo, Protobuf или TbaseSerializer, расширяя классы, относящиеся к Flink .
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;

public class Transaction {
    public final int txnID;
    public final float amt;
    public final String account;

    public Transaction(int txnID, float amt, String account) {
        this.txnID = txnID;
        this.amt = amt;
        this.account = account;
    }


    public String toJSONString() {
        Gson gson = new Gson();
        return gson.toJson(this);
    }

    public static Transaction fromJSONString(String some) {
        Gson gson = new Gson();
        return gson.fromJson(some, Transaction.class);
    }

    public static MapFunction<String, String> mapTransactions() {
        MapFunction<String, String> map = new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                if (value != null || value.trim().length() > 0) {
                    try {
                        return fromJSONString(value).toJSONString();
                    } catch (Exception e) {
                        return "";
                    }
                }
                return "";
            }
        };
        return map;
    }

    @Override
    public String toString() {
        return "Transaction{" +
                "txnID=" + txnID +
                ", amt=" + amt +
                ", account='" + account + '\'' +
                '}';
    }
}
Теперь пришло время использовать Flink для производства и потребления потоков в topi c transactions1.
public class SetupSpike {
    public static void main(String[] args) throws Exception {
        System.out.println("begin");
        List<Transaction> txns = new ArrayList<Transaction>(){{
            add(new Transaction(1, 100, "AC1"));
            add(new Transaction(2, 10, "AC2"));
            add(new Transaction(3, 20, "AC3"));
        }};
        // This list txns needs to be serialized in Flink as Transaction.class->String->ByteArray 
        //via producer and then to the topic in Kafka broker 
        //and deserialized as ByteArray->String->Transaction.class from the Consumer in Flink reading Kafka broker.

        String topic = "transactions1";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", topic);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //env.getConfig().addDefaultKryoSerializer(Transaction.class, TBaseSerializer.class);

        // working Consumer logic below which needs edit if you change serialization
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
        myConsumer.setStartFromEarliest();     // start from the earliest record possible
        DataStream<String> stream = env.addSource(myConsumer).map(Transaction::toJSONString);

        //working Producer logic below which works if you are sinking a pre-existing DataStream
        //but needs editing to work with Java List<Transaction> datatype.
        System.out.println("sinking expanded stream");
        MapFunction<String, String> etl = new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                if (value != null || value.trim().length() > 0) {
                    try {
                        return fromJSONString(value).toJSONString();
                    } catch (Exception e) {
                        return "";
                    }
                }
                return "";
            }
        };
        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(topic,
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                        try {
                            System.out.println(element);
                            return new ProducerRecord<byte[], byte[]>(topic, stringToBytes(etl.map(element)));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return null;
                    }
                }, properties, Semantic.EXACTLY_ONCE);
//        stream.timeWindowAll(Time.minutes(1));
        stream.addSink(myProducer);
        JobExecutionResult execute = env.execute();

    }
}

Как видите, я не могу сделать это с предоставленным списком txns. Выше приведен рабочий код, который я мог собрать из документации Flink для перенаправления данных потока topi c и отправки данных вручную через производителя Cli. Проблема заключается в написании кода KafkaProducer в java, который отправляет данные в topi c, что дополнительно усугубляется такими проблемами, как

  1. Добавление отметок времени, водяных знаков
  2. Операции KeyBy
  3. Операции GroupBy / WindowBy
  4. Добавление пользовательских логи ETL c перед погружением.
  5. Журналы сериализации / десериализации c в Flink

Может ли кто-нибудь, кто работал с Flink, помочь мне с тем, как создать список txns для transactions1 topi c в Flink, а затем убедиться, что он работает с Consumer? Также любая помощь по вопросам добавления метки времени или некоторой обработки перед погружением будет очень полезна. Вы можете найти исходный код на https://github.com/devssh/kafkaFlinkSpike, и цель состоит в том, чтобы сгенерировать шаблон Flink для добавления деталей " AC1 "из хранилища в памяти и присоедините его к событию Transaction, поступающему в режиме реального времени, для отправки расширенного вывода пользователю.

1 Ответ

1 голос
/ 09 марта 2020

Несколько баллов, без определенного порядка:

Было бы лучше не смешивать Flink версии 1.9.2 с версией 1.9.0, как вы сделали здесь:

compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'

Обучающие материалы о том, как работать с метками времени, водяными знаками, keyBy, windows и т. Д. c., См. В онлайновых учебных материалах Ververica .

. Для использования List<Transaction> txns в качестве входной поток, вы можете сделать это ( документы ):

DataStream<Transaction> transactions = env.fromCollection(txns);

Пример того, как обрабатывать сериализацию / десериализацию при работе с Flink и Kafka, см. Flink Операционная площадка , в частности посмотрите на ClickEventDeserializationSchema и ClickEventStatisticsSerializationSchema, которые используются в ClickEventCount. java и определены здесь . (Примечание: эта игровая площадка еще не была обновлена ​​для Flink 1.10.)

...