Разбор большого количества текста - PullRequest
3 голосов
/ 14 января 2011

Чтобы дать краткий обзор программы 1.) Открывает соединение с сокетом и читает данные 2.) Разбивает данные на символ новой строки 3.) Помещает сегменты данных в очередь для обработки в отдельном потоке.

Я использую библиотеку curlpp, так как она обрабатывает аутентификацию и поиск DNS. Очередь представляет собой просто деку с мьютексами для обеспечения безопасности потоков.

Это метод, который я сейчас использую.

std::string input;
size_t socketIO::dataCallBack(char* ptr, size_t size, size_t nmemb) {
    // Calculate the real size of the incoming buffer
    size_t realsize = size * nmemb;

    //Append the new input to the old input
    input.append(ptr, realsize);

    //Find all the complete strings and push them to the queue
    size_t oldPosition = 0;
    size_t position = 0;
    position = input.find('\r', oldPosition);
    while (position != std::string::npos) {
        queueObject.push(input.substr(oldPosition, position))
        oldPosition = position + 1;
        position = input.find('\r', oldPosition);
    }

    //Save off the partial string as you'll get the rest of it on the next data callback
    input = input.substr(oldPosition);

    return realsize;
}

У меня есть несколько проблем. У меня проблемы с утечками памяти, и valgrind показывает серьезную утечку из этой функции.

==12867== 813,287,102 bytes in 390,337 blocks are possibly lost in loss record 359 of 359
==12867==    at 0x4C27CC1: operator new(unsigned long) (vg_replace_malloc.c:261)
==12867==    by 0x5AA8D98: std::string::_Rep::_S_create(unsigned long, unsigned long, std::allocator<char> const&) (in /usr/lib/libstdc++.so.6.0.13)
==12867==    by 0x5AA9B64: ??? (in /usr/lib/libstdc++.so.6.0.13)
==12867==    by 0x5AA9D38: std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&, unsigned long, unsigned long) (in /usr/lib/libstdc++.so.6.0.13)
==12867==    by 0x41E4F5: socketIO::write(char*, unsigned long, unsigned long) (basic_string.h:2006)
==12867==    by 0x509C657: utilspp::Functor<unsigned long, utilspp::tl::TypeList<char*, utilspp::tl::TypeList<unsigned long, utilspp::tl::TypeList<unsigned long, utilspp::NullType> > > >::operator()(char*, unsigned long, unsigned long) (Functor.hpp:106)
==12867==    by 0x509B6E4: curlpp::internal::CurlHandle::executeWriteFunctor(char*, unsigned long, unsigned long) (CurlHandle.cpp:171)
==12867==    by 0x509F509: curlpp::internal::Callbacks::WriteCallback(char*, unsigned long, unsigned long, curlpp::internal::CurlHandle*) (OptionSetter.cpp:47)
==12867==    by 0x4E3D667: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867==    by 0x4E5407B: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867==    by 0x4E505A1: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867==    by 0x4E51A8F: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867==    by 0x509A78B: curlpp::internal::CurlHandle::perform() (CurlHandle.cpp:52)
==12867==    by 0x5093A6B: curlpp::Easy::perform() (Easy.cpp:48)
==12867==    by 0x41EDC3: socketIO::processLoop() (socketIO.cpp:126)

Что бы вы предложили сделать. Я рассмотрел использование istringstream, но я не уверен, как работает его распределение памяти и будет ли он восстанавливать память, которую я уже прочитал. У меня проблема в том, что мне нужно хранить данные между обратными вызовами, но делать это так, чтобы не было утечки памяти.

UPDATE По запросу больше кода. Я написал с мыслью, что чем больше, тем лучше.

main.cpp

/**
 * The main driver for the twitter capture app.  Starts multiple threads for processors, 1 io thread and 2 db threads. One for user
 * information and the other for tweet information 
 */

#include "types.h"
#include "threadBase.h"
#include "socketIO.h"
#include "processor.h"
#include "dbTweetQueue.h"
#include "dbUserQueue.h"

#include <vector>


stringQueue twitToProc;
tweetQueue tweetQ;
userQueue userQ;
deleteQueue deleteQ;
std::vector<ThreadBase *> threadGroup;

std::string dbBase::dbUser(DBUSER);
std::string dbBase::dbURL(DBURL);
std::string dbBase::dbPass(DBPASS);

/*
 * Handle the signal for interupt
 */
void sigquit(int param)
{
    std::cout<<"Received sigquit"<<std::endl;
    for(unsigned int i = 0; i < threadGroup.size(); i++)
    {
        threadGroup[i]->interupt();
    }
}


