Инкрементная обработка потокового api в твиттере с использованием apache httpclient? - PullRequest
2 голосов
/ 28 марта 2012

Я использую Apache HTTPClient 4 для подключения к потоковому api в твиттере с уровнем доступа по умолчанию.В начале он работает отлично, но через несколько минут после извлечения данных выдает следующее сообщение:

2012-03-28 16:17:00,040 DEBUG org.apache.http.impl.conn.SingleClientConnManager: Get connection for route HttpRoute[{tls}->http://myproxy:80->https://stream.twitter.com:443]
2012-03-28 16:17:00,040 WARN com.cloudera.flume.core.connector.DirectDriver: Exception in source: TestTwitterSource
java.lang.IllegalStateException: Invalid use of SingleClientConnManager: connection still allocated.
    at org.apache.http.impl.conn.SingleClientConnManager.getConnection(SingleClientConnManager.java:216)
Make sure to release the connection before allocating another one.
    at org.apache.http.impl.conn.SingleClientConnManager$1.getConnection(SingleClientConnManager.java:190)

Я понимаю, почему я сталкиваюсь с этой проблемой.Я пытаюсь использовать этот HttpClient в кластере потока в качестве источника потока.Код выглядит следующим образом:

public Event next() throws IOException, InterruptedException {

    try {

        HttpHost target = new HttpHost("stream.twitter.com", 443, "https");
        new BasicHttpContext();
        HttpPost httpPost = new HttpPost("/1/statuses/filter.json");
        StringEntity postEntity = new StringEntity("track=birthday",
                "UTF-8");
        postEntity.setContentType("application/x-www-form-urlencoded");
        httpPost.setEntity(postEntity);
        HttpResponse response = httpClient.execute(target, httpPost,
                new BasicHttpContext());
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                response.getEntity().getContent()));
        String line = null;
        StringBuffer buffer = new StringBuffer();
        while ((line = reader.readLine()) != null) {
            buffer.append(line);
            if(buffer.length()>30000) break;
        }
        return new EventImpl(buffer.toString().getBytes());
    } catch (IOException ie) {
        throw ie;
    }

}

Я пытаюсь буферизовать 30 000 символов в потоке ответов в StringBuffer, а затем вернуть это как полученные данные.Я, очевидно, не закрываю соединение - но я не хочу его закрывать, пока, я думаю.Об этом говорится в руководстве по Твиттеру:

Некоторые клиентские библиотеки HTTP возвращают тело ответа только после того, как соединение было закрыто сервером.Эти клиенты не будут работать для доступа к потоковому API.Вы должны использовать HTTP-клиент, который будет возвращать данные ответа постепенно.Большинство надежных клиентских библиотек HTTP обеспечат эту функциональность.Например, Apache HttpClient будет обрабатывать этот сценарий использования.

Он четко говорит вам, что HttpClient будет возвращать ответные данные постепенно.Я просмотрел примеры и учебные пособия, но не нашел ничего похожего на это.Если вы, ребята, использовали httpclient (если не apache) и постепенно читали API потоковой передачи Twitter, пожалуйста, дайте мне знать, как вы достигли этого навыка.Те, кто не имеет, пожалуйста, не стесняйтесь вносить свой вклад в ответы.TIA.

ОБНОВЛЕНИЕ

Я попытался сделать это: 1) Я переместил дескриптор потока в метод open источника flume.2) Использование простого потока и чтение данных в байтовый буфер.Итак, вот как выглядит тело метода:

        byte[] buffer = new byte[30000];

        while (true) {
            int count = instream.read(buffer);
            if (count == -1)
                continue;
            else
                break;
        }
        return new EventImpl(buffer);

Это работает до некоторой степени - я получаю твиты, они приятно записываются в место назначения.Проблема в возвращаемом значении instream.read (buffer).Даже если в потоке нет данных, а в буфере есть байты по умолчанию и 30000 из них, поэтому это значение записывается в место назначения.Таким образом, файл назначения выглядит следующим образом .. "твиты .. твиты .. твиты .. \ u0000 \ u0000 \ u0000 \ u0000 \ u0000 ... твиты .. твиты ...".Я понимаю, что счетчик не вернет -1, потому что это бесконечный поток, так как мне узнать, есть ли в буфере новый контент из команды чтения?

Ответы [ 2 ]

0 голосов
/ 01 апреля 2012

Оказывается, это была проблема с каналом.Flume оптимизирован для передачи событий размером 32 КБ.Все, что за 32 КБ, Flume выручает.(Временное решение: настроить размер события более 32 КБ).Итак, я изменил свой код для буферизации не менее 20 000 символов.Это работает, но это не доказательство.Это все равно может потерпеть неудачу, если длина буфера превышает 32 КБ, однако за час тестирования это не сработало - я считаю, что это связано с тем, что Twitter не отправляет много данных в своем публичном потоке.

while ((line = reader.readLine()) != null) {
            buffer.append(line);
            if(buffer.length()>20000) break;
        }
0 голосов
/ 28 марта 2012

Проблема в том, что ваш код пропускает соединения.Пожалуйста, убедитесь, что независимо от того, что вы либо закрываете поток контента, либо прерываете запрос.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...