Google Cloud Data Fusion - создание конвейера из конечной точки API REST - PullRequest
1 голос
/ 31 января 2020

Попытка построить конвейер для чтения из стороннего источника данных конечной точки API REST.

Я использую подключаемый модуль HTTP (версия 1.2.0), найденный в концентраторе.

URL запроса ответа: https://api.example.io/v2/somedata?return_count=false

Пример тела ответа:

{
  "paging": {
    "token": "12456789",
    "next": "https://api.example.io/v2/somedata?return_count=false&__paging_token=123456789"
  },
  "data": [
    {
      "cID": "aerrfaerrf",
      "first": true,
      "_id": "aerfaerrfaerrf",
      "action": "aerrfaerrf",
      "time": "1970-10-09T14:48:29+0000",
      "email": "example@aol.com"
    },
    {...}
  ]
}

Основная ошибка в журналах:

java.lang.NullPointerException: null
    at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.getNextPage(BaseHttpPaginationIterator.java:118) ~[1580429892615-0/:na]
    at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.ensurePageIterable(BaseHttpPaginationIterator.java:161) ~[1580429892615-0/:na]
    at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.hasNext(BaseHttpPaginationIterator.java:203) ~[1580429892615-0/:na]
    at io.cdap.plugin.http.source.batch.HttpRecordReader.nextKeyValue(HttpRecordReader.java:60) ~[1580429892615-0/:na]
    at io.cdap.cdap.etl.batch.preview.LimitingRecordReader.nextKeyValue(LimitingRecordReader.java:51) ~[cdap-etl-core-6.1.1.jar:na]
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:214) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:128) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.scheduler.Task.run(Task.scala:109) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) [spark-core_2.11-2.3.3.jar:2.3.3]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_232]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_232]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_232]

Возможные проблемы

После некоторой попытки устранить неполадку, я думаю, что проблема может быть в

Pagination

  • В плагине Data Fusion HTTP есть много способов справиться с нумерацией страниц.
    • Исходя из приведенного выше тела ответа, кажется, что лучший вариант для Тип нумерации страниц равен Link in Response Body
    • Для требуемого Следующая страница JSON / XML Field Path параметр, я пробовал $.paging.next и paging/next. Ни одна из них не работает.
    • Я проверил, что ссылка в /paging/next работает при открытии в Chrome

Аутентификация

  • Когда вы просто пытаетесь просмотреть URL-адрес ответа в Chrome, появится подсказка с запросом имени пользователя и пароля
    • Необходимо только ввести ключ API для имени пользователя, чтобы пройти это приглашение в Chrome
    • Для этого в плагине Data Fusion HTTP используется ключ API для имени пользователя в разделе Basi c Аутентификация

Anyone удалось создать конвейер в Google Cloud Data Fusion, где источником данных является REST API?

1 Ответ

1 голос
/ 04 февраля 2020

В ответ на

У кого-нибудь есть успехи в создании конвейера в Google Cloud Data Fusion, где источником данных является REST API?

Это не оптимальный способ для достижения этой цели. лучшим способом было бы получить данные Обзор API сервисов для публикации / подписки вашего, а затем использовать pub / sub в качестве источника для вашего конвейера, это обеспечило бы простое и надежное место размещения ваших данных для их обработки, хранение и анализ, см. документацию для API pub / sub. Чтобы использовать это вместе с Dataflow, выполните следующие шаги в официальной документации здесь Использование Pub / Sub с Dataflow

...