У меня есть следующий код сервера:
#include <signal.h>
#include <sys/stat.h>
#include "utils.h"
#include "kissdb.h"
#define MY_PORT 6767
#define BUF_SIZE 1160
#define KEY_SIZE 128
#define HASH_SIZE 1024
#define VALUE_SIZE 1024
#define MAX_PENDING_CONNECTIONS 10
// Definition of the operation type.
typedef enum operation {
PUT,
GET
} Operation;
// Definition of the request.
typedef struct request {
Operation operation;
char key[KEY_SIZE];
char value[VALUE_SIZE];
} Request;
// Definition of the database.
KISSDB *db = NULL;
/**
* @name parse_request - Parses a received message and generates a new request.
* @param buffer: A pointer to the received message.
*
* @return Initialized request on Success. NULL on Error.
*/
Request *parse_request(char *buffer) {
char *token = NULL;
Request *req = NULL;
// Check arguments.
if (!buffer)
return NULL;
// Prepare the request.
req = (Request *) malloc(sizeof(Request));
memset(req->key, 0, KEY_SIZE);
memset(req->value, 0, VALUE_SIZE);
// Extract the operation type.
token = strtok(buffer, ":");
if (!strcmp(token, "PUT")) {
req->operation = PUT;
} else if (!strcmp(token, "GET")) {
req->operation = GET;
} else {
free(req);
return NULL;
}
// Extract the key.
token = strtok(NULL, ":");
if (token) {
strncpy(req->key, token, KEY_SIZE);
} else {
free(req);
return NULL;
}
// Extract the value.
token = strtok(NULL, ":");
if (token) {
strncpy(req->value, token, VALUE_SIZE);
} else if (req->operation == PUT) {
free(req);
return NULL;
}
return req;
}
/*
* @name process_request - Process a client request.
* @param socket_fd: The accept descriptor.
*
* @return
*/
void process_request(const int socket_fd) {
char response_str[BUF_SIZE], request_str[BUF_SIZE];
int numbytes = 0;
Request *request = NULL;
// Clean buffers.
memset(response_str, 0, BUF_SIZE);
memset(request_str, 0, BUF_SIZE);
// receive message.
numbytes = read_str_from_socket(socket_fd, request_str, BUF_SIZE);
// parse the request.
if (numbytes) {
request = parse_request(request_str);
if (request) {
switch (request->operation) {
case GET:
// Read the given key from the database.
if (KISSDB_get(db, request->key, request->value))
sprintf(response_str, "GET ERROR\n");
else
sprintf(response_str, "GET OK: %s\n", request->value);
break;
case PUT:
// Write the given key/value pair to the database.
if (KISSDB_put(db, request->key, request->value))
sprintf(response_str, "PUT ERROR\n");
else
sprintf(response_str, "PUT OK\n");
break;
default:
// Unsupported operation.
sprintf(response_str, "UNKOWN OPERATION\n");
}
// Reply to the client.
write_str_to_socket(socket_fd, response_str, strlen(response_str));
if (request)
free(request);
request = NULL;
return;
}
}
// Send an Error reply to the client.
sprintf(response_str, "FORMAT ERROR\n");
write_str_to_socket(socket_fd, response_str, strlen(response_str));
}
/*
* @name main - The main routine.
*
* @return 0 on success, 1 on error.
*/
int main() {
int socket_fd, // listen on this socket for new connections
new_fd; // use this socket to service a new connection
socklen_t clen;
struct sockaddr_in server_addr, // my address information
client_addr; // connector's address information
// create socket
if ((socket_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
ERROR("socket()");
// Ignore the SIGPIPE signal in order to not crash when a
// client closes the connection unexpectedly.
signal(SIGPIPE, SIG_IGN);
// create socket adress of server (type, IP-adress and port number)
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // any local interface
server_addr.sin_port = htons(MY_PORT);
// bind socket to address
if (bind(socket_fd, (struct sockaddr *) &server_addr, sizeof(server_addr)) == -1)
ERROR("bind()");
// start listening to socket for incomming connections
listen(socket_fd, MAX_PENDING_CONNECTIONS);
fprintf(stderr, "(Info) main: Listening for new connections on port %d ...\n", MY_PORT);
clen = sizeof(client_addr);
// Allocate memory for the database.
if (!(db = (KISSDB *)malloc(sizeof(KISSDB)))) {
fprintf(stderr, "(Error) main: Cannot allocate memory for the database.\n");
return 1;
}
// Open the database.
if (KISSDB_open(db, "mydb.db", KISSDB_OPEN_MODE_RWCREAT, HASH_SIZE, KEY_SIZE, VALUE_SIZE)) {
fprintf(stderr, "(Error) main: Cannot open the database.\n");
return 1;
}
// main loop: wait for new connection/requests
while (1) {
// wait for incomming connection
if ((new_fd = accept(socket_fd, (struct sockaddr *)&client_addr, &clen)) == -1) {
ERROR("accept()");
}
// got connection, serve request
fprintf(stderr, "(Info) main: Got connection from '%s'\n", inet_ntoa(client_addr.sin_addr));
process_request(new_fd);
close(new_fd);
}
// Destroy the database.
// Close the database.
KISSDB_close(db);
// Free memory.
if (db)
free(db);
db = NULL;
return 0;
}
, который обрабатывает базу данных (здесь KISSDB), которая хранит и печатает значения на основе ключа. Я хочу изменить его, добавив threads
, производителя и потребителя. Поток производителя должен принять запрос клиента, а поток потребителя выполнить его. Я думал о том, чтобы поместить все функции сокетов в функцию *producer()
, которая бы обрабатывала их и позволяла потребителю обрабатывать 'process_request()
как есть. В основном темы создаются и называются. Кажется, что он не запускается, он компилируется нормально, но при выполнении ./client -a localhost -i 1 -p
для запуска client
в терминале программа не останавливается. Я подозреваю, что
pthread_create (& consumer_thread, NULL, process_request, (void *) & thread_args.new_fd);
является проблемой. Когда я пытаюсь запустить сервер, я получаю:
(Info) main: Listening for new connections on port 6767 ...
Error in read_from_socket(). Cause: Bad file descriptor
Посмотрите, что я пробовал:
#define _POSIX_C_SOURCE 200809L
#include <inttypes.h>
#include <math.h>
#include<time.h>
#include <sys/time.h>
#include <sys/resource.h>
#include<pthread.h>
pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t c = PTHREAD_COND_INITIALIZER;
#include <signal.h>
#include <sys/stat.h>
#include "utils.h"
#include "kissdb.h"
//we have to use it
#define MY_PORT 6767
#define BUF_SIZE 1160
#define KEY_SIZE 128
#define HASH_SIZE 1024
#define VALUE_SIZE 1024
#define MAX_PENDING_CONNECTIONS 10
// =======================================
//struct timespec { // INCLUDE IN <time.h> ?
// time_t tv_sec; /* seconds */
// long tv_nsec; /* nanoseconds */
//};
/*
int socket_fd, // listen on this socket for new connections
new_fd; // use this socket to service a new connection
socklen_t clen;
struct sockaddr_in server_addr, // my address information
client_addr; // connector's address information
*/
struct data {
int socket_fd; // listen on this socket for new connections
int new_fd; // use this socket to service a new connection
};
// Definition of the operation type.
typedef enum operation {
PUT,
GET
} Operation;
// Definition of the request.
typedef struct request {
Operation operation;
char key[KEY_SIZE];
char value[VALUE_SIZE];
} Request;
// Definition of the database.
KISSDB *db = NULL;
/**
* @name parse_request - Parses a received message and generates a new request.
* @param buffer: A pointer to the received message.
*
* @return Initialized request on Success. NULL on Error.
*/
Request *parse_request(char *buffer) {
char *token = NULL;
Request *req = NULL;
// Check arguments.
if (!buffer)
return NULL;
// Prepare the request.
req = (Request *) malloc(sizeof(Request));
memset(req->key, 0, KEY_SIZE);
memset(req->value, 0, VALUE_SIZE);
// Extract the operation type.
token = strtok(buffer, ":");
if (!strcmp(token, "PUT")) {
req->operation = PUT;
} else if (!strcmp(token, "GET")) {
req->operation = GET;
} else {
free(req);
return NULL;
}
// Extract the key.
token = strtok(NULL, ":");
if (token) {
strncpy(req->key, token, KEY_SIZE);
} else {
free(req);
return NULL;
}
// Extract the value.
token = strtok(NULL, ":");
if (token) {
strncpy(req->value, token, VALUE_SIZE);
} else if (req->operation == PUT) {
free(req);
return NULL;
}
return req;
}
/*
* @name process_request - Process a client request.
* @param socket_fd: The accept descriptor.
*
* @return
*/
//for consumer
void *process_request(void * socket_fd) {
char response_str[BUF_SIZE], request_str[BUF_SIZE];
int numbytes = 0;
Request *request = NULL;
// Clean buffers.
memset(response_str, 0, BUF_SIZE);
memset(request_str, 0, BUF_SIZE);
// receive message.
numbytes = read_str_from_socket(socket_fd, request_str, BUF_SIZE);
// parse the request.
if (numbytes) {
request = parse_request(request_str);
if (request) {
switch (request->operation) {
case GET:
// Read the given key from the database.
if (KISSDB_get(db, request->key, request->value))
sprintf(response_str, "GET ERROR\n");
else
sprintf(response_str, "GET OK: %s\n", request->value);
break;
case PUT:
// Write the given key/value pair to the database.
if (KISSDB_put(db, request->key, request->value))
sprintf(response_str, "PUT ERROR\n");
else
sprintf(response_str, "PUT OK\n");
break;
default:
// Unsupported operation.
sprintf(response_str, "UNKOWN OPERATION\n");
}
// Reply to the client.
write_str_to_socket(socket_fd, response_str, strlen(response_str));
if (request)
free(request);
request = NULL;
return;
}
}
// Send an Error reply to the client.
sprintf(response_str, "FORMAT ERROR\n");
write_str_to_socket(socket_fd, response_str, strlen(response_str));
}
///////////////////////////////////////////////////////////////////////////////
void *producer(void *arg) {
// int socket_fd, // listen on this socket for new connections
// new_fd; // use this socket to service a new connection
socklen_t clen;
struct sockaddr_in server_addr, // my address information
client_addr; // connector's address information
struct data *d = (struct data *) arg;
// create socket
if ((d->socket_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
ERROR("socket()");
// Ignore the SIGPIPE signal in order to not crash when a
// client closes the connection unexpectedly.
signal(SIGPIPE, SIG_IGN);
// create socket adress of server (type, IP-adress and port number)
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // any local interface
server_addr.sin_port = htons(MY_PORT);
// bind socket to address
if (bind(d->socket_fd, (struct sockaddr *) &server_addr, sizeof(server_addr)) == -1)
ERROR("bind()");
// start listening to socket for incomming connections
listen(d->socket_fd, MAX_PENDING_CONNECTIONS);
fprintf(stderr, "(Info) main: Listening for new connections on port %d ...\n", MY_PORT);
clen = sizeof(client_addr);
// wait for incomming connection EDW 8A KANOUME TA THREADS ?
if ((d->new_fd = accept(d->socket_fd, (struct sockaddr *)&client_addr, &clen)) == -1) { // to client_socket pou 8a peira3oume
ERROR("accept()");
}
//clock_gettime(CLOCK_REALTIME, &spec);
//s = spec.tv_sec;
//ms = spec.tv_nsec;
//fprintf(stderr, "Current time: %li . %03ld seconds since the connection\n",(long)s, ms);
// got connection, serve request
fprintf(stderr, "(Info) main: Got connection from '%s'\n", inet_ntoa(client_addr.sin_addr));
}
/*
* @name main - The main routine.
*
* @return 0 on success, 1 on error.
*/
int main() {
long ms; // Milliseconds
time_t s; // Seconds
struct timespec spec;
struct data thread_args;
// Allocate memory for the database.
if (!(db = (KISSDB *)malloc(sizeof(KISSDB)))) {
fprintf(stderr, "(Error) main: Cannot allocate memory for the database.\n");
return 1;
}
// Open the database.
if (KISSDB_open(db, "mydb.db", KISSDB_OPEN_MODE_RWCREAT, HASH_SIZE, KEY_SIZE, VALUE_SIZE)) {
fprintf(stderr, "(Error) main: Cannot open the database.\n");
return 1;
}
pthread_t producer_thread;
pthread_create(&producer_thread,NULL,producer,(void *) &thread_args);
// main loop: wait for new connection/requests
while (1) {
//process_request(new_fd); // nai !
/*teo's new code , for (i) part of project */
pthread_t consumer_thread;
pthread_create(&consumer_thread,NULL,process_request,(void*)&thread_args.new_fd);
pthread_join(consumer_thread,NULL); // necessary
/* */
//close(new_fd);
}
// Destroy the database.
// Close the database.
KISSDB_close(db);
// Free memory.
if (db)
free(db);
db = NULL;
return 0;
}