Как сохранить таблицу flink в файл csv? - PullRequest
0 голосов
/ 27 мая 2018

Я использую Flink 1.4.0

Я пытаюсь сохранить результаты запроса API таблицы в файл CSV, но получаю ошибку.Вот подробности:

Мой входной файл выглядит следующим образом:

id,species,color,weight,name 
311,canine,golden,75,dog1 
312,canine,brown,22,dog2 
313,feline,gray,8,cat1

Я запускаю запрос по этому вопросу, чтобы выбрать только клыки, и я хочу сохранить его в файле CSV:

ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); 
BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env); 

String inputPath = "location-of-source-file"; 
CsvTableSource petsTableSource = CsvTableSource.builder() 
  .path(inputPath) 
  .ignoreFirstLine() 
  .fieldDelimiter(",") 
  .field("id", Types.INT()) 
  .field("species", Types.STRING()) 
  .field("color", Types.STRING()) 
  .field("weight", Types.DOUBLE()) 
  .field("name", Types.STRING()) 
  .build(); 

// Register our table source 
tableEnv.registerTableSource("pets", petsTableSource); 
Table pets = tableEnv.scan("pets"); 

Table counts = pets 
  .groupBy("species") 
  .select("species, species.count as count") 
  .filter("species === 'canine'"); 

// Convert to Dataset and display results
DataSet<Row> result = tableEnv.toDataSet(counts, Row.class); 
result.print(); 

// Write Results to File 
TableSink<Row> sink = new CsvTableSink("/home/hadoop/output/pets.csv", ","); 
counts.writeToSink(sink); 

Когда я запускаю это, я вижу результаты из набора данных:canine, 2

Однако я не вижу результатов в выходном файле и вижу эти ошибки ниже.Что я могу сделать, чтобы это исправить?Спасибо!

2018-05-27 13:29:17,040 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields
2018-05-27 13:29:17,040 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields
2018-05-27 13:29:17,040 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields.
2018-05-27 13:29:17,047 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields
2018-05-27 13:29:17,047 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields
2018-05-27 13:29:17,047 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields.

1 Ответ

0 голосов
/ 28 мая 2018

Возможно, вам не хватает

env.execute(); 

после

counts.writeToSink(sink);

В отличие от print(), который немедленно запускает выполнение, writeToSink() просто добавляет оператор приемника и требуетдля явного запуска выполнения.

Сообщения INFO TypeExtractor "просто" говорят вам, что Row нельзя использовать в качестве типа POJO, но здесь все в порядке.

...