Невозможно прочитать таблицу BigQuery, используя apache nifi - PullRequest
0 голосов
/ 24 апреля 2020

Я пытаюсь прочитать таблицу Google BigQuery, используя apache NIFI, для того же я написал пользовательский процессор. Который прочитал таблицу и поместил это в FlowFile. Я полагаю, что при записи данных в файл потока происходит сбой и связь go с ошибкой. Ниже приведено исключение, которое я получаю.

Ключ: 'error_message' Значение: 'IOException при чтении элемента BigQuery: IOException, выброшенное из ReadBigQueryProcessor [id = ab1995ab-0171-1000-1768-d4f8fe9c46a8]: java .io.NotSerializableException: com.google.api.services.bigquery.model.TableCell '

Пожалуйста, используйте код ниже:

public Iterator<List<FieldValue>> listTableData(String datasetName, String tableName) {
    getLogger().warn("Reading Table Started.");
    TableId tableIdObject = TableId.of(datasetName, tableName);
    Page<List<FieldValue>> tableData = bigQuery.listTableData(tableIdObject, TableDataListOption.pageSize(100));
    Iterator<List<FieldValue>> rowIterator = tableData.iterateAll();
    getLogger().warn("Reading Table Completed."+ rowIterator.next().toString());
    return rowIterator;
}

@Override

public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    // TODO Auto-generated method stub
    String tableName = context.getProperty(TABLE).getValue();
    String dataset = context.getProperty(DATASET).getValue();

    Iterator<List<FieldValue>> tableData = listTableData(dataset, tableName);
while (tableData.hasNext()) {
    FlowFile flowFile = session.create();
    try {
        flowFile = session.write(flowFile, new OutputStreamCallback() {
            @Override
            public void process(OutputStream out) throws IOException {
                // TODO Auto-generated method stub
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos);
                oos.writeObject(tableData.next());
                oos.flush();
                out.write(bos.toByteArray());
            }
        });
        session.getProvenanceReporter().create(flowFile);
        session.transfer(flowFile, SUCCESS);
    } catch (Exception e) {
        // TODO: handle exception
        getLogger().error("IOException while reading BigQuery item: " + e.getMessage());
        flowFile = session.putAttribute(flowFile, "error_message",
            "IOException while reading BigQuery item: " + e.getMessage());
        session.transfer(flowFile, REL_FAILURE);
    }
}
}
...