У меня есть код, который запускает два потока,
Первый поток ожидает на sender
данные, используя recv()
, а затем перенаправляет данные в receiver
, используя send
.
Второй поток ожидает на receiver
данные, используя recv()
, а затем перенаправляет данные в sender
, используя send
.
. Важно, чтобы оба этих метода работали параллельно.
Предположим, что отправитель отключается, первый поток обнаруживает это и закрывает соединение.
Как мне сообщить второму потоку, который все еще ожидает на приемнике, что соединение было закрыто и не более требуется связь?
recieve_packet
реализован с использованием recv()
.
send_packet
реализован с использованием send()
.
sender_fd
- это сокет дескриптор файла отправителя.
reciever_fd
- дескриптор файла сокета получателя.
void* sender_to_reciever(){
int t1;packet* p = malloc(sizeof(packet));
while((t1=recieve_packet(sender_fd,&p))!=0){
send_packet(reciever_fd,p);
}
close(sender_fd);
}
void* reciever_to_sender(){
int t1;packet* p = malloc(sizeof(packet));
while((t1=recieve_packet(reciever_fd,&p))!=0){
send_packet(sender_fd,p);
}
close(reciever_fd);
}
Я не хочу менять реализацию send_packet
и recieve_packet
вызовы функций.
Я пытался закрыть оба * 10 40 * и reciever_d
, если либо, пока l oop выходит. Однако это не сработало.
Код для канала. c, который обрабатывает отправителя и получателя: -
#include "packets.c"
#define SERVER_PORT "8642"
#define QUEUE_LENGTH 10
void handle_connection(int);
void* sender_to_receiver();
void* receiver_to_sender();
int open_outgoing_connection(char*, char*);
int sender_fd;
int receiver_fd;
int main(int argc, char* argv[]){
int sock_fd,new_fd,rv,yes;yes=1;
struct addrinfo hints,*res;
struct sockaddr_storage client_addr;
socklen_t addr_size;
char client_details[INET6_ADDRSTRLEN];
struct sigaction sa;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
if((rv=getaddrinfo(NULL, SERVER_PORT, &hints, &res))!=0){
printf("Error getaddrinfo : %s\n",gai_strerror(rv));
return 1;
}
if((sock_fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol))==-1){
printf("Error socket file descriptor\n");
return 1;
}
if(setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1){
printf("Error setsockopt\n");
return 1;
}
if(bind(sock_fd,res->ai_addr,res->ai_addrlen)==-1){
close(sock_fd);
printf("Error bind\n");
return 1;
}
if(listen(sock_fd, QUEUE_LENGTH)==-1){
printf("Error listen\n");
return 1;
}
freeaddrinfo(res);
sa.sa_handler = sigchld_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
if(sigaction(SIGCHLD,&sa,NULL) == -1){
printf("Error sigaction\n");
return 1;
}
// Now we have prepared the socket(ip+port) for accepting incoming connections.
printf("Server PID : %d\n",getpid());
while(1){
addr_size = sizeof client_addr;
new_fd = accept(sock_fd,(struct sockaddr*)&client_addr, &addr_size);
if(new_fd==-1){
printf("Error Accepting Request %d\n",getpid());
return 1;
}
inet_ntop(client_addr.ss_family,get_in_addr((struct sockaddr*)&client_addr),client_details,sizeof client_details);
if(!fork()){ // Child Process
close(sock_fd);
printf("Connection Accepted From : %s by PID:%d\n",client_details,getpid());
handle_connection(new_fd);
exit(0);
}
}
return 1;
}
void handle_connection(int socket_sender){
packet* p = malloc(sizeof(packet));
int t1;
if((t1=receive_packet(socket_sender, &p))==0){
printf("Closed Connection\n");
}else{
int socket_receiver;
if((socket_receiver=open_outgoing_connection(p->destination_ip,p->destination_port))!=-1){
sender_fd = socket_sender;
receiver_fd = socket_receiver;
pthread_t str,rts;
str = pthread_self();
rts = pthread_self();
pthread_create(&str,NULL,sender_to_receiver,NULL);
pthread_create(&rts,NULL,receiver_to_sender,NULL);
pthread_join(str,NULL);
pthread_join(rts,NULL);
}else{
printf("Error Connecting to receiver\n");
}
}
}
void* sender_to_receiver(){
int t1;packet* p = malloc(sizeof(packet));
while((t1=receive_packet(sender_fd,&p))!=0){
printf("SENDER\n");
display_packet(p);
send_packet(receiver_fd,p);
}
printf("Sender Disconnected\n");
close(sender_fd);close(receiver_fd);
}
void* receiver_to_sender(){
int t1;packet* p = malloc(sizeof(packet));
while((t1=receive_packet(receiver_fd,&p))!=0){
printf("Receiver\n");
display_packet(p);
send_packet(sender_fd,p);
}
printf("Receiver Disconnected\n");
close(receiver_fd);close(sender_fd);
}
int open_outgoing_connection(char* ip, char* port){
int gai;
char server_ip[100];memset(server_ip,'\0',sizeof(server_ip));
struct addrinfo hints,*server;
memset(&hints,0,sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
int socket_fd;
if((gai=getaddrinfo(ip,port,&hints,&server)) != 0){
printf("GetAddrInfo Error: %s\n",gai_strerror(gai));
return -1;
}
if((socket_fd = socket(server->ai_family, server->ai_socktype, server->ai_protocol)) == -1){
printf("Socket Error\n");
return -1;
}
if(connect(socket_fd,server->ai_addr,server->ai_addrlen) == -1){
printf("Connect Error\n");
return -1;
}
freeaddrinfo(server);
inet_ntop(server->ai_family, get_in_addr((struct sockaddr*)server->ai_addr), server_ip, sizeof(server_ip));
printf("Connected to: %s\n",server_ip);
return socket_fd;
}
Код для получателя: -
#include "packets.c"
#define QUEUE_LENGTH 10
void handle_connection(int);
int main(int argc, char* argv[]){
if(argc!=2){
printf("Enter PORT\n");
return 1;
}
int sock_fd,new_fd,rv,yes;yes=1;
struct addrinfo hints,*res;
struct sockaddr_storage client_addr;
socklen_t addr_size;
char client_details[INET6_ADDRSTRLEN];
struct sigaction sa;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
if((rv=getaddrinfo(NULL, argv[1], &hints, &res))!=0){
printf("Error getaddrinfo : %s\n",gai_strerror(rv));
return 1;
}
if((sock_fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol))==-1){
printf("Error socket file descriptor\n");
return 1;
}
if(setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1){
printf("Error setsockopt\n");
return 1;
}
if(bind(sock_fd,res->ai_addr,res->ai_addrlen)==-1){
close(sock_fd);
printf("Error bind\n");
return 1;
}
if(listen(sock_fd, QUEUE_LENGTH)==-1){
printf("Error listen\n");
return 1;
}
freeaddrinfo(res);
sa.sa_handler = sigchld_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
if(sigaction(SIGCHLD,&sa,NULL) == -1){
printf("Error sigaction\n");
return 1;
}
// Now we have prepared the socket(ip+port) for accepting incoming connections.
printf("Server PID : %d\n",getpid());
while(1){
addr_size = sizeof client_addr;
new_fd = accept(sock_fd,(struct sockaddr*)&client_addr, &addr_size);
if(new_fd==-1){
printf("Error Accepting Request %d\n",getpid());
return 1;
}
inet_ntop(client_addr.ss_family,get_in_addr((struct sockaddr*)&client_addr),client_details,sizeof client_details);
if(!fork()){ // Child Process
close(sock_fd);
printf("Connection Accepted From : %s by PID:%d\n",client_details,getpid());
handle_connection(new_fd);
exit(0);
}
close(new_fd);
}
return 1;
}
void handle_connection(int socket_fd){
int t1;packet* p = malloc(sizeof(packet));
while((t1=receive_packet(socket_fd,&p))!=0){
display_packet(p);
p->message = "ACK";
p->timestamp = get_time_in_ns();
send_packet(socket_fd,p);
}
printf("Sender Disconnected\n");
}
Код для отправителя: -
#include "packets.c"
#define CHANNEL_PORT "8642"
#define CHANNEL_IP "127.0.0.1"
int socket_fd;
char* destination_ip;
char* destination_port;
int number_of_packets;
char message[MESSAGE_BUFFER_LEN];
void prepare_packet_header(packet *p){
p->destination_ip=destination_ip;
p->destination_port=destination_port;
p->timestamp = get_time_in_ns();
p->length=0;
}
void divide_message_and_send_packets(){
if(number_of_packets>strlen(message)){
number_of_packets = strlen(message);
}
int indi_len = strlen(message)/number_of_packets;
int lm = 0;int i,j;
packet* all_packets[number_of_packets];
for(i=0;i<number_of_packets;i+=1)all_packets[i]=malloc(sizeof(packet));
// HANDSHAKE
prepare_packet_header(all_packets[0]);
all_packets[0]->message="SYN";
all_packets[0]->uid=-1;
send_packet(socket_fd,all_packets[0]);
// HANDSHAKE OVER
for(i=0;i<number_of_packets;i+=1){
// printf("Processing Packet: %d with message[%d:%d]\n",i,lm,lm+indi_len);
if(i!=number_of_packets-1){
char temp[indi_len+1];memset(temp,'\0', sizeof temp);
for(j=lm;j<lm+indi_len;j+=1)temp[j-lm]=message[j];
all_packets[i]->message = malloc(sizeof temp);
strcpy(all_packets[i]->message, temp);
all_packets[i]->uid = i;
}else{
char temp[strlen(message)-lm+1];memset(temp,'\0', sizeof temp);
for(j=lm;j<strlen(message);j+=1)temp[j-lm]=message[j];
all_packets[i]->message = malloc(sizeof temp);
strcpy(all_packets[i]->message, temp);
all_packets[i]->uid = i;
}
prepare_packet_header(all_packets[i]);lm+=indi_len;
display_packet(all_packets[i]);
send_packet(socket_fd,all_packets[i]);
}
}
void main(int argc,char* argv[]){
if(argc!=5){
printf("Enter DESTINATION_IP DESTINATION_PORT NUMBER_OF_PACKETS MESSAGE\n");
return;
}
strcpy(message,argv[4]);
number_of_packets=atoi(argv[3]);
destination_ip=malloc(sizeof argv[1] + 1);memset(destination_ip,'\0', sizeof destination_ip);
destination_port=malloc(sizeof argv[2] + 1);memset(destination_port,'\0', sizeof destination_port);
strcpy(destination_ip,argv[1]);
strcpy(destination_port,argv[2]);
int gai;
char server_ip[100];memset(server_ip,'\0',sizeof(server_ip));
struct addrinfo hints,*server;
memset(&hints,0,sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
if((gai=getaddrinfo(CHANNEL_IP,CHANNEL_PORT,&hints,&server)) != 0){
printf("GetAddrInfo Error: %s\n",gai_strerror(gai));
return;
}
if((socket_fd = socket(server->ai_family, server->ai_socktype, server->ai_protocol)) == -1){
printf("Socket Error\n");
}
if(connect(socket_fd,server->ai_addr,server->ai_addrlen) == -1){
printf("Connect Error\n");
}
freeaddrinfo(server);
inet_ntop(server->ai_family, get_in_addr((struct sockaddr*)server->ai_addr), server_ip, sizeof(server_ip));
printf("Connected to: %s\n",server_ip);
divide_message_and_send_packets();
while(1){} // busy wait taht simulates future work
}
Код для пакетов. c (Содержит функции, связанные с пакетами): -
#include "helper.c"
typedef struct StupidAssignment{
long length;
char* destination_ip;
char* destination_port;
long timestamp;
long uid;
char* message;
}packet;
int receive_packet(int socket,packet** p1){
packet* p = *p1;
int remaining=0;int i;
int received=0;
long content_length=0;
remaining=11;
char buffer[MESSAGE_BUFFER_LEN];memset(buffer,'\0',sizeof(buffer));
while(remaining>0){
int t1 = recv(socket, buffer+received, remaining, 0);
if(t1==0)return 0;
remaining-=t1;
received+=t1;
}
content_length = read_long(buffer, received);
received=0;
remaining=content_length;p->length=content_length;
memset(buffer,'\0',sizeof(buffer));
while(remaining>0){
int t1 = recv(socket, buffer+received, remaining, 0);
if(t1==0)return 0;
remaining-=t1;
received+=t1;
}
char part[MESSAGE_BUFFER_LEN];memset(part,'\0',sizeof(part));int part_len=0;int nlmkr=0;
for(i=0;i<=content_length;i+=1){
if(buffer[i]=='\n' || i==content_length){
nlmkr+=1;
if(nlmkr==1) read_char(&(p->destination_ip), part, part_len);
else if(nlmkr==2) read_char(&(p->destination_port), part, part_len);
else if(nlmkr==3) p->timestamp = read_long(part, part_len);
else if(nlmkr==4) p->uid = read_long(part, part_len);
else if(nlmkr==6) read_char(&(p->message), part, part_len);
part_len=0;memset(part, '\0', sizeof part);
}else{
part[part_len++]=buffer[i];
}
}
return 1;
}
void send_packet(int socket,packet *p){
char temp[MESSAGE_BUFFER_LEN];memset(temp,'\0',sizeof temp);
strcat(temp,p->destination_ip);strcat(temp,"\n");
strcat(temp,p->destination_port);strcat(temp,"\n");
snprintf(temp+strlen(temp),100,"%ld\n",p->timestamp);
snprintf(temp+strlen(temp),100,"%ld\n",p->uid);
// write_long(p->timestamp,temp);strcat(temp,"\n");
// write_long(p->uid,temp);strcat(temp,"\n");
strcat(temp,"\n");
strcat(temp,p->message);
char buffer[MESSAGE_BUFFER_LEN];memset(buffer, '\0', sizeof buffer);
p->length = strlen(temp);
snprintf(buffer,100,"%10ld\n",strlen(temp));
strcat(buffer, temp);
sendAll(buffer,socket);
}
void display_packet(packet* p){
printf("----PACKET START----\n");
printf("%ld\n",p->length);
printf("%s\n",p->destination_ip);
printf("%s\n",p->destination_port);
printf("%ld\n",p->timestamp);
printf("%ld\n",p->uid);
printf("%s\n",p->message);
printf("----PACKET END-----\n");
}
Код для помощника. c (Некоторые функции используются всеми другими кодами): -
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <poll.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include <time.h>
#include <sys/stat.h>
#include <ctype.h>
#include <fcntl.h>
#include <pthread.h>
#define MESSAGE_BUFFER_LEN 20480
void read_char(char** into, char* from, int length){
*into = malloc(length+1);memset(*into, '\0', sizeof *into);
strcpy(*into, from);
// printf("%s\n",into);
}
long read_long(char* from, int length){
int i;long temp=0;
for(i=0;i<length;i+=1){
if(isdigit(*(from+i))){
temp=temp*10;temp+=(long)(*(from+i) - '0');
}
}
return temp;
}
void write_long(long t1,char* m){
int mkr=0;
char temp[100];memset(temp,'\0',sizeof temp);
while(t1!=0){
temp[mkr] = ((int)(t1%10)) + '0';
t1 = t1/10;
}
for(mkr=strlen(temp)-1;mkr>=0;mkr-=1){
*(m+strlen(m))=temp[mkr];
}
}
void *get_in_addr(struct sockaddr* sa){
if(sa->sa_family == AF_INET){
return &(((struct sockaddr_in *)sa)->sin_addr);
}else{
return &(((struct sockaddr_in6*)sa)->sin6_addr);
}
}
void sigchld_handler(int s){
int saved_errno=errno;
while(waitpid(-1, NULL, WNOHANG) > 0);
errno = saved_errno;
}
int sendAll(char* data_to_send,int socket_fd){
int bytesleft = strlen(data_to_send);
int total=0;int n;
while(bytesleft>0){
n = send(socket_fd,data_to_send + total, bytesleft, 0);
if(n==-1)break;
total+=n;
bytesleft-=n;
}
return n==-1?-1:0;
}
long get_time_in_ns(){
struct timespec start;clock_gettime(CLOCK_REALTIME,&start);
long ct = ((long)start.tv_sec)*1e9 + ((long)start.tv_nsec);
return ct;
}
Код вообще не задокументирован.