int main(int argc, char* argv[])
{
    try{
    //Setting the signal handler up.
    struct sigaction act;
    act.sa_handler = sigquit;
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    sigaction(SIGQUIT, &act, 0);


    int MaxThreads = 5;
    if(argc < 3)
    {
        std::cout<<"Usage: >"<<argv[0]<<" TwitterUserName TwitterPassWord"<<std::endl;
        std::cout<<"Using Defaults: "<<TWITTERACCT<<" "<<TWITTERPASS<<std::endl;
    }

    // Create socketIO, and add it to the thread group
    if(argc == 3)
    {
        threadGroup.push_back(new socketIO(twitToProc, argv[1], argv[2]));
    }
    else
    {
        threadGroup.push_back(new socketIO(twitToProc));
    }


   // Create processorThreads and add them to the thread group
    for(int i = 0; i < MaxThreads; i++)
    {
        threadGroup.push_back(new processor(twitToProc, tweetQ, deleteQ, userQ));
    }

    //Create DB Threads and add them to the thread group.
    threadGroup.push_back(new dbTweetQueue(tweetQ, deleteQ));
    threadGroup.push_back(new dbUserQueue(userQ));


    // Start the threads
    for(unsigned int i = 0; i < threadGroup.size(); i++)
    {
        threadGroup[i]->start();
    }

    // Join the threads
    for(unsigned int i = 0; i < threadGroup.size(); i++)
    {
        threadGroup[i]->join();
    }

           } catch (std::exception & e) {
            std::cerr << e.what() <<  std::endl;
        } 

    for(unsigned int i = 0; i < threadGroup.size(); i++)
    {
        threadGroup[i]->();
    }
    return 0;
}

threadBase.h

#ifndef _THREADBASE_H
#define _THREADBASE_H

#include <boost/thread.hpp>

class ThreadBase
{
public:
    virtual void join() = 0;
    virtual void start() = 0;
    void interupt(){thread.interrupt();}
protected:
    boost::thread thread;

};



#endif  /* _THREADBASE_H */

socketIO.h

#ifndef _SOCKETIO_H
#define _SOCKETIO_H

#include "types.h"
#include "threadBase.h"

#include <boost/bind.hpp>
#include <curlpp/cURLpp.hpp>
#include <curlpp/Multi.hpp>
#include <curlpp/Easy.hpp>
#include <curlpp/Options.hpp>
#include <curlpp/Exception.hpp>
#include <curlpp/Infos.hpp>
#include <curl/curl.h>

#include <signal.h>
#include <string>
#include <sstream>
#include <cstdlib>


#define defaultRepeatInterval 10;

class socketIO: public ThreadBase {
private:
    int repeatInterval;
    double previousDownloadSize;
    int failCount;
    int writeRound;
    std::string userPassword;
    stringQueue&  queueObject;
    std::string input;


public:
    socketIO(stringQueue & messageQueue):
                queueObject(messageQueue)
    {
        userPassword.append(TWITTERACCT);
        userPassword.append(":");
        userPassword.append(TWITTERPASS);
    }

    socketIO(stringQueue & messageQueue, char* userName, char* password):
                queueObject(messageQueue)
    {
        userPassword.append(userName);
        userPassword.append(":");
        userPassword.append(password);
    }

    virtual ~socketIO();

    void join();
    void start();
    std::auto_ptr<curlpp::Easy> createRequest(int);



    void processLoop();
    size_t write(char* ptr, size_t size, size_t nmemb);
    int progress(double, double, double, double);

};

#endif  /* _SOCKETIO_H */

socketIO.cpp

#include "socketIO.h"

socketIO::~socketIO() {
}

/*
 * This method starts a new thread with the processLoop method
 */
void socketIO::start() {
    thread = boost::thread(&socketIO::processLoop, this);
}

/*
 * This method blocks waiting for the thread to exit
 */
void socketIO::join() {
    thread.join();
}

/*
 * The datacall back function for the open twitter connection.\
 */
size_t socketIO::write(char* ptr, size_t size, size_t nmemb) {
    // Calculate the real size of the incoming buffer
    size_t realsize = size * nmemb;
    std::string temp;
    temp.append(input);
    temp.append(ptr, realsize);
    size_t oldPosition = 0;
    size_t position = 0;
    position = temp.find('\r', oldPosition);
    while (position != std::string::npos) {
        queueObject.push(temp.substr(oldPosition, position));
        ++writeRound;
        oldPosition = position + 1;
        position = temp.find('\r', oldPosition);
    }
    input = temp.substr(oldPosition);
    return realsize;
}

