Spark SQL «соединение отказано» при запросе внешних таблиц через паркет - PullRequest
0 голосов
/ 30 апреля 2020

Ошибка «Отказано в соединении» с внешними таблицами Spark через Hive через файлы Parquet.

Полная ошибка повторяется много раз, но имеет этот шаблон (системная спецификация замаскирована):

java.io.IOException: Failed to connect to server.org.com/10.xxx.xxx.23:53209
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:97)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
        at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:106)
        at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
        at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:585)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:62)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: server.org.com/10.xxx.xxx.23:53209

Использование Spark 2.1.1 в Hive 1.2 для Had oop 2.7.

Может быть последовательно воспроизведено с помощью этого простого запроса в таблицах тестов TP C -H:

-- FAILS.
select
   c_nationkey,
   count(o_orderkey)
from
   h_order,
   h_customer
where
   o_custkey = c_custkey
group by
   c_nationkey
;

DDL для рассматриваемых таблиц:

create external table if not exists h_customer_pq
( c_custkey     integer
, c_name        string
, c_address     string
, c_nationkey   integer
, c_phone       string
, c_acctbal     float
, c_mktsegment  string
, c_comment     string
)
stored as parquet
location '/path/to/customer'
;

create external table if not exists h_order_pq
( o_orderkey       integer
, o_custkey        integer
, o_orderstatus    string
, o_totalprice     float -- Obscure note: Spark 2.1 has issues with external floats?  Maybe, maybe not.  See below.
, o_orderdate      date
, o_orderpriority  string
, o_clerk          string
, o_shippriority   integer
, o_comment        string
)
stored as parquet
location '/path/to/order'
;

Я могу получить ошибку до go, если я превращу таблицы во внутренние таблицы Hive. То есть, если я сделаю это:

create table h_customer_int as select * from h_customer_pq;
create table h_order_int as select * from h_order_pq;

Затем перенаправьте запрос на эти внутренние таблицы, это нормально. Я подчеркиваю это, чтобы помочь в устранении неполадок, но использование только внутренних таблиц не является жизнеспособным решением в реальной системе.

И все же я могу выполнить ряд других запросов к внешним таблицам, некоторые даже более сложные:

-- SUCCEEDS.  Meaning success against the external tables over Parquet.
select
    r_name,
    count(c_nationkey) as customer_count
from
    h_region, -- More tables than the failing query
    h_nation,
    h_customer,
    h_supplier
where
    r_name = 'AMERICA' -- Still has some type of where clause.
    and r_regionkey = n_regionkey
    and n_nationkey = s_nationkey
    and s_nationkey = c_nationkey
group by
    r_name -- Still has a grouping.
order by
    customer_count desc
;

И они работают нормально.

Обратите внимание, что запрос содержит таблицу клиентов, показывая, что таблица клиентов выглядит нормально. И потом, если я просто запустил таблицу заказов, это тоже работает:

-- SUCCEEDS
select
   o_orderpriority,
   count(o_orderkey) -- Still doing some type of aggregate.
from
   h_order
where
   o_custkey < 1000 -- Still doing some type of filter.
group by
   o_orderpriority -- Still doing some type of grouping.
;

Общие объемы данных тривиальны, так что это не должно иметь к этому никакого отношения.

Когда запросы успешны по отношению к внутренним таблицам, данные верны, не похоже, что данные в файлах Parquet искажены.

Я нашел что-то о Spark 2.1, не способном обрабатывать значения с плавающей запятой во внешних файлах, но способ, которым эта ошибка проявлялась, был совсем другим - но может ли здесь быть использовано то же самое основное ограничение? Если так, может кто-нибудь процитировать какую-либо официальную ошибку или документацию, которая отмечает это? Может быть красная сельдь, но заставляет меня задуматься.

Подводя итог:

  • Spark 2.1.
  • Улей 1.2.
  • Внешние таблицы закончились Parquet.
  • Неудачный запрос включает таблицу "orders" в объединении с агрегатом.
  • Более сложные таблицы без "orders" завершаются успешно.
  • Запрос "orders" по внутреннему таблицы с одинаковыми типами данных и строками успешно выполняются.
  • И даже более простой запрос "заказов", в котором содержится только эта таблица.
  • Переход на внутренние таблицы, а затем неудачный запрос "заказов" успешно.
  • Приведенный выше небольшой пример устранения неполадок также встречается со многими другими фактическими системными запросами к внешним таблицам.

Почему некоторые конструкции запросов к определенным конструкциям внешних таблиц не работают с "отказано в соединении"

...