Apache Flink преобразовать DataStream (источник) в список? - PullRequest
0 голосов
/ 25 апреля 2018

Мой вопрос заключается в том, как преобразовать DataStream в List, например, чтобы иметь возможность перебирать его.

Код выглядит так:

package flinkoracle;

//imports
//....

public class FlinkOracle {

    final static Logger LOG = LoggerFactory.getLogger(FlinkOracle.class);

    public static void main(String[] args) {
        LOG.info("Starting...");
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO};

        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
        //get the source from Oracle DB
        DataStream<?> source = env
                .createInput(JDBCInputFormat.buildJDBCInputFormat()
                        .setDrivername("oracle.jdbc.driver.OracleDriver")
                        .setDBUrl("jdbc:oracle:thin:@localhost:1521")
                        .setUsername("user")
                        .setPassword("password")
                        .setQuery("select * from  table1")
                        .setRowTypeInfo(rowTypeInfo)
                        .finish());

        source.print().setParallelism(1);

        try {
            LOG.info("----------BEGIN----------");
            env.execute();
            LOG.info("----------END----------");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        LOG.info("End...");
    }

}

Большое спасибо заранее. бром Тамас

1 Ответ

0 голосов
/ 25 апреля 2018

Flink предоставляет приемник итератора для сбора результатов DataStream для целей тестирования и отладки.Его можно использовать следующим образом:

import org.apache.flink.contrib.streaming.DataStreamUtils;

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)

Вы можете скопировать итератор в новый список следующим образом:

while (iter.hasNext())
    list.add(iter.next());

Flink также предоставляет набор простых методов write * () дляDataStream, которые в основном предназначены для отладки.Сброс данных в целевую систему зависит от реализации OutputFormat.Это означает, что не все элементы, отправленные в OutputFormat, немедленно отображаются в целевой системе.Примечание. Эти методы write * () не участвуют в контрольных точках Flink, и в случае сбоя эти записи могут быть потеряны.

writeAsText() / TextOutputFormat
writeAsCsv(...) / CsvOutputFormat
print() / printToErr()
writeUsingOutputFormat() / FileOutputFormat
writeToSocket

Источник: ссылка .

Вам может понадобиться добавить следующую зависимость для использования DataStreamUtils:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-contrib</artifactId>
    <version>0.10.2</version>
</dependency>
...