Flink to Nifi Волшебного Заголовка нет - PullRequest
0 голосов
/ 31 декабря 2018

Я пытаюсь использовать этот пример для подключения Nifi к Flink:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
            .url("http://localhost:8090/nifi")
            .portName("Data for Flink")
            .requestBatchCount(5)
            .buildConfig();

    SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
    DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);

    DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
        @Override
        public String map(NiFiDataPacket value) throws Exception {
            return new String(value.getContent(), Charset.defaultCharset());
        }
    });

    dataStream.print();
    env.execute();

Я использую Nifi как отдельный сервер со свойствами по умолчанию, за исключением следующих свойств:

nifi.remote.input.host=localhost
nifi.remote.input.secure=false
nifi.remote.input.socket.port=8090
nifi.remote.input.http.enabled=true

Вызов каждый раз завершается неудачно, со следующим логином Nifi:

[Site-to-Site Worker Thread-24] o.a.nifi.remote.SocketRemoteSiteListener 
Unable to communicate with remote instance null due to
org.apache.nifi.remote.exception.HandshakeException: Handshake 
with nifi://localhost:61680 failed because the Magic Header 
was not present; closing connection

Версия Nifi: 1.7.1, Версия Flink: 1.7.1

1 Ответ

0 голосов
/ 01 января 2019

После использования nifi-toolkit я удалил пользовательское значение nifi.remote.input.socket.port, а затем добавил transportProtocol(SiteToSiteTransportProtocol.HTTP) к моим SiteToSiteClientConfig и http://localhost:8080/nifi в качестве URL.

Причина, по которой я в первую очередь изменил порт, заключается в том, что без указания протокола HTTP он будет использовать RAW по умолчанию.А при использовании протокола RAW со стороны Flink клиент не может создать Transaction и печатает следующее предупреждение:

Unable to refresh Remote Group's peers due to Remote instance of NiFi 
is not configured to allow RAW Socket site-to-site communications

Вот почему я подумал, что это проблема с портом

Тактеперь с конфигурацией по умолчанию Nifi, это работает как ожидалось:

SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
            .url("http://localhost:8080/nifi")
            .portName("portNameAsInNifi")
            .transportProtocol(SiteToSiteTransportProtocol.HTTP)
            .requestBatchCount(1)
            .buildConfig();
...