diff --git a/src/Makefile.am b/src/Makefile.am index 75cb842..f211b7d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -19,10 +19,10 @@ dabcast_SOURCES = \ clock_ab.cc \ clock_cb.cc \ timestamp.cc \ - messagecell_ab.cc \ - messagecell_cb.cc + messagecell_ab.cc dabcast_SOURCES += \ + macros.h \ groupmember.h \ config.h \ dabroadcast.h \ @@ -37,8 +37,7 @@ dabcast_SOURCES += \ clock_cb.h \ protocol.h \ timestamp.h \ - messagecell_ab.h \ - messagecell_cb.h + messagecell_ab.h INCLUDES = -I./ @GLIBMM_CFLAGS@ @GTHREAD_CFLAGS@ diff --git a/src/highreceiver.cc b/src/highreceiver.cc index 547b95b..34b67a3 100644 --- a/src/highreceiver.cc +++ b/src/highreceiver.cc @@ -1,12 +1,15 @@ -#include "highreceiver.h" - #include #include #include #include #include +#include "macros.h" +#include "highreceiver.h" + +#define DEBUG 0 + HighReceiver::HighReceiver(short low_port){ printf("LowReceiver::LowReceiver --"); @@ -54,6 +57,8 @@ HighReceiver::~HighReceiver(){ void HighReceiver::run(){ while(1){ sleep(1); - printf("HighReceiver::run -- \n"); + pDEBUG(" "); } } + +#undef DEBUG diff --git a/src/lowreceiver.cc b/src/lowreceiver.cc index a2ff4f1..a60366d 100644 --- a/src/lowreceiver.cc +++ b/src/lowreceiver.cc @@ -1,6 +1,9 @@ +#include "macros.h" #include "lowreceiver.h" +#define DEBUG 1 + LowReceiver::LowReceiver(short port_low, short port_high, Group &grp, @@ -10,41 +13,59 @@ LowReceiver::LowReceiver(short port_low, printf("LowReceiver::LowReceiver --\n"); - _socket_desc = socket(AF_INET, SOCK_DGRAM, 0); + /* *** SOCKET DE RECEPTION *** */ + _sock_recv_fd = socket(AF_INET, SOCK_DGRAM, 0); /* et l'autre variante : AF_UNIX */ - if (_socket_desc < 0){ + if (_sock_recv_fd < 0){ /* error */ - perror("Creation de la socket impossible\n"); + perror("Creation de la socket de reception impossible\n"); fprintf(stderr,"BOUM at %s:%d\n",__FILE__,__LINE__); // FIXME: throw something exit(-1); } - _socket_addr = new sockaddr_in; + _addr_recv = new sockaddr_in; // port_low = externe // port_high = interne - bzero(_socket_addr,sizeof(sockaddr_in)); - _socket_addr->sin_family = AF_INET; - _socket_addr->sin_port = htons(_port_low); - _socket_addr->sin_addr.s_addr = htonl(INADDR_ANY); + bzero(_addr_recv,sizeof(sockaddr_in)); + _addr_recv->sin_family = AF_INET; + _addr_recv->sin_port = htons(_port_low); + _addr_recv->sin_addr.s_addr = htonl(INADDR_ANY); // chopper une socket - if (bind(_socket_desc, - (struct sockaddr *)_socket_addr, + if (bind(_sock_recv_fd, + (struct sockaddr *)_addr_recv, sizeof(sockaddr_in)) < 0) { //FIXME : throw something perror("Attachement de la socket impossible\n"); fprintf(stderr,"BOUM at %s:%d\n",__FILE__,__LINE__); exit(-1); } + + /* *** SOCKET D'EMISSION *** */ + _sock_send_fd = socket(AF_INET, SOCK_DGRAM, 0); + + /* et l'autre variante : AF_UNIX */ + if (_sock_send_fd < 0){ + /* FIXME throw something */ + perror("Creation de la socket d'emission impossible"); + fprintf(stderr,"BOUM at %s:%d",__FILE__,__LINE__); + exit(-1); + } + + _addr_send = new sockaddr_in; + _addr_send->sin_family = AF_INET; + _addr_send->sin_addr.s_addr = htonl(INADDR_ANY); + _addr_send->sin_port = htons(_port_high); + } LowReceiver::~LowReceiver() { - delete _socket_addr; + delete _addr_recv; } void LowReceiver::run(){ @@ -58,7 +79,7 @@ void LowReceiver::run(){ memset(buffer, '\0', buffer_len); bzero(&repaddr,sizeof(struct sockaddr_in)); int read_buffer_len = recvfrom( - _socket_desc, + _sock_recv_fd, buffer, buffer_len, 0, @@ -104,3 +125,28 @@ void LowReceiver::manage(Message * mesg){ } } +void LowReceiver::deliver(Message * mesg){ + + pDEBUG("LowReceiver::delivering\n"); + + int message_len = mesg->getRawSize(); + char * message = mesg->getRaw(); + + int en; + + if ((en = ::sendto(_sock_send_fd, + message, + message_len, + 0, + (struct sockaddr*)_addr_send, + sizeof(struct sockaddr_in)) < 0)){ + + perror("sendto failed\n"); + /* error */ + throw new eGroupUnableToSend(); + } else { + pDEBUG("LowReceiver::deliver -- done\n"); + } +} + +#undef DEBUG diff --git a/src/lowreceiver.h b/src/lowreceiver.h index 76aa5d4..ac4fd63 100644 --- a/src/lowreceiver.h +++ b/src/lowreceiver.h @@ -11,8 +11,10 @@ class LowReceiver { Clock & _clock; int _port_low; int _port_high; - int _socket_desc; - struct sockaddr_in * _socket_addr; + int _sock_recv_fd; + int _sock_send_fd; + struct sockaddr_in * _addr_recv; + struct sockaddr_in * _addr_send; protected: @@ -24,6 +26,7 @@ class LowReceiver { void manage(Message * mesg); void manage_abcast(Message * mesg); void manage_cbcast(Message * mesg); + void deliver(Message * mesg); }; diff --git a/src/lowreceiver_ab.cc b/src/lowreceiver_ab.cc index 6563d7c..d293e7f 100644 --- a/src/lowreceiver_ab.cc +++ b/src/lowreceiver_ab.cc @@ -22,7 +22,7 @@ void LowReceiver::manage_abcast(Message * mesg) { //on faire la gestion du abcast/send ici, c'est plus simple que //de partager une variable+mutex avec le sender - //FIXME: chercher si l'on a déja recu ce message + // chercher si l'on a déja recu ce message MessageCellAb * cell = NULL; for (iter = fifo_send.begin(); iter != fifo_send.end(); iter++){ //on fait pointer cell sur la cellule si égale a l'id du message @@ -44,7 +44,7 @@ void LowReceiver::manage_abcast(Message * mesg) { fifo_send.push_back(cell); } - //FIXME: comparer le timestamp max a ceux que l'on recoit + // comparer le timestamp max a ceux que l'on recoit cell->count += 1; if (cell->count == _group.getCount()){ // broadcaster le nouveau timestamp du message @@ -59,6 +59,7 @@ void LowReceiver::manage_abcast(Message * mesg) { cell->maximum->getRaw(), cell->maximum->getRawSize()); _group.broadcast(*nMsg); + // FIXME: délivrer le message } } else { printf("LowReceiver::manage_abcast - Received a message from a friend\n"); @@ -78,7 +79,7 @@ void LowReceiver::manage_abcast(Message * mesg) { MessageCellAb * cell = new MessageCellAb(); cell->message = new Message(*mesg); cell->type = MessageCellAb::TYPE_TEMPORARY; - // - on retourne une estampille(reception) a l'emeteur + // - FIXME: on retourne une estampille(reception) a l'emeteur } else { // sinon diff --git a/src/lowreceiver_cb.cc b/src/lowreceiver_cb.cc index fb00762..cbe35a8 100644 --- a/src/lowreceiver_cb.cc +++ b/src/lowreceiver_cb.cc @@ -1,37 +1,66 @@ #include "lowreceiver.h" -#include "messagecell_cb.h" - void LowReceiver::manage_cbcast(Message * mesg) { - static std::list fifo_undelivered; - //static std::list fifo_send; + static std::list fifo_undelivered; + static std::list fifo_deliverable; - std::list::iterator iter; + std::list::iterator iter; printf("LowReceiver::manage_cbcast -- init\n"); // identifiant = horloge + id_site_emeteur bool iAmTheEmitter = false; - bool firstSeenMessage = true; + bool canAdjust = false; if (mesg->getStamp().getIndex() == _group.getIndex()){ iAmTheEmitter = true; } if (iAmTheEmitter){ printf("LowReceiver::manage_cbcast - Received my own message -- delivering automatically\n"); - //on faire la gestion du cbcast/send ici, c'est plus simple que - //de partager une variable+mutex avec le sender - // - _clock.adjust(mesg->getStamp()); + // ajouter dans la file des message à délivrer... + fifo_deliverable.push_back(mesg); } else { printf("LowReceiver::manage_cbcast - Received a message from a friend\n"); - _clock.adjust(mesg->getStamp()); - // FIXME: si l'horloge est ajustable - // (donc les contraintes TS_m[j] = TS_m[j]+1 && ... ), - // alors on délivre directement + canAdjust == _clock.adjust(mesg->getStamp()); + printf("LowReceiver::manage_cbcast - Can Adjust %d\n",canAdjust); + + // si l'horloge est ajustable (donc les contraintes FIFO et reception) + // alors on délivre directement + if (canAdjust){ + fifo_deliverable.push_back(mesg); + } + } + + bool foundDeliveredMsg = false; + // boucler sur la file des message non-délivrés, tant qu'on trouve un message à délivrer... + // et ajouter les délivrables à la FIFO + do { + printf("LowReceiver::manage_cbcast - looking for old deliverable messages...\n"); + Message * mesg = NULL; + foundDeliveredMsg = false; + for (iter = fifo_undelivered.begin(); iter != fifo_undelivered.end(); iter++){ + mesg = *iter; + if (_clock.adjust(mesg->getStamp())){ + // ajouter dans la file des message à délivrer + fifo_deliverable.push_back(mesg); + // supprimer le message de la liste des non-délivrés + fifo_undelivered.erase(iter); + foundDeliveredMsg = true; + printf("LowReceiver::manage_cbcast - found old deliverable...\n"); + break; + } + } + } while (foundDeliveredMsg == true); + + for (int idx = fifo_deliverable.size(); idx > 0; idx--){ + printf("LowReceiver::manage_cbcast - delivering message...\n"); + // on POP_FRONT et on send... + Message * deliverableMsg = fifo_deliverable.front(); + fifo_deliverable.pop_front(); + //FIXME: délivrer } } diff --git a/src/lowsender.cc b/src/lowsender.cc index b4f1a24..43ee79b 100644 --- a/src/lowsender.cc +++ b/src/lowsender.cc @@ -1,6 +1,9 @@ +#include +#include "macros.h" #include "lowsender.h" +#define DEBUG 0 #define MESSAGE "AB or CB cast from LowSender ? hehe...." LowSender::LowSender(Group &grp, Clock &clk, Protocol::Type type) : _group(grp), _clock(clk), _type(type) { @@ -9,18 +12,22 @@ LowSender::LowSender(Group &grp, Clock &clk, Protocol::Type type) : _group(grp), void LowSender::run(){ // thread part + srand(time(NULL)); while(1){ sleep(3); - printf("LowSender -- Broadcasting '%s'\n", MESSAGE); + if (rand() % 3 == 0) { + printf("LowSender::run -- Broadcasting '%s'\n", MESSAGE); + TimeStamp ts = _clock.inc(); + pDEBUG("Timestamp done\n"); - TimeStamp ts = _clock.inc(); - printf("LowSender::run -- Timestamp done\n"); + Message msg(_type, ts, MESSAGE, strlen(MESSAGE)); + pDEBUG("Mesg done and ready to send\n"); - Message msg(_type, ts, MESSAGE, strlen(MESSAGE)); - printf("LowSender::run -- Mesg done and ready to send\n"); - - _group.broadcast(msg); - printf("LowSender::run -- Mesg sent\n"); + _group.broadcast(msg); + pDEBUG("Mesg sent\n"); + } else { + pDEBUG("Not sending\n"); + } } } diff --git a/src/message.cc b/src/message.cc index b9a26e1..a62b267 100644 --- a/src/message.cc +++ b/src/message.cc @@ -42,6 +42,7 @@ Message::Message(void * data, int len) { // initialise message from the following data _data_size = -1; memcpy(&_data_size, (_raw + index), 2); + _data_size = ntohs(_data_size); index += 2; if (DEBUG_INPUT) diff --git a/src/messagecell_cb.cc b/src/messagecell_cb.cc deleted file mode 100644 index 7a89856..0000000 --- a/src/messagecell_cb.cc +++ /dev/null @@ -1,10 +0,0 @@ - -#include "messagecell_cb.h" - -MessageCellCb::MessageCellCb(){ - printf("MessageCellCb::MessageCellCb -- constructor\n"); - this->message = NULL; - this->type = MessageCellCb::TYPE_UNDEF; - this->count = 0; - this->maximum = NULL; -} diff --git a/src/messagecell_cb.h b/src/messagecell_cb.h deleted file mode 100644 index 7a167c4..0000000 --- a/src/messagecell_cb.h +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef _MESSAGE_CELL_CB -#define _MESSAGE_CELL_CB - -#include "message.h" - -class MessageCellCb { - public: - typedef enum { - TYPE_UNDEF, - TYPE_TEMPORARY, - TYPE_DEFINITIVE - } Type; - - Message * message; - MessageCellCb::Type type; - TimeStamp * maximum; - short count; - - MessageCellCb(); -}; - -#endif -