Как сделать асинхронный вызов Hive в Java? - PullRequest
3 голосов
/ 02 февраля 2010

Я хотел бы выполнить запрос Hive на сервере асинхронным способом. Запрос Hive, вероятно, займет много времени, поэтому я предпочел бы не блокировать вызов. В настоящее время я использую Thirft, чтобы сделать блокирующий вызов (блокирует client.execute ()), но я не видел пример того, как сделать неблокирующий вызов. Вот код блокировки:

        TSocket transport = new TSocket("hive.example.com", 10000);
        transport.setTimeout(999999999);
        TBinaryProtocol protocol = new TBinaryProtocol(transport);
        Client client = new ThriftHive.Client(protocol);
        transport.open();
        client.execute(hql);  // Omitted HQL

        List<String> rows;
        while ((rows = client.fetchN(1000)) != null) {
            for (String row : rows) {
                // Do stuff with row
            }
        }

        transport.close();

В приведенном выше коде отсутствуют блоки try / catch, чтобы сделать его коротким.

У кого-нибудь есть идеи, как сделать асинхронный вызов? Может ли Hive / Thrift это поддержать? Есть ли лучший способ?

Спасибо!

Ответы [ 6 ]

2 голосов
/ 10 января 2012

Теперь возможно сделать асинхронный вызов в клиенте Java Thrift после установки этого патча: https://issues.apache.org/jira/browse/THRIFT-768

Сгенерируйте асинхронный Java-клиент, используя новое решение, и инициализируйте ваш клиент следующим образом:

TNonblockingTransport transport = new TNonblockingSocket("127.0.0.1", 9160);
TAsyncClientManager clientManager = new TAsyncClientManager();
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
Hive.AsyncClient client = new Hive.AsyncClient(protocolFactory, clientManager, transport);

Теперь вы можете выполнять методы на этом клиенте так же, как на синхронном интерфейсе. Единственное изменение состоит в том, что все методы принимают дополнительный параметр обратного вызова.

2 голосов
/ 22 февраля 2010

AFAIK, на момент написания Thrift не генерировал асинхронных клиентов. Причина, описанная в этой ссылке здесь (текст для поиска «асинхронный»), заключается в том, что Thrift был разработан для центра обработки данных, в котором задержка считается низкой.

К сожалению, как вы знаете, задержка между вызовом и результатом вызвана не всегда сетью, а выполняемой логикой! У нас есть проблема при вызове базы данных Cassandra с сервера приложений Java, где мы хотим ограничить общее количество потоков.

Резюме: на данный момент все, что вы можете сделать, - это убедиться, что у вас достаточно ресурсов для обработки необходимого количества заблокированных одновременных потоков и ожидания более эффективной реализации.

1 голос
/ 13 февраля 2010

После разговора со списком рассылки Hive Hive не поддерживает асинхронные вызовы с использованием Thirft.

1 голос
/ 02 февраля 2010

Я ничего не знаю о Hive, но, в крайнем случае, вы можете использовать библиотеку параллелизма Java:

 Callable<SomeResult> c = new Callable<SomeResult>(){public SomeResult call(){

    // your Hive code here

 }};

 Future<SomeResult> result = executorService.submit(c);

 // when you need the result, this will block
 result.get();

Или, если вам не нужно ждать результата, используйте Runnable вместо Callable .

0 голосов
/ 08 февраля 2010

Мы запускаем асинхронные вызовы на AWS Elastic MapReduce . AWS MapReduce может выполнять задания hadoop / hive в облаке Amazon с помощью вызова веб-сервисов AWS MapReduce.

Вы также можете следить за состоянием своих заданий и получать результаты с S3 после завершения задания.

Поскольку вызовы веб-служб носят асинхронный характер, мы никогда не блокируем другие наши операции. Мы продолжаем следить за состоянием наших работ в отдельном потоке и собираем результаты по завершении работы.

0 голосов
/ 02 февраля 2010

Я не знаю, в частности, о Hive, но любой блокирующий вызов может быть превращен в асинхронный вызов путем создания нового потока и использования обратного вызова. Вы могли бы взглянуть на java.util.concurrent.FutureTask, который был разработан, чтобы облегчить обработку такой асинхронной операции.

...