/*
 * The timed callback function, called every second, used to monitor that the connection is still receiving data
 * Return 1 if requesting break or data flow stops, 0 if continuing normally
 */
int socketIO::progress(double dltotal, double dlnow, double ultotal, double ulnow) {
    // Allows us to break out on interruption
    if (boost::this_thread::interruption_requested())
        return 1;

    if (dlnow == previousDownloadSize) {
        if (failCount < 15)
            failCount++;
        else {
            repeatInterval = repeatInterval * 2;
            return 1;
        }
    } else {
        repeatInterval = 10;
        previousDownloadSize = dlnow;
    }
    return 0;
}

/*
 * This method creates a new connection to the twitter service with the required settings
 */
std::auto_ptr<curlpp::Easy> socketIO::createRequest(int source) {
    //Reset the input buffer when the connection is made.
    input = std::string("");
    std::auto_ptr<curlpp::Easy> newRequest(new curlpp::Easy);

    curlpp::types::ProgressFunctionFunctor progressFunctor(this, &socketIO::progress);
    newRequest->setOpt(new curlpp::options::ProgressFunction(progressFunctor));

    curlpp::types::WriteFunctionFunctor functor(this, &socketIO::write);
    newRequest->setOpt(new curlpp::options::WriteFunction(functor));

    newRequest->setOpt(new curlpp::options::FailOnError(true));
    newRequest->setOpt(new curlpp::options::NoProgress(0));
    newRequest->setOpt(new curlpp::options::Verbose(true));
    newRequest->setOpt(new curlpp::options::UserPwd(userPassword));


    //Code for debugging and using alternate sources
    std::string params = "track=basketball,football,baseball,footy,soccer";

    switch (source) {
        case 1: // Testing Locally
            newRequest->setOpt(new curlpp::options::Url("127.0.0.1:17000"));
            break;
        case 2: // Filtered
            newRequest->setOpt(new curlpp::options::Url("http://stream.twitter.com/1/statuses/filter.json"));
            newRequest->setOpt(new curlpp::options::PostFields(params));
            newRequest->setOpt(new curlpp::options::PostFieldSize(params.size()));
            break;
        case 3: //Twitter Main Stream
            newRequest->setOpt(new curlpp::options::Url("http://stream.twitter.com/1/statuses/sample.json"));
            break;
    }

    return newRequest;
}


/*
 * The main method of the thread.  Creates a new instance of the request
 */
void socketIO::processLoop() {
    repeatInterval = defaultRepeatInterval;
    std::auto_ptr<curlpp::Easy> request;
    while (true) {
        try {
            previousDownloadSize = 0;
            failCount = 0;
            request.reset(createRequest(3));
            request->perform();
        } catch (curlpp::UnknowException & e) {
            std::cout << "Unknown Exception: " << e.what() << std::endl;
        } catch (curlpp::RuntimeError & e) {
            std::cout << "Runtime Exception: " << e.what() << std::endl;
        } catch (curlpp::LogicError & e) {
            std::cout << "Logic Exception: " << e.what() << std::endl;
        }


        if (boost::this_thread::interruption_requested())
            break;
        else
            boost::this_thread::sleep(boost::posix_time::seconds(repeatInterval));
    }
}

types.h

#ifndef _TYPES_H
#define _TYPES_H

#include <string>
#include <concurrent_queue.hpp>

#define DBUSER "****"
#define DBPASS "****"
#define DBURL "****"
#define TWITTERACCT "****"
#define TWITTERPASS "****"

typedef struct tweet {
...
} tweet;

typedef struct user {
...
} user;


typedef concurrent_queue<std::string> stringQueue;
typedef std::pair<int, std::string> dbPair;
typedef concurrent_queue<dbPair> dbQueue;

typedef concurrent_queue<tweet> tweetQueue;
typedef concurrent_queue<user> userQueue;
typedef concurrent_queue<boost::int64_t> deleteQueue;

#endif  /* _TYPES_H */

concurrent_queue.hpp

#ifndef _CONCURRENT_QUEUE_
#define _CONCURRENT_QUEUE_

#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <deque>

template<typename Data>
class concurrent_queue
{
private:
    std::deque<Data> the_queue;
    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push_back(data);
        lock.unlock();
        the_condition_variable.notify_one();
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop_front();
        return true;
    }

    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }

        popped_value=the_queue.front();
        the_queue.pop_front();
    }

};

#endif  /* _CONCURRENT_QUEUE_ */

Ответы [ 4 ]

3 голосов
/ 14 января 2011

Не то, чтобы у меня действительно было достаточно информации, чтобы быть уверенным в этом ответе, но вот мое предположение.

Глядя на свой стек valgrind для того, когда память была выделена, вы видите:

==12867== 813,287,102 bytes in 390,337 blocks are possibly lost in loss record 359 of 359
==12867==    at 0x4C27CC1: operator new(unsigned long) (vg_replace_malloc.c:261)
==12867==    by 0x5AA8D98: std::string::_Rep::_S_create(unsigned long, unsigned long, std::allocator<char> const&) (in /usr/lib/libstdc++.so.6.0.13)
==12867==    by 0x5AA9B64: ??? (in /usr/lib/libstdc++.so.6.0.13)
==12867==    by 0x5AA9D38: std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&, unsigned long, unsigned long) (in /usr/lib/libstdc++.so.6.0.13)
==12867==    by 0x41E4F5: socketIO::write(char*, unsigned long, unsigned long) (basic_string.h:2006)

Это в значительной степени означает, что строка была создана в вашем методе записи.std :: string, как и большинство контейнеров stl, ничего не выделяет в куче до тех пор, пока это не будет необходимо, что в данном случае происходит при добавлении к ней данных.

Теперь память выделена, и это нормально, но этоникогда не освобождается, потому что деструктор std :: string input никогда не вызывается.Для этого может быть несколько причин, но наиболее распространенными из них являются:

  • Вы выделили кучу socketIO и забыли освободить его.
  • У вас есть виртуальные функции, но вы забыли виртуальный деструкторгде-то.
2 голосов
/ 10 марта 2011

Я знаю, что взвешиваюсь здесь довольно поздно (фактически, более чем на несколько месяцев), но в случае, если другие следуют этой теме, у меня возникает похожая проблема, и мне удалось отследить ее до библиотек curlpp.

При этом я далеко не эксперт по C ++ и уверен, что именно так я и использую библиотеки. Они говорят, что используют стиль очистки памяти в стиле RAII, но даже явно создавая и уничтожая настройки параметров в моем запросе (который повторно используется во время выполнения моей программы), я все еще вижу серьезный рост объема памяти процесса.

Когда я удаляю вызовы библиотек curlpp, моя программа работает в очень статичном состоянии, поскольку она требует памяти. Большинство примеров, которые они предоставляют, представляют собой простые программы, которые с main () что-то делают и завершают работу, поэтому не так просто создать исполняемый файл типа демона, который использует класс с Easy HTTPClient (то, что я использую), который создается при создании экземпляра. и используется повторно во время выполнения программы.

2 голосов
/ 14 января 2011

ThreadBase не имеет виртуального деструктора.

Результат применения delete к ThreadBase*, когда объект, на который указывает объект, не является ThreadBase, а, следовательно, производным типом является не определен .На практике это часто утечка, если какой-либо из производных классов выделяет память (прямо или косвенно).

class ThreadBase
{
public:
    virtual ~ThreadBase() {} // <-- There you go!

    virtual void join() = 0;
    virtual void start() = 0;

    void interupt() { thread.interrupt(); }

protected:
    boost::thread thread;
};

С точки зрения дизайна:

  • избегать protected атрибуты, предпочитают предоставлять методы для инкапсуляции их использования.
  • Идиома NVI (Non-Virtual Interface) предусматривает, что использование методов, которые одновременно являются public и virtual, является плохой идеей (не может проверить предварительные условия и постусловиянапример), лучше использовать общедоступный не виртуальный метод, который будет вызывать частный виртуальный метод для подробностей реализации.
  • вы, вероятно, можете ThreadBase наследовать конфиденциально от boost::noncopyable, чтобы документально подтвердить, что это не таккопируемый.
1 голос
/ 14 января 2011

Проблема, по-видимому, в другом месте целиком или в сочетании с этим кодом и в других местах.

Нет утечки памяти в опубликованном вами коде.Существует вероятность того, что вы преобразовали код для публикации на SO, который упустил важные детали.Например, вы исключили блокировку очереди (что, как вы упоминаете, требуется, и я верю, что вы на самом деле это делаете), что может привести к повреждению и утечкам.Другой пример - входная переменная: действительно ли она глобальная или является элементом данных? Микрофон упоминает еще несколько потенциальных ошибок, которые могут быть или не быть ошибками транскрипции.

Нам действительно нужен полный, компилируемый пример, который демонстрирует проблему.

...