Как мы соединяем сервер AMPS [CRANK UP THE AMPS] и Apache Flink для потока в реальном времени? - PullRequest
0 голосов
/ 07 декабря 2018

Мы подписываемся на данные в реальном времени с сервера AMPS [CRANK UP THE AMPS] в качестве источника Apache Flink.Любая идея о том, как соединить их обоих, как Кафка.

Сервер Amps: http://www.crankuptheamps.com/amps/

1 Ответ

0 голосов
/ 07 декабря 2018

В настоящее время Apache Flink не предоставляет никакого готового разъема для AMPS, как вы могли видеть здесь .Но он предоставляет расширяемый интерфейс Source / Sink, который можно использовать для подключения к любому пользовательскому источнику / приемнику.

Вы можете создать свой собственный соединитель источника AMPS, расширив RichSourceFunction и передав его в addSourceметод, упомянутый в этой документации flink .Обратитесь к Java Client API , предоставленному crankuptheamps для подключения к теме источника и подписке на сообщения.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import com.crankuptheamps.client.Client;
import com.crankuptheamps.client.Message;

public class AMPSSource extends RichSourceFunction<String> {


    private static final long serialVersionUID = -8708182052610791593L;
    private String name, topic, connectionString;
    private Client client;

    public AMPSSource(String name, String connectionString, String topic) {
        this.name = name;
        this.topic = topic;
        this.connectionString = connectionString;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // We create a Client, then connect() and logon()
        client = new Client(this.name);
        client.connect(this.connectionString);
        client.logon();
    }

    public void run(SourceContext<String> sourceContext) throws Exception {
        /*
         * Here, we iterate over messages in the MessageStream returned by
         * subscribe method
         */
        for (Message message : client.subscribe(this.topic)) {
            sourceContext.collect(message.getData());
        }
    }

    @Override
    public void close() throws Exception {
        try {
            cancel();
        } finally {
            super.close();
        }
    }

    public void cancel() {
        client.close();
    }

}

Это можно использовать в качестве источника в вашем процессоре следующим образом:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamProcessor {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> ampsStream = env
                .addSource(new AMPSSource("flink-consumer", "tcp://127.0.0.1:9007/amps/json", "test-topic"));

        ampsStream.print();
        env.execute();
    }
}

Примечание. Реализации RichSourceFunction имеют параллелизм 1. Чтобы включить параллельное выполнение, определенный пользователем источник должен реализовывать org.apache.flink.streaming.api.functions.source.ParallelSourceFunction или расширять org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction

...