m2.mbcp/src/lowreceiver.cc
2006-03-06 15:16:12 +00:00

153 lines
3.3 KiB
C++

#include "macros.h"
#include "lowreceiver.h"

#include <errno.h>
#include <unistd.h>
#define DEBUG 1
/**
 * Build a receiver with two UDP sockets.
 *
 * The receiving socket is bound to port_low on every local interface; the
 * sending socket is left unbound and its destination address is prepared
 * with port_high.
 *
 * Any socket()/bind() failure is reported on stderr and terminates the
 * process with exit(-1).
 *
 * @param port_low   external port, the one the receiving socket is bound to
 * @param port_high  internal port, used as destination port when delivering
 * @param grp        group reference stored in the receiver
 * @param clk        clock reference stored in the receiver
 */
LowReceiver::LowReceiver(short port_low,
                         short port_high,
                         Group &grp,
                         Clock &clk) :
    _port_low(port_low), _port_high(port_high), _group(grp), _clock(clk)
{
    printf("LowReceiver::LowReceiver --\n");

    /* *** RECEIVING SOCKET *** */
    _sock_recv_fd = socket(AF_INET, SOCK_DGRAM, 0);
    /* the other possible variant would be AF_UNIX */
    if (_sock_recv_fd < 0){
        /* error */
        perror("Creation de la socket de reception impossible\n");
        fprintf(stderr,"BOUM at %s:%d\n",__FILE__,__LINE__);
        // FIXME: throw something
        exit(-1);
    }

    _addr_recv = new sockaddr_in;
    // port_low  = external
    // port_high = internal
    bzero(_addr_recv,sizeof(sockaddr_in));
    _addr_recv->sin_family = AF_INET;
    _addr_recv->sin_port = htons(_port_low);
    _addr_recv->sin_addr.s_addr = htonl(INADDR_ANY);

    // attach the receiving socket to its port
    if (bind(_sock_recv_fd,
             (struct sockaddr *)_addr_recv,
             sizeof(sockaddr_in)) < 0) {
        //FIXME : throw something
        perror("Attachement de la socket impossible\n");
        fprintf(stderr,"BOUM at %s:%d\n",__FILE__,__LINE__);
        exit(-1);
    }

    /* *** SENDING SOCKET *** */
    _sock_send_fd = socket(AF_INET, SOCK_DGRAM, 0);
    /* the other possible variant would be AF_UNIX */
    if (_sock_send_fd < 0){
        /* FIXME throw something */
        perror("Creation de la socket d'emission impossible");
        // newline added so this diagnostic matches the two identical ones above
        fprintf(stderr,"BOUM at %s:%d\n",__FILE__,__LINE__);
        exit(-1);
    }

    _addr_send = new sockaddr_in;
    // `new sockaddr_in` leaves the structure uninitialised: clear it so that
    // sin_zero and any padding are well defined before it is handed to
    // sendto(), exactly as is done for _addr_recv.
    bzero(_addr_send,sizeof(sockaddr_in));
    _addr_send->sin_family = AF_INET;
    _addr_send->sin_addr.s_addr = htonl(INADDR_ANY);
    _addr_send->sin_port = htons(_port_high);
}
/**
 * Release everything the constructor acquired: both address structures and
 * both socket descriptors.
 */
LowReceiver::~LowReceiver() {
    delete _addr_recv;
    // the destination address was allocated by the constructor as well
    delete _addr_send;

    // The constructor exits the process if either socket() call fails, so
    // both descriptors are open for any object that reaches this point.
    ::close(_sock_recv_fd);
    ::close(_sock_send_fd);
}
/**
 * Receive loop: blocks on the receiving socket, wraps each datagram in a
 * Message, logs it and passes it to manage().
 *
 * The loop retries when recvfrom() is interrupted (EINTR) and returns after
 * reporting any other receive error. Exceptions raised while a message is
 * being handled propagate to the caller; no memory is leaked in that case
 * because nothing in the loop is heap-allocated.
 */
void LowReceiver::run(){
    // Size of the receive buffer; recvfrom() discards the excess of any
    // datagram longer than this.
    const int buffer_len = 1500;
    char buffer[buffer_len];
    struct sockaddr_in repaddr;

    while(1){
        // receive the data
        memset(buffer, '\0', buffer_len);
        bzero(&repaddr,sizeof(struct sockaddr_in));
        // value-result argument: recvfrom() overwrites it, so it has to be
        // reset to the full structure size before every call
        socklen_t sockaddr_len = sizeof(struct sockaddr_in);

        int read_buffer_len = recvfrom(
            _sock_recv_fd,
            buffer,
            buffer_len,
            0,
            (struct sockaddr*)&repaddr,
            &sockaddr_len);
        if (read_buffer_len < 0) {
            if (errno == EINTR) {
                // interrupted by a signal before any data arrived: retry
                continue;
            }
            // never build a Message from a negative length
            perror("LowReceiver::run -- recvfrom failed");
            return;
        }

        // Automatic storage: destroyed at the end of the iteration, also
        // when manage() throws.
        Message mesg(buffer, read_buffer_len);
        printf("LowReceiver::run -- READ size %d\n", read_buffer_len);

        // Print at most getDataSize() bytes of the payload, stopping at the
        // first NUL, without taking a temporary heap copy.
        const char * data = mesg.getData();
        int data_size = static_cast<int>(mesg.getDataSize());
        if (data_size < 0) {
            // a negative precision would make printf ignore the bound
            data_size = 0;
        }
        printf("LowReceiver::run -- READ '%.*s'\n", data_size, data);

        // dispatch the data according to the protocol
        this->manage(&mesg);
    }
}
/**
 * Route a received message to the handler that matches its protocol type.
 *
 * TYPE_ABCAST goes to manage_abcast(), TYPE_CBCAST goes to manage_cbcast().
 * TYPE_TEST and unknown types are only reported on stdout.
 *
 * @param mesg  message to dispatch; it is dereferenced unconditionally
 */
void LowReceiver::manage(Message * mesg){
    switch (mesg->getType()) {
    case Protocol::TYPE_ABCAST:
        manage_abcast(mesg);
        return;

    case Protocol::TYPE_CBCAST:
        manage_cbcast(mesg);
        return;

    case Protocol::TYPE_TEST:
        // no handler exists for test messages yet
        printf("LowReceiver::manage -- NOT IMPLEMENTED\n");
        return;

    default:
        // type not listed above
        printf("LowReceiver::manage -- ERROR\n");
        return;
    }
}
/**
 * Send the raw bytes of a message through the sending socket to the
 * destination address prepared by the constructor.
 *
 * @param mesg  message whose raw buffer (getRaw()/getRawSize()) is sent
 * @throws eGroupUnableToSend*  thrown by pointer when sendto() fails; the
 *         pointer form is kept so that existing handlers still match
 */
void LowReceiver::deliver(Message * mesg){
    pDEBUG("LowReceiver::delivering\n");
    int message_len = mesg->getRawSize();
    char * message = mesg->getRaw();

    // Keep the call and the comparison apart: the previous
    // `(en = sendto(...) < 0)` stored the result of the comparison in `en`,
    // not the value returned by sendto().
    const ssize_t sent = ::sendto(_sock_send_fd,
                                  message,
                                  message_len,
                                  0,
                                  (struct sockaddr*)_addr_send,
                                  sizeof(struct sockaddr_in));
    if (sent < 0){
        perror("sendto failed\n");
        /* error */
        throw new eGroupUnableToSend();
    }

    pDEBUG("LowReceiver::deliver -- done\n");
}
#undef DEBUG