Ошибка «Отказано в соединении» с внешними таблицами Spark через Hive через файлы Parquet.
Полная ошибка повторяется много раз, но имеет этот шаблон (системная спецификация замаскирована):
java.io.IOException: Failed to connect to server.org.com/10.xxx.xxx.23:53209
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:97)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:106)
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:92)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:585)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:62)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: server.org.com/10.xxx.xxx.23:53209
Использование Spark 2.1.1 в Hive 1.2 для Had oop 2.7.
Может быть последовательно воспроизведено с помощью этого простого запроса в таблицах тестов TP C -H:
-- FAILS.
select
c_nationkey,
count(o_orderkey)
from
h_order,
h_customer
where
o_custkey = c_custkey
group by
c_nationkey
;
DDL для рассматриваемых таблиц:
create external table if not exists h_customer_pq
( c_custkey integer
, c_name string
, c_address string
, c_nationkey integer
, c_phone string
, c_acctbal float
, c_mktsegment string
, c_comment string
)
stored as parquet
location '/path/to/customer'
;
create external table if not exists h_order_pq
( o_orderkey integer
, o_custkey integer
, o_orderstatus string
, o_totalprice float -- Obscure note: Spark 2.1 has issues with external floats? Maybe, maybe not. See below.
, o_orderdate date
, o_orderpriority string
, o_clerk string
, o_shippriority integer
, o_comment string
)
stored as parquet
location '/path/to/order'
;
Я могу получить ошибку до go, если я превращу таблицы во внутренние таблицы Hive. То есть, если я сделаю это:
create table h_customer_int as select * from h_customer_pq;
create table h_order_int as select * from h_order_pq;
Затем перенаправьте запрос на эти внутренние таблицы, это нормально. Я подчеркиваю это, чтобы помочь в устранении неполадок, но использование только внутренних таблиц не является жизнеспособным решением в реальной системе.
И все же я могу выполнить ряд других запросов к внешним таблицам, некоторые даже более сложные:
-- SUCCEEDS. Meaning success against the external tables over Parquet.
select
r_name,
count(c_nationkey) as customer_count
from
h_region, -- More tables than the failing query
h_nation,
h_customer,
h_supplier
where
r_name = 'AMERICA' -- Still has some type of where clause.
and r_regionkey = n_regionkey
and n_nationkey = s_nationkey
and s_nationkey = c_nationkey
group by
r_name -- Still has a grouping.
order by
customer_count desc
;
И они работают нормально.
Обратите внимание, что запрос содержит таблицу клиентов, показывая, что таблица клиентов выглядит нормально. И потом, если я просто запустил таблицу заказов, это тоже работает:
-- SUCCEEDS
select
o_orderpriority,
count(o_orderkey) -- Still doing some type of aggregate.
from
h_order
where
o_custkey < 1000 -- Still doing some type of filter.
group by
o_orderpriority -- Still doing some type of grouping.
;
Общие объемы данных тривиальны, так что это не должно иметь к этому никакого отношения.
Когда запросы успешны по отношению к внутренним таблицам, данные верны, не похоже, что данные в файлах Parquet искажены.
Я нашел что-то о Spark 2.1, не способном обрабатывать значения с плавающей запятой во внешних файлах, но способ, которым эта ошибка проявлялась, был совсем другим - но может ли здесь быть использовано то же самое основное ограничение? Если так, может кто-нибудь процитировать какую-либо официальную ошибку или документацию, которая отмечает это? Может быть красная сельдь, но заставляет меня задуматься.
Подводя итог:
- Spark 2.1.
- Улей 1.2.
- Внешние таблицы закончились Parquet.
- Неудачный запрос включает таблицу "orders" в объединении с агрегатом.
- Более сложные таблицы без "orders" завершаются успешно.
- Запрос "orders" по внутреннему таблицы с одинаковыми типами данных и строками успешно выполняются.
- И даже более простой запрос "заказов", в котором содержится только эта таблица.
- Переход на внутренние таблицы, а затем неудачный запрос "заказов" успешно.
- Приведенный выше небольшой пример устранения неполадок также встречается со многими другими фактическими системными запросами к внешним таблицам.
Почему некоторые конструкции запросов к определенным конструкциям внешних таблиц не работают с "отказано в соединении"