Не применяется с ParDo и DoFn с использованием Apache Beam - PullRequest
0 голосов
/ 08 ноября 2018

Я внедряю конвейер Pub / Sub в BigQuery. Это похоже на Как создать преобразование для чтения с использованием ParDo и DoFn в Apache Beam , но здесь я уже создал PCollection.

Я следую тому, что описано в документации Apache Beam для реализации операции ParDo для подготовки строки таблицы с использованием следующего конвейера:

static class convertToTableRowFn extends DoFn<PubsubMessage, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        PubsubMessage message = c.element();
        // Retrieve data from message
        String rawData = message.getData();
        Instant timestamp = new Instant(new Date());
        // Prepare TableRow
        TableRow row = new TableRow().set("message", rawData).set("ts_reception", timestamp);
        c.output(row);
    }
}

// Read input from Pub/Sub
pipeline.apply("Read from Pub/Sub",PubsubIO.readMessagesWithAttributes().fromTopic(topicPath))
        .apply("Prepare raw data for insertion", ParDo.of(new convertToTableRowFn()))
        .apply("Insert in Big Query", BigQueryIO.writeTableRows().to(BQTable));

Я нашел функцию DoFn в gist .

Я получаю следующую ошибку:

The method apply(String, PTransform<? super PCollection<PubsubMessage>,OutputT>) in the type PCollection<PubsubMessage> is not applicable for the arguments (String, ParDo.SingleOutput<PubsubMessage,TableRow>)

Я всегда понимал, что операции ParDo / DoFn - это поэтапная операция PTransform, я не прав? Я никогда не сталкивался с подобными ошибками в Python, поэтому я немного озадачен, почему это происходит.

1 Ответ

0 голосов
/ 08 ноября 2018

Вы правы, ParDos - это поэлементные преобразования, и ваш подход выглядит правильно.

То, что вы видите, это ошибка компиляции. Примерно так происходит, когда тип аргумента метода apply(), который был выведен компилятором java, не соответствует типу фактического ввода, например convertToTableRowFn.

Из ошибки, которую вы видите, похоже, что Java выводит, что второй параметр для apply() имеет тип PTransform<? super PCollection<PubsubMessage>,OutputT>, в то время как вы передаете подкласс ParDo.SingleOutput<PubsubMessage,TableRow> (ваш convertToTableRowFn). Глядя на определение SingleOutput ваш convertToTableRowFn в основном PTransform<PCollection<? extends PubsubMessage>, PCollection<TableRow>>. И Java не может использовать его в apply, где он ожидает PTransform<? super PCollection<PubsubMessage>,OutputT>.

Что выглядит подозрительным, так это то, что java не выводил OutputT на PCollection<TableRow>. Одна из причин, по которой это не удастся сделать, если у вас есть другие ошибки. Вы уверены, что у вас нет других ошибок?

Например, глядя на convertToTableRowFn, вы звоните message.getData(), которого не существует, когда я пытаюсь это сделать, и он терпит неудачу при компиляции. В моем случае мне нужно сделать что-то вроде этого: rawData = new String(message.getPayload(), Charset.defaultCharset()). Также .to(BQTable)) ожидает строку (например, строку, представляющую имя таблицы BQ) в качестве аргумента, и вы передаете некоторый неизвестный символ BQTable (возможно, он где-то существует в вашей программе, хотя в вашем случае это не является проблемой) ).

После того, как я исправлю эти две ошибки, ваш код компилируется для меня, apply() полностью выведен и типы совместимы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...