Чтобы дать краткий обзор программы
1.) Открывает соединение с сокетом и читает данные
2.) Разбивает данные на символ новой строки
3.) Помещает сегменты данных в очередь для обработки в отдельном потоке.
Я использую библиотеку curlpp, так как она обрабатывает аутентификацию и поиск DNS.
Очередь представляет собой просто деку с мьютексами для обеспечения безопасности потоков.
Это метод, который я сейчас использую.
std::string input;
size_t socketIO::dataCallBack(char* ptr, size_t size, size_t nmemb) {
// Calculate the real size of the incoming buffer
size_t realsize = size * nmemb;
//Append the new input to the old input
input.append(ptr, realsize);
//Find all the complete strings and push them to the queue
size_t oldPosition = 0;
size_t position = 0;
position = input.find('\r', oldPosition);
while (position != std::string::npos) {
queueObject.push(input.substr(oldPosition, position))
oldPosition = position + 1;
position = input.find('\r', oldPosition);
}
//Save off the partial string as you'll get the rest of it on the next data callback
input = input.substr(oldPosition);
return realsize;
}
У меня есть несколько проблем. У меня проблемы с утечками памяти, и valgrind показывает серьезную утечку из этой функции.
==12867== 813,287,102 bytes in 390,337 blocks are possibly lost in loss record 359 of 359
==12867== at 0x4C27CC1: operator new(unsigned long) (vg_replace_malloc.c:261)
==12867== by 0x5AA8D98: std::string::_Rep::_S_create(unsigned long, unsigned long, std::allocator<char> const&) (in /usr/lib/libstdc++.so.6.0.13)
==12867== by 0x5AA9B64: ??? (in /usr/lib/libstdc++.so.6.0.13)
==12867== by 0x5AA9D38: std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&, unsigned long, unsigned long) (in /usr/lib/libstdc++.so.6.0.13)
==12867== by 0x41E4F5: socketIO::write(char*, unsigned long, unsigned long) (basic_string.h:2006)
==12867== by 0x509C657: utilspp::Functor<unsigned long, utilspp::tl::TypeList<char*, utilspp::tl::TypeList<unsigned long, utilspp::tl::TypeList<unsigned long, utilspp::NullType> > > >::operator()(char*, unsigned long, unsigned long) (Functor.hpp:106)
==12867== by 0x509B6E4: curlpp::internal::CurlHandle::executeWriteFunctor(char*, unsigned long, unsigned long) (CurlHandle.cpp:171)
==12867== by 0x509F509: curlpp::internal::Callbacks::WriteCallback(char*, unsigned long, unsigned long, curlpp::internal::CurlHandle*) (OptionSetter.cpp:47)
==12867== by 0x4E3D667: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867== by 0x4E5407B: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867== by 0x4E505A1: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867== by 0x4E51A8F: ??? (in /usr/lib/libcurl-gnutls.so.4.1.1)
==12867== by 0x509A78B: curlpp::internal::CurlHandle::perform() (CurlHandle.cpp:52)
==12867== by 0x5093A6B: curlpp::Easy::perform() (Easy.cpp:48)
==12867== by 0x41EDC3: socketIO::processLoop() (socketIO.cpp:126)
Что бы вы предложили сделать. Я рассмотрел использование istringstream, но я не уверен, как работает его распределение памяти и будет ли он восстанавливать память, которую я уже прочитал. У меня проблема в том, что мне нужно хранить данные между обратными вызовами, но делать это так, чтобы не было утечки памяти.
UPDATE
По запросу больше кода. Я написал с мыслью, что чем больше, тем лучше.
main.cpp
/**
* The main driver for the twitter capture app. Starts multiple threads for processors, 1 io thread and 2 db threads. One for user
* information and the other for tweet information
*/
#include "types.h"
#include "threadBase.h"
#include "socketIO.h"
#include "processor.h"
#include "dbTweetQueue.h"
#include "dbUserQueue.h"
#include <vector>
stringQueue twitToProc;
tweetQueue tweetQ;
userQueue userQ;
deleteQueue deleteQ;
std::vector<ThreadBase *> threadGroup;
std::string dbBase::dbUser(DBUSER);
std::string dbBase::dbURL(DBURL);
std::string dbBase::dbPass(DBPASS);
/*
* Handle the signal for interupt
*/
void sigquit(int param)
{
std::cout<<"Received sigquit"<<std::endl;
for(unsigned int i = 0; i < threadGroup.size(); i++)
{
threadGroup[i]->interupt();
}
}
int main(int argc, char* argv[])
{
try{
//Setting the signal handler up.
struct sigaction act;
act.sa_handler = sigquit;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
sigaction(SIGQUIT, &act, 0);
int MaxThreads = 5;
if(argc < 3)
{
std::cout<<"Usage: >"<<argv[0]<<" TwitterUserName TwitterPassWord"<<std::endl;
std::cout<<"Using Defaults: "<<TWITTERACCT<<" "<<TWITTERPASS<<std::endl;
}
// Create socketIO, and add it to the thread group
if(argc == 3)
{
threadGroup.push_back(new socketIO(twitToProc, argv[1], argv[2]));
}
else
{
threadGroup.push_back(new socketIO(twitToProc));
}
// Create processorThreads and add them to the thread group
for(int i = 0; i < MaxThreads; i++)
{
threadGroup.push_back(new processor(twitToProc, tweetQ, deleteQ, userQ));
}
//Create DB Threads and add them to the thread group.
threadGroup.push_back(new dbTweetQueue(tweetQ, deleteQ));
threadGroup.push_back(new dbUserQueue(userQ));
// Start the threads
for(unsigned int i = 0; i < threadGroup.size(); i++)
{
threadGroup[i]->start();
}
// Join the threads
for(unsigned int i = 0; i < threadGroup.size(); i++)
{
threadGroup[i]->join();
}
} catch (std::exception & e) {
std::cerr << e.what() << std::endl;
}
for(unsigned int i = 0; i < threadGroup.size(); i++)
{
threadGroup[i]->();
}
return 0;
}
threadBase.h
#ifndef _THREADBASE_H
#define _THREADBASE_H
#include <boost/thread.hpp>
class ThreadBase
{
public:
virtual void join() = 0;
virtual void start() = 0;
void interupt(){thread.interrupt();}
protected:
boost::thread thread;
};
#endif /* _THREADBASE_H */
socketIO.h
#ifndef _SOCKETIO_H
#define _SOCKETIO_H
#include "types.h"
#include "threadBase.h"
#include <boost/bind.hpp>
#include <curlpp/cURLpp.hpp>
#include <curlpp/Multi.hpp>
#include <curlpp/Easy.hpp>
#include <curlpp/Options.hpp>
#include <curlpp/Exception.hpp>
#include <curlpp/Infos.hpp>
#include <curl/curl.h>
#include <signal.h>
#include <string>
#include <sstream>
#include <cstdlib>
#define defaultRepeatInterval 10;
class socketIO: public ThreadBase {
private:
int repeatInterval;
double previousDownloadSize;
int failCount;
int writeRound;
std::string userPassword;
stringQueue& queueObject;
std::string input;
public:
socketIO(stringQueue & messageQueue):
queueObject(messageQueue)
{
userPassword.append(TWITTERACCT);
userPassword.append(":");
userPassword.append(TWITTERPASS);
}
socketIO(stringQueue & messageQueue, char* userName, char* password):
queueObject(messageQueue)
{
userPassword.append(userName);
userPassword.append(":");
userPassword.append(password);
}
virtual ~socketIO();
void join();
void start();
std::auto_ptr<curlpp::Easy> createRequest(int);
void processLoop();
size_t write(char* ptr, size_t size, size_t nmemb);
int progress(double, double, double, double);
};
#endif /* _SOCKETIO_H */
socketIO.cpp
#include "socketIO.h"
socketIO::~socketIO() {
}
/*
* This method starts a new thread with the processLoop method
*/
void socketIO::start() {
thread = boost::thread(&socketIO::processLoop, this);
}
/*
* This method blocks waiting for the thread to exit
*/
void socketIO::join() {
thread.join();
}
/*
* The datacall back function for the open twitter connection.\
*/
size_t socketIO::write(char* ptr, size_t size, size_t nmemb) {
// Calculate the real size of the incoming buffer
size_t realsize = size * nmemb;
std::string temp;
temp.append(input);
temp.append(ptr, realsize);
size_t oldPosition = 0;
size_t position = 0;
position = temp.find('\r', oldPosition);
while (position != std::string::npos) {
queueObject.push(temp.substr(oldPosition, position));
++writeRound;
oldPosition = position + 1;
position = temp.find('\r', oldPosition);
}
input = temp.substr(oldPosition);
return realsize;
}
/*
* The timed callback function, called every second, used to monitor that the connection is still receiving data
* Return 1 if requesting break or data flow stops, 0 if continuing normally
*/
int socketIO::progress(double dltotal, double dlnow, double ultotal, double ulnow) {
// Allows us to break out on interruption
if (boost::this_thread::interruption_requested())
return 1;
if (dlnow == previousDownloadSize) {
if (failCount < 15)
failCount++;
else {
repeatInterval = repeatInterval * 2;
return 1;
}
} else {
repeatInterval = 10;
previousDownloadSize = dlnow;
}
return 0;
}
/*
* This method creates a new connection to the twitter service with the required settings
*/
std::auto_ptr<curlpp::Easy> socketIO::createRequest(int source) {
//Reset the input buffer when the connection is made.
input = std::string("");
std::auto_ptr<curlpp::Easy> newRequest(new curlpp::Easy);
curlpp::types::ProgressFunctionFunctor progressFunctor(this, &socketIO::progress);
newRequest->setOpt(new curlpp::options::ProgressFunction(progressFunctor));
curlpp::types::WriteFunctionFunctor functor(this, &socketIO::write);
newRequest->setOpt(new curlpp::options::WriteFunction(functor));
newRequest->setOpt(new curlpp::options::FailOnError(true));
newRequest->setOpt(new curlpp::options::NoProgress(0));
newRequest->setOpt(new curlpp::options::Verbose(true));
newRequest->setOpt(new curlpp::options::UserPwd(userPassword));
//Code for debugging and using alternate sources
std::string params = "track=basketball,football,baseball,footy,soccer";
switch (source) {
case 1: // Testing Locally
newRequest->setOpt(new curlpp::options::Url("127.0.0.1:17000"));
break;
case 2: // Filtered
newRequest->setOpt(new curlpp::options::Url("http://stream.twitter.com/1/statuses/filter.json"));
newRequest->setOpt(new curlpp::options::PostFields(params));
newRequest->setOpt(new curlpp::options::PostFieldSize(params.size()));
break;
case 3: //Twitter Main Stream
newRequest->setOpt(new curlpp::options::Url("http://stream.twitter.com/1/statuses/sample.json"));
break;
}
return newRequest;
}
/*
* The main method of the thread. Creates a new instance of the request
*/
void socketIO::processLoop() {
repeatInterval = defaultRepeatInterval;
std::auto_ptr<curlpp::Easy> request;
while (true) {
try {
previousDownloadSize = 0;
failCount = 0;
request.reset(createRequest(3));
request->perform();
} catch (curlpp::UnknowException & e) {
std::cout << "Unknown Exception: " << e.what() << std::endl;
} catch (curlpp::RuntimeError & e) {
std::cout << "Runtime Exception: " << e.what() << std::endl;
} catch (curlpp::LogicError & e) {
std::cout << "Logic Exception: " << e.what() << std::endl;
}
if (boost::this_thread::interruption_requested())
break;
else
boost::this_thread::sleep(boost::posix_time::seconds(repeatInterval));
}
}
types.h
#ifndef _TYPES_H
#define _TYPES_H
#include <string>
#include <concurrent_queue.hpp>
#define DBUSER "****"
#define DBPASS "****"
#define DBURL "****"
#define TWITTERACCT "****"
#define TWITTERPASS "****"
typedef struct tweet {
...
} tweet;
typedef struct user {
...
} user;
typedef concurrent_queue<std::string> stringQueue;
typedef std::pair<int, std::string> dbPair;
typedef concurrent_queue<dbPair> dbQueue;
typedef concurrent_queue<tweet> tweetQueue;
typedef concurrent_queue<user> userQueue;
typedef concurrent_queue<boost::int64_t> deleteQueue;
#endif /* _TYPES_H */
concurrent_queue.hpp
#ifndef _CONCURRENT_QUEUE_
#define _CONCURRENT_QUEUE_
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <deque>
template<typename Data>
class concurrent_queue
{
private:
std::deque<Data> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
public:
void push(Data const& data)
{
boost::mutex::scoped_lock lock(the_mutex);
the_queue.push_back(data);
lock.unlock();
the_condition_variable.notify_one();
}
bool empty() const
{
boost::mutex::scoped_lock lock(the_mutex);
return the_queue.empty();
}
bool try_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
if(the_queue.empty())
{
return false;
}
popped_value=the_queue.front();
the_queue.pop_front();
return true;
}
void wait_and_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
while(the_queue.empty())
{
the_condition_variable.wait(lock);
}
popped_value=the_queue.front();
the_queue.pop_front();
}
};
#endif /* _CONCURRENT_QUEUE_ */