Я пытаюсь прочитать таблицу Google BigQuery, используя apache NIFI, для того же я написал пользовательский процессор. Который прочитал таблицу и поместил это в FlowFile. Я полагаю, что при записи данных в файл потока происходит сбой и связь go с ошибкой. Ниже приведено исключение, которое я получаю.
Ключ: 'error_message' Значение: 'IOException при чтении элемента BigQuery: IOException, выброшенное из ReadBigQueryProcessor [id = ab1995ab-0171-1000-1768-d4f8fe9c46a8]: java .io.NotSerializableException: com.google.api.services.bigquery.model.TableCell '
Пожалуйста, используйте код ниже:
public Iterator<List<FieldValue>> listTableData(String datasetName, String tableName) {
getLogger().warn("Reading Table Started.");
TableId tableIdObject = TableId.of(datasetName, tableName);
Page<List<FieldValue>> tableData = bigQuery.listTableData(tableIdObject, TableDataListOption.pageSize(100));
Iterator<List<FieldValue>> rowIterator = tableData.iterateAll();
getLogger().warn("Reading Table Completed."+ rowIterator.next().toString());
return rowIterator;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
// TODO Auto-generated method stub
String tableName = context.getProperty(TABLE).getValue();
String dataset = context.getProperty(DATASET).getValue();
Iterator<List<FieldValue>> tableData = listTableData(dataset, tableName);
while (tableData.hasNext()) {
FlowFile flowFile = session.create();
try {
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
// TODO Auto-generated method stub
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(tableData.next());
oos.flush();
out.write(bos.toByteArray());
}
});
session.getProvenanceReporter().create(flowFile);
session.transfer(flowFile, SUCCESS);
} catch (Exception e) {
// TODO: handle exception
getLogger().error("IOException while reading BigQuery item: " + e.getMessage());
flowFile = session.putAttribute(flowFile, "error_message",
"IOException while reading BigQuery item: " + e.getMessage());
session.transfer(flowFile, REL_FAILURE);
}
}
}