This commit is contained in:
glenux 2006-03-06 15:16:12 +00:00
parent cdf102089a
commit a30edadac9
10 changed files with 137 additions and 79 deletions

View file

@ -19,10 +19,10 @@ dabcast_SOURCES = \
clock_ab.cc \
clock_cb.cc \
timestamp.cc \
messagecell_ab.cc \
messagecell_cb.cc
messagecell_ab.cc
dabcast_SOURCES += \
macros.h \
groupmember.h \
config.h \
dabroadcast.h \
@ -37,8 +37,7 @@ dabcast_SOURCES += \
clock_cb.h \
protocol.h \
timestamp.h \
messagecell_ab.h \
messagecell_cb.h
messagecell_ab.h
INCLUDES = -I./ @GLIBMM_CFLAGS@ @GTHREAD_CFLAGS@

View file

@ -1,12 +1,15 @@
#include "highreceiver.h"
#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "macros.h"
#include "highreceiver.h"
#define DEBUG 0
HighReceiver::HighReceiver(short low_port){
printf("LowReceiver::LowReceiver --");
@ -54,6 +57,8 @@ HighReceiver::~HighReceiver(){
void HighReceiver::run(){
while(1){
sleep(1);
printf("HighReceiver::run -- \n");
pDEBUG(" ");
}
}
#undef DEBUG

View file

@ -1,6 +1,9 @@
#include "macros.h"
#include "lowreceiver.h"
#define DEBUG 1
LowReceiver::LowReceiver(short port_low,
short port_high,
Group &grp,
@ -10,41 +13,59 @@ LowReceiver::LowReceiver(short port_low,
printf("LowReceiver::LowReceiver --\n");
_socket_desc = socket(AF_INET, SOCK_DGRAM, 0);
/* *** SOCKET DE RECEPTION *** */
_sock_recv_fd = socket(AF_INET, SOCK_DGRAM, 0);
/* et l'autre variante : AF_UNIX */
if (_socket_desc < 0){
if (_sock_recv_fd < 0){
/* error */
perror("Creation de la socket impossible\n");
perror("Creation de la socket de reception impossible\n");
fprintf(stderr,"BOUM at %s:%d\n",__FILE__,__LINE__);
// FIXME: throw something
exit(-1);
}
_socket_addr = new sockaddr_in;
_addr_recv = new sockaddr_in;
// port_low = externe
// port_high = interne
bzero(_socket_addr,sizeof(sockaddr_in));
_socket_addr->sin_family = AF_INET;
_socket_addr->sin_port = htons(_port_low);
_socket_addr->sin_addr.s_addr = htonl(INADDR_ANY);
bzero(_addr_recv,sizeof(sockaddr_in));
_addr_recv->sin_family = AF_INET;
_addr_recv->sin_port = htons(_port_low);
_addr_recv->sin_addr.s_addr = htonl(INADDR_ANY);
// chopper une socket
if (bind(_socket_desc,
(struct sockaddr *)_socket_addr,
if (bind(_sock_recv_fd,
(struct sockaddr *)_addr_recv,
sizeof(sockaddr_in)) < 0) {
//FIXME : throw something
perror("Attachement de la socket impossible\n");
fprintf(stderr,"BOUM at %s:%d\n",__FILE__,__LINE__);
exit(-1);
}
/* *** SOCKET D'EMISSION *** */
_sock_send_fd = socket(AF_INET, SOCK_DGRAM, 0);
/* et l'autre variante : AF_UNIX */
if (_sock_send_fd < 0){
/* FIXME throw something */
perror("Creation de la socket d'emission impossible");
fprintf(stderr,"BOUM at %s:%d",__FILE__,__LINE__);
exit(-1);
}
_addr_send = new sockaddr_in;
_addr_send->sin_family = AF_INET;
_addr_send->sin_addr.s_addr = htonl(INADDR_ANY);
_addr_send->sin_port = htons(_port_high);
}
LowReceiver::~LowReceiver() {
delete _socket_addr;
delete _addr_recv;
}
void LowReceiver::run(){
@ -58,7 +79,7 @@ void LowReceiver::run(){
memset(buffer, '\0', buffer_len);
bzero(&repaddr,sizeof(struct sockaddr_in));
int read_buffer_len = recvfrom(
_socket_desc,
_sock_recv_fd,
buffer,
buffer_len,
0,
@ -104,3 +125,28 @@ void LowReceiver::manage(Message * mesg){
}
}
void LowReceiver::deliver(Message * mesg){
pDEBUG("LowReceiver::delivering\n");
int message_len = mesg->getRawSize();
char * message = mesg->getRaw();
int en;
if ((en = ::sendto(_sock_send_fd,
message,
message_len,
0,
(struct sockaddr*)_addr_send,
sizeof(struct sockaddr_in)) < 0)){
perror("sendto failed\n");
/* error */
throw new eGroupUnableToSend();
} else {
pDEBUG("LowReceiver::deliver -- done\n");
}
}
#undef DEBUG

View file

@ -11,8 +11,10 @@ class LowReceiver {
Clock & _clock;
int _port_low;
int _port_high;
int _socket_desc;
struct sockaddr_in * _socket_addr;
int _sock_recv_fd;
int _sock_send_fd;
struct sockaddr_in * _addr_recv;
struct sockaddr_in * _addr_send;
protected:
@ -24,6 +26,7 @@ class LowReceiver {
void manage(Message * mesg);
void manage_abcast(Message * mesg);
void manage_cbcast(Message * mesg);
void deliver(Message * mesg);
};

View file

@ -22,7 +22,7 @@ void LowReceiver::manage_abcast(Message * mesg) {
//on faire la gestion du abcast/send ici, c'est plus simple que
//de partager une variable+mutex avec le sender
//FIXME: chercher si l'on a déja recu ce message
// chercher si l'on a déja recu ce message
MessageCellAb * cell = NULL;
for (iter = fifo_send.begin(); iter != fifo_send.end(); iter++){
//on fait pointer cell sur la cellule si égale a l'id du message
@ -44,7 +44,7 @@ void LowReceiver::manage_abcast(Message * mesg) {
fifo_send.push_back(cell);
}
//FIXME: comparer le timestamp max a ceux que l'on recoit
// comparer le timestamp max a ceux que l'on recoit
cell->count += 1;
if (cell->count == _group.getCount()){
// broadcaster le nouveau timestamp du message
@ -59,6 +59,7 @@ void LowReceiver::manage_abcast(Message * mesg) {
cell->maximum->getRaw(),
cell->maximum->getRawSize());
_group.broadcast(*nMsg);
// FIXME: délivrer le message
}
} else {
printf("LowReceiver::manage_abcast - Received a message from a friend\n");
@ -78,7 +79,7 @@ void LowReceiver::manage_abcast(Message * mesg) {
MessageCellAb * cell = new MessageCellAb();
cell->message = new Message(*mesg);
cell->type = MessageCellAb::TYPE_TEMPORARY;
// - on retourne une estampille(reception) a l'emeteur
// - FIXME: on retourne une estampille(reception) a l'emeteur
} else {
// sinon

View file

@ -1,37 +1,66 @@
#include "lowreceiver.h"
#include "messagecell_cb.h"
void LowReceiver::manage_cbcast(Message * mesg) {
static std::list<MessageCellCb *> fifo_undelivered;
//static std::list<MessageCellCb *> fifo_send;
static std::list<Message *> fifo_undelivered;
static std::list<Message *> fifo_deliverable;
std::list<MessageCellCb *>::iterator iter;
std::list<Message *>::iterator iter;
printf("LowReceiver::manage_cbcast -- init\n");
// identifiant = horloge + id_site_emeteur
bool iAmTheEmitter = false;
bool firstSeenMessage = true;
bool canAdjust = false;
if (mesg->getStamp().getIndex() == _group.getIndex()){
iAmTheEmitter = true;
}
if (iAmTheEmitter){
printf("LowReceiver::manage_cbcast - Received my own message -- delivering automatically\n");
//on faire la gestion du cbcast/send ici, c'est plus simple que
//de partager une variable+mutex avec le sender
//
_clock.adjust(mesg->getStamp());
// ajouter dans la file des message à délivrer...
fifo_deliverable.push_back(mesg);
} else {
printf("LowReceiver::manage_cbcast - Received a message from a friend\n");
_clock.adjust(mesg->getStamp());
// FIXME: si l'horloge est ajustable
// (donc les contraintes TS_m[j] = TS_m[j]+1 && ... ),
// alors on délivre directement
canAdjust == _clock.adjust(mesg->getStamp());
printf("LowReceiver::manage_cbcast - Can Adjust %d\n",canAdjust);
// si l'horloge est ajustable (donc les contraintes FIFO et reception)
// alors on délivre directement
if (canAdjust){
fifo_deliverable.push_back(mesg);
}
}
bool foundDeliveredMsg = false;
// boucler sur la file des message non-délivrés, tant qu'on trouve un message à délivrer...
// et ajouter les délivrables à la FIFO
do {
printf("LowReceiver::manage_cbcast - looking for old deliverable messages...\n");
Message * mesg = NULL;
foundDeliveredMsg = false;
for (iter = fifo_undelivered.begin(); iter != fifo_undelivered.end(); iter++){
mesg = *iter;
if (_clock.adjust(mesg->getStamp())){
// ajouter dans la file des message à délivrer
fifo_deliverable.push_back(mesg);
// supprimer le message de la liste des non-délivrés
fifo_undelivered.erase(iter);
foundDeliveredMsg = true;
printf("LowReceiver::manage_cbcast - found old deliverable...\n");
break;
}
}
} while (foundDeliveredMsg == true);
for (int idx = fifo_deliverable.size(); idx > 0; idx--){
printf("LowReceiver::manage_cbcast - delivering message...\n");
// on POP_FRONT et on send...
Message * deliverableMsg = fifo_deliverable.front();
fifo_deliverable.pop_front();
//FIXME: délivrer
}
}

View file

@ -1,6 +1,9 @@
#include <time.h>
#include "macros.h"
#include "lowsender.h"
#define DEBUG 0
#define MESSAGE "AB or CB cast from LowSender ? hehe...."
LowSender::LowSender(Group &grp, Clock &clk, Protocol::Type type) : _group(grp), _clock(clk), _type(type) {
@ -9,18 +12,22 @@ LowSender::LowSender(Group &grp, Clock &clk, Protocol::Type type) : _group(grp),
void LowSender::run(){
// thread part
srand(time(NULL));
while(1){
sleep(3);
printf("LowSender -- Broadcasting '%s'\n", MESSAGE);
if (rand() % 3 == 0) {
printf("LowSender::run -- Broadcasting '%s'\n", MESSAGE);
TimeStamp ts = _clock.inc();
pDEBUG("Timestamp done\n");
TimeStamp ts = _clock.inc();
printf("LowSender::run -- Timestamp done\n");
Message msg(_type, ts, MESSAGE, strlen(MESSAGE));
pDEBUG("Mesg done and ready to send\n");
Message msg(_type, ts, MESSAGE, strlen(MESSAGE));
printf("LowSender::run -- Mesg done and ready to send\n");
_group.broadcast(msg);
printf("LowSender::run -- Mesg sent\n");
_group.broadcast(msg);
pDEBUG("Mesg sent\n");
} else {
pDEBUG("Not sending\n");
}
}
}

View file

@ -42,6 +42,7 @@ Message::Message(void * data, int len) {
// initialise message from the following data
_data_size = -1;
memcpy(&_data_size, (_raw + index), 2);
_data_size = ntohs(_data_size);
index += 2;
if (DEBUG_INPUT)

View file

@ -1,10 +0,0 @@
#include "messagecell_cb.h"
MessageCellCb::MessageCellCb(){
printf("MessageCellCb::MessageCellCb -- constructor\n");
this->message = NULL;
this->type = MessageCellCb::TYPE_UNDEF;
this->count = 0;
this->maximum = NULL;
}

View file

@ -1,23 +0,0 @@
#ifndef _MESSAGE_CELL_CB
#define _MESSAGE_CELL_CB
#include "message.h"
class MessageCellCb {
public:
typedef enum {
TYPE_UNDEF,
TYPE_TEMPORARY,
TYPE_DEFINITIVE
} Type;
Message * message;
MessageCellCb::Type type;
TimeStamp * maximum;
short count;
MessageCellCb();
};
#endif