Излучение «боковых выходов» и «выходных данных процесса» в одном приемнике с другим путем - PullRequest
0 голосов
/ 13 декабря 2018

Как излучать «боковые выходы» и «выходные данные процесса», используя один приемник.Здесь, в этом случае, оба выходных данных должны отправляться в один приемник, и в зависимости от пути к папке тегов различается

Например,

OutputTag<String> outputTag = new OutputTag<String>("side-output") {};    
SingleOutputStreamOperator<String> mainDataStream = source.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        try {
             builder.parse(new InputSource(new StringReader(value)));
             out.collect(value);
        } catch (SAXException | IOException e) {
             ctx.output(outputTag, value);
        }
    }
});
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);

Есть ли какое-либо иное лучшее решение?Просто беспокоюсь о производительности

Ответы [ 2 ]

0 голосов
/ 17 декабря 2018

Flink's BucketingSink может использовать Bucketer , чтобы определить, какой подкаталог внутри базового каталога будет использоваться.Таким образом, вы можете использовать это для установки подкаталога на основе атрибута записываемой записи.

Что касается использования единственного приемника, поскольку как основной, так и боковой выход вашей функции являются объектами String(того же типа), вы можете mainDataStream.union(sideOutputStream) два потока вместе, прежде чем выводить результат.

0 голосов
/ 14 декабря 2018

Если вы хотите использовать один приемник, вы можете добавить атрибут в выходной формат и использовать этот атрибут для идентификации источника данных в одном приемнике.

Вы также можете создать два приемника с разными параметрами для получения данных из разных источников.На мой взгляд, без учета используемой вами базы данных, этот многопоточный способ имеет лучшую производительность.

...