This commit is contained in:
glenux 2006-02-05 17:43:00 +00:00
parent 20f0ac0ceb
commit ce77225682
33 changed files with 1660 additions and 0 deletions

5
Makefile.am Normal file
View file

@ -0,0 +1,5 @@
SUBDIRS = src
EXTRA_DIST = doc INSTALL README Doxyfile autogen.sh rapport

11
autogen.sh Executable file
View file

@ -0,0 +1,11 @@
echo "libtoolize..."
libtoolize
echo "aclocal..."
aclocal
echo "autoheader..."
autoheader
echo "autoconf..."
autoconf
echo "automake..."
automake -a
echo "ok."

152
configure.in Normal file
View file

@ -0,0 +1,152 @@
dnl Copyright (C) 2004-2005 Glenn ROLLAND.
dnl
dnl This program is free software; you can redistribute it and/or modify
dnl it under the terms of the GNU General Public License as published by
dnl the Free Software Foundation; either version 2 of the License, or
dnl (at your option) any later version.
dnl
dnl This program is distributed in the hope that it will be useful,
dnl but WITHOUT ANY WARRANTY; without even the implied warranty of
dnl MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
dnl GNU General Public License for more details.
dnl
dnl You should have received a copy of the GNU General Public License
dnl along with this program; if not, write to the Free Software
dnl Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
dnl
## Autoconf requirements
AC_INIT([dabroadcast], [0.0.1], [glenux@fr.st])
AC_PREREQ(2.50)
AC_CANONICAL_HOST
AC_CANONICAL_TARGET
AM_INIT_AUTOMAKE([dabroadcast],[0.0.1])
#AC_CANONICAL_SYSTEM
AM_CONFIG_HEADER(config.h)
AC_CONFIG_SRCDIR([src/dabroadcast.h])
AC_PROG_INSTALL
## information on the package
## ## checks for programs
## checks for libraries
## checks for header files
## checks for types
## checks for structures
## checks for compiler characteristics
## checks for library functions
## checks for system services
# AM_CONFIG_HEADER(config.h)
## required for c sources
AC_ISC_POSIX
AC_PROG_CC
## required for c++ sources
AC_PROG_CXX
AC_PROG_CPP
# AC_PROG_RANLIB obsoleted bu LIBTOOL
AC_LANG_SAVE
AC_LANG_CPLUSPLUS
# AC_PROG_YACC
# AC_PROG_LEX
# AM_PROG_LEX
dnl Checks for library functions.
AC_HEADER_STDC([])
#TODO: uncomment
AC_CHECK_HEADERS([vector set map iostream sstream string algorithm list memory iterator exception utility],,
AC_MSG_ERROR([You need to have the libstdc++ headers installed]))
dnl ***************************************************************************
dnl Gettext stuff.
dnl ***************************************************************************
#GETTEXT_PACKAGE=yalag-0.1
#AC_SUBST(GETTEXT_PACKAGE)
#AC_DEFINE_UNQUOTED(GETTEXT_PACKAGE,"$GETTEXT_PACKAGE", [Gettext package.])
#ALL_LINGUAS="fr"
#AM_GLIB_GNU_GETTEXT
dnl Check for programs
dnl Check for build configuration.
#AC_PROG_INTLTOOL
AM_PROG_LIBTOOL
dnl Checks for header files.
#AC_MSG_NOTICE([checking presence of SQLite])
#AC_CHECK_HEADERS([sqlite.h],[],
# [
# AC_MSG_NOTICE([*** Please install/upgrade SQLite before proceeding])
# AC_MSG_NOTICE([*** with Unlost Memories installation.])
# AC_MSG_ERROR([cannot find sqlite.h])
# ])
#sqlite_deps="sqlite3 >= 3.0"
#PKG_CHECK_MODULES(SQLITE3,$sqlite_deps,build_sqlite=yes,build_sqlite=no)
#AC_SUBST(SQLITE3_CFLAGS)
#AC_SUBST(SQLITE3_LIBS)
#glib_deps="glib-2.0 >= 2.0"
#PKG_CHECK_MODULES(GLIB2,$glib_deps, build_glib=yes, build_glib=no)
#AC_SUBST(GLIB2_CFLAGS)
#AC_SUBST(GLIB2_LIBS)
glibmm_deps="glibmm-2.4 >= 2.4"
PKG_CHECK_MODULES(GLIBMM,$glibmm_deps, build_glibmm=yes, build_glibmm=no)
AC_SUBST(GLIBMM_CFLAGS)
AC_SUBST(GLIBMM_LIBS)
if test "$build_glibmm" = "no"; then
AC_MSG_NOTICE([*** Please install/upgrade libglibmm-2.4-dev before proceeding])
AC_MSG_NOTICE([*** with installation.])
AC_MSG_ERROR([cannot find glibmm-2.4])
fi
gthread_deps="gthread-2.0 >= 2.8"
PKG_CHECK_MODULES(GTHREAD,$gthread_deps, build_gthread=yes, build_gthread=no)
AC_SUBST(GTHREAD_CFLAGS)
AC_SUBST(GTHREAD_LIBS)
if test "$build_gthread" = "no"; then
AC_MSG_NOTICE([*** Please install/upgrade libgthread-dev before proceeding])
AC_MSG_NOTICE([*** with installation.])
AC_MSG_ERROR([cannot find gthread])
fi
#xsock_deps="xsock >= 0.9.0"
#PKG_CHECK_MODULES(XSOCK,$xsock_deps, build_xsock=yes, build_xsock=no)
#AC_SUBST(XSOCK_CFLAGS)
#AC_SUBST(XSOCK_LIBS)
#if test "$build_xsock" = "no"; then
# AC_MSG_NOTICE([*** Please install/upgrade libxsock-dev before proceeding])
# AC_MSG_NOTICE([*** with installation.])
# AC_MSG_ERROR([cannot find xsock])
#fi
AC_CONFIG_FILES([
Makefile
src/Makefile
])
AC_OUTPUT
dnl configure.in ends here

56
src/Makefile.am Normal file
View file

@ -0,0 +1,56 @@
#AM_YFLAGS = -d -v
SUBDIRS = .
bin_PROGRAMS = dabcast
dabcast_SOURCES = \
config.cc \
dabroadcast.cc \
group.cc \
lowreceiver.cc \
lowsender.cc \
highreceiver.cc \
message.cc \
clock.cc \
clock_test.cc \
clock_ab.cc \
clock_cb.cc \
timestamp.cc \
messagecell_ab.cc
dabcast_SOURCES += \
groupmember.h \
config.h \
dabroadcast.h \
group.h \
lowreceiver.h \
lowsender.h \
highreceiver.h \
message.h \
clock.h \
clock_test.h \
clock_ab.h \
clock_cb.h \
protocol.h \
timestamp.h \
messagecell_ab.h
INCLUDES = -I./ @GLIBMM_CFLAGS@ @GTHREAD_CFLAGS@
#BUILT_SOURCES = source_parser.h source_lexer.cc
# LEXLIB@
# libyalag.la
# miniftpc_INCLUDE = a
#ytem_LDFLAGS = -L../lib -L../lib/.libs @LDFLAGS@ @GLIBMM_LDFLAGS@ @GDKMM_LDFLAGS@ @GTKMM_LDFLAGS@
#ytem_LDADD = \
# ${top_builddir}/src/lib/ytembase/libytembase.la \
# ${top_builddir}/src/lib/ytemgrapher/libytemgrapher.la \
# ${top_builddir}/src/lib/ytempert/libytempert.la
dabcast_LDFLAGS = @LDFLAGS@ @GLIBMM_LIBS@ @GTHREAD_LIBS@

2
src/TODO Normal file
View file

@ -0,0 +1,2 @@
dans la RFC, indiquer si le temps commence à 0 ou à 1
indiquer si l'index de la machine commence à 0 ou à 1

38
src/clock.cc Normal file
View file

@ -0,0 +1,38 @@
#include "clock.h"
//Clock::Clock(){
//}
/*int numb, int myindex){
// lock jusqu'a la fin de la fonction
if ((myindex < 0) || (myindex > numb)) {
// throw ClockInitError();
// throw exception
}
_myidx = myindex;
for (int idx = 0; idx < numb; idx++){
//printf("ext vector %d\n",idx);
_ticks.push_back(0);
}
}*/
TimeStamp Clock::inc(){
}
/*
// lock jusqu'a la fin de la fonction
Glib::Mutex::Lock lock(_mutex);
_ticks[_myidx] = _ticks.at(_myidx) + 1;
TimeStamp ts;
for(int i=0; i<_ticks.size(); i++){
ts.push_back(_ticks.at(i));
}
return ts;
}*/

29
src/clock.h Normal file
View file

@ -0,0 +1,29 @@
#ifndef _GYR_CLOCK
#define _GYR_CLOCK
#include <glibmm.h>
#include <vector>
#include <exception>
#include "timestamp.h"
class ClockUnadjustable : public std::exception { };
class ClockInitError : public std::exception { };
class Clock {
private:
protected:
size_t _size;
size_t _cur_index;
Glib::Mutex _mutex;
public:
virtual TimeStamp inc() = 0;
virtual void adjust(TimeStamp timestamp) = 0;
};
#endif

41
src/clock_ab.cc Normal file
View file

@ -0,0 +1,41 @@
#include "clock_ab.h"
ClockAb::ClockAb(size_t myindex){
Glib::Mutex::Lock lock(_mutex);
printf("ClockAb::ClockAb -- constructor\n");
_cur_index = myindex;
_ticks = 0;
}
/*int numb, int myindex){
// lock jusqu'a la fin de la fonction
if ((myindex < 0) || (myindex > numb)) {
throw ClockInitError();
// throw exception
}
Glib::Mutex::Lock lock(_mutex);
_myidx = myindex;
for (int idx = 0; idx < numb; idx++){
//printf("ext vector %d\n",idx);
_ticks.push_back(0);
}
}*/
TimeStamp ClockAb::inc(){
// lock jusqu'a la fin de la fonction
Glib::Mutex::Lock lock(_mutex);
printf("ClockAb::inc -- creating timestamp\n");
_ticks++;
TimeStamp ts(Protocol::TYPE_ABCAST, _cur_index);
ts.push_back(_ticks);
return ts;
}
void ClockAb::adjust(TimeStamp ts){
printf("ClockAb::adjust -- NOT IMPLEMENTED\n");
}

19
src/clock_ab.h Normal file
View file

@ -0,0 +1,19 @@
#ifndef _GYR_CLOCK_AB
#define _GYR_CLOCK_AB
#include "clock.h"
class ClockAb : public Clock {
private:
int _ticks;
protected:
public:
ClockAb(size_t index);
virtual TimeStamp inc();
virtual void adjust(TimeStamp stamp);
};
#endif

12
src/clock_cb.cc Normal file
View file

@ -0,0 +1,12 @@
#include "clock_cb.h"
ClockCb::ClockCb(size_t size, size_t index){
printf("ClockCb::ClockCb -- create\n");
}
void ClockCb::adjust(TimeStamp ts){
}
TimeStamp ClockCb::inc(){
}

19
src/clock_cb.h Normal file
View file

@ -0,0 +1,19 @@
#ifndef _GYR_CLOCK_CB
#define _GYR_CLOCK_CB
#include "clock.h"
class ClockCb : public Clock {
private:
std::vector<int> _ticks;
protected:
public:
ClockCb(size_t size, size_t index);
virtual TimeStamp inc();
virtual void adjust(TimeStamp stamp);
};
#endif

15
src/clock_test.cc Normal file
View file

@ -0,0 +1,15 @@
#include "clock_test.h"
ClockTest::ClockTest(){
printf("ClockTest::ClockTest -- create\n");
}
TimeStamp ClockTest::inc(){
//send an empty timestamp
//prin
}
void ClockTest::adjust(TimeStamp ts){
//do nothing
}

20
src/clock_test.h Normal file
View file

@ -0,0 +1,20 @@
#ifndef _GYR_CLOCK_TEST
#define _GYR_CLOCK_TEST
#include "clock.h"
class ClockTest : public Clock {
private:
int _ticks;
//std::vector<int> _ticks;
Glib::Mutex _mutex;
int _myidx;
protected:
public:
ClockTest(); //int truc, int truc);
virtual TimeStamp inc();
virtual void adjust(TimeStamp stamp);
};
#endif

139
src/config.cc Normal file
View file

@ -0,0 +1,139 @@
#include <sstream>
#include <iostream>
#include "config.h"
#include "group.h"
#include "message.h"
using namespace std;
Config::Config(int argc, char **argv) {
_port = -1;
_mode = Protocol::TYPE_UNKNOWN;
int groupPort;
while (1) {
static struct option long_options[] = {
//
//{"compress1", no_argument, 0, 'c'},
{"test", no_argument, 0, 'T'},
{"abcast", no_argument, 0, 'A'},
{"cbcast", no_argument, 0, 'C'},
{"group", required_argument, 0, 'g'},
{"port", required_argument, 0, 'p'},
{0, 0, 0, 0}
};
int option_index = 0;
int c = getopt_long(argc, argv, "TACg:p:",
long_options, &option_index);
/* detect the end of options */
if (c == -1) {
break;
}
switch (c) {
case 0:
printf("case NULL\n");
break;
case 'A':
{
_mode = Protocol::TYPE_ABCAST;
break;
}
case 'C':
{
_mode = Protocol::TYPE_CBCAST;
break;
}
case 'T':
{
_mode = Protocol::TYPE_TEST;
break;
}
case 'p':
{
stringstream s;
printf("Port -> %s\n",optarg);
s << string(optarg);
s >> _port;
}
break;
case 'g':
{
HostId g_host;
string optstr(optarg);
stringstream s_host, s_port;
int idx = optstr.find(":");
if (idx > 0){
cout << "Group -> "<< optstr <<" (: at "<< idx <<")\n";
// on oblige la forme XXXXXX:YY
s_host << optstr.substr(0,idx);
s_host >> (g_host.host);
s_port << optstr.substr(idx+1, optstr.size()-idx-1);
s_port >> (g_host.port);
} else {
cerr <<"Invalid host : '"<< optstr <<"'\n";
}
_group_hosts.push_back(g_host);
}
break;
case '?':
printf("unknow\n");
break;
default:
printf("default\n");
return;
}
}
}
bool Config::isValid() {
int score = 0;
int valid = 0;
if (_group_hosts.size() > 0) {
score++;
}
valid++;
if (_port > 0) {
score++;
}
valid++;
return (valid == score);
}
std::list<HostId> Config::getGroupHosts(){
return _group_hosts;
}
Protocol::Type Config::getMode(){
return _mode;
}
int Config::getPort(){
return _port;
}
void Config::usage() {
printf("Usage: webreducer <mode> [options]\n");
printf("\n");
printf("Modes (mutualy exclusive):\n");
printf("-T, -test Test mode (simple broadcast)\n");
printf("-A, -abcast ABcast mode\n");
printf("-C, -cbcast CBcast mode\n");
printf("Mandatory options:\n");
printf("-g, -group <host:port> Add an host to the group\n");
printf("-p, -port <port> Use this port on localhost\n");
}

30
src/config.h Normal file
View file

@ -0,0 +1,30 @@
#ifndef _GYR_CONFIG_H
#define _GYR_CONFIG_H
#include <set>
#include <getopt.h>
#include <glibmm.h>
#include "protocol.h"
#include "group.h"
class Config {
private:
std::list<HostId> _group_hosts;
int _port;
Protocol::Type _mode;
protected:
public:
Config(int argc, char **argv);
void usage();
bool isValid();
std::list<HostId> getGroupHosts();
int getPort();
Protocol::Type getMode();
};
#endif // _GYR_CONFIG_H

84
src/dabroadcast.cc Normal file
View file

@ -0,0 +1,84 @@
#include <iostream>
#include <glibmm.h>
#include <sigc++/class_slot.h>
#include "config.h"
// #include "groupmember.h"
#include "lowreceiver.h"
#include "highreceiver.h"
#include "lowsender.h"
#include "clock.h"
#include "clock_test.h"
#include "clock_ab.h"
#include "clock_cb.h"
/*
* - on doit passer en parametre les gars du groupe...
* -
*/
int main(int argc, char ** argv){
Config config(argc, argv);
if (config.isValid()){
Glib::thread_init();
Group grp(config.getGroupHosts());
Clock * clk;
//FIXME non-dynamic port !
int portHigh = 2310;
switch(config.getMode()){
case Protocol::TYPE_TEST:
clk = new ClockTest();
printf("Test mode!\n");
break;
case Protocol::TYPE_ABCAST:
clk = new ClockAb(1);
printf("AbCast mode!\n");
break;
case Protocol::TYPE_CBCAST:
clk = new ClockCb(4,1);
printf("CbCast mode!\n");
break;
case Protocol::TYPE_UNKNOWN:
clk = NULL;
printf("Unknow mode!\n");
exit(-1);
break;
default:
clk = NULL;
printf("Unknow(2) mode!\n");
exit(-1);
break;
}
HighReceiver high_receiver(portHigh);
LowReceiver low_receiver(config.getPort(),
portHigh,
grp,
*clk);
LowSender low_sender(grp,*clk,config.getMode());
Glib::Thread *const high_receiver_thr = Glib::Thread::create(
sigc::mem_fun(high_receiver, &HighReceiver::run), true);
Glib::Thread *const low_sender_thr = Glib::Thread::create(
sigc::mem_fun(low_sender, &LowSender::run), true);
Glib::Thread *const low_receiver_thr = Glib::Thread::create(
sigc::mem_fun(low_receiver, &LowReceiver::run), true);
low_sender_thr->join();
low_receiver_thr->join();
high_receiver_thr->join();
std::cout <<"Happy end"<<std::endl;
} else {
std::cout <<"Missing arguments"<<std::endl;
config.usage();
}
return 0;
}

0
src/dabroadcast.h Normal file
View file

113
src/group.cc Normal file
View file

@ -0,0 +1,113 @@
#include "group.h"
#include <iostream>
#define DEBUG 0
Group::Group(std::list<HostId> group){
_hosts = group;
_socket_desc = socket(AF_INET, SOCK_DGRAM, 0);
/* et l'autre variante : AF_UNIX */
if (_socket_desc < 0){
/* error */
perror("Creation de la socket impossible");
fprintf(stderr,"BOUM at %s:%d",__FILE__,__LINE__);
// FIXME: throw something
exit(-1);
}
struct sockaddr_in * myaddr = new sockaddr_in;
bzero(myaddr,sizeof(sockaddr_in));
myaddr->sin_family = AF_INET;
myaddr->sin_port = htons(0);
myaddr->sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(_socket_desc,(struct sockaddr *)myaddr,sizeof(sockaddr_in)) < 0) {
//FIXME : throw something
perror("Attachement de la socket impossible");
fprintf(stderr,"BOUM at %s:%d",__FILE__,__LINE__);
exit(-1);
}
//FIXME: définir une liste avec les structures...
std::list<HostId>::iterator hiIter;
for (hiIter = _hosts.begin();
hiIter != _hosts.end();
hiIter++){
// pour chacun des hostId, on ouvre une socket
HostId &hid = *hiIter;
std::cout << "HOST: " <<hid.host <<std::endl;
//printf("HOST: %s\n",hid.host.c_str());
/* structure hostent qui sera remplie par gethostbyname */
struct hostent * hp;
struct sockaddr_in * addr = new sockaddr_in;
int addr_len = sizeof(struct sockaddr_in);
/* Recuperer l'adresse IP du serveur */
if((hp = gethostbyname(hid.host.c_str())) == NULL) {
fprintf(stderr,
"Erreur client : echec gethostbyname sur %s\n",
hid.host.c_str()) ;
//FIXME: throw something
exit(-1);
}
addr->sin_family = AF_INET;
addr->sin_addr.s_addr = ((struct in_addr *) (hp->h_addr))->s_addr;
addr->sin_port = htons(hid.port);
_addrs.push_back(addr);
}
}
void Group::broadcast(Message & msg){
if (DEBUG)
printf("Group::broadcast -- enter\n");
std::vector<sockaddr_in *>::iterator saIter;
for (int index = 0; index < _addrs.size(); index++){
this->sendto(msg, index);
}
if (DEBUG)
printf("Group::broadcast -- exit\n");
}
void Group::sendto(Message &msg, int index){
sockaddr_in * addr = _addrs[index];
if (DEBUG)
printf("Group::sendto -- for\n");
int message_len = msg.getRawSize();
char * message = msg.getRaw();
int en;
if (DEBUG)
printf("Group::sendto -- got mesg\n");
if ((en = ::sendto(_socket_desc,
message,
message_len,
0,
(struct sockaddr*)addr,
sizeof(struct sockaddr_in)) < 0)){
perror("sendto failed\n");
/* error */
exit(-2);
} else {
if (DEBUG)
printf("Group::sendto -- done\n");
}
}
#undef DEBUG

39
src/group.h Normal file
View file

@ -0,0 +1,39 @@
#ifndef _GYR_GROUP_HOST_H
#define _GYR_GROUP_HOST_H
#include <string>
#include <list>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <errno.h>
#include <unistd.h>
#include "message.h"
class HostId {
public:
std::string host;
int port;
};
class Group {
public:
private:
std::list<HostId> _hosts;
std::vector<sockaddr_in *> _addrs;
int _socket_desc;
protected:
public:
Group(std::list<HostId> group);
void sendto(Message &msg, int index);
void broadcast(Message &msg);
};
#endif

22
src/groupmember.h Normal file
View file

@ -0,0 +1,22 @@
#ifndef _GYR_GROUP_MEMBER_HH
#define _GYR_GROUP_MEMBER_HH
#include "lowreceiver.h"
#include "highreceiver.h"
#include "lowsender.h"
class GroupMember {
private:
std::list<int> _group;
HighReceiver * _high_receiver; // high level
LowReceiver * _low_receiver; // low level
LowSender * _sender; // low level
protected:
public:
void addToGroup(int port);
};
#endif // _GYR_COMPONENT_HH

53
src/highreceiver.cc Normal file
View file

@ -0,0 +1,53 @@
#include "highreceiver.h"
#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
HighReceiver::HighReceiver(short int portHigh){
printf("LowReceiver::LowReceiver --");
_socket_desc = socket(AF_INET, SOCK_DGRAM, 0);
/* et l'autre variante : AF_UNIX */
if (_socket_desc < 0){
/* error */
perror("Creation de la socket impossible");
fprintf(stderr,"BOUM at %s:%d",__FILE__,__LINE__);
// FIXME: throw something
exit(-1);
}
struct sockaddr_in * _socket_addr = new sockaddr_in;
// port_low = externe
// port_high = interne
bzero(_socket_addr,sizeof(sockaddr_in));
_socket_addr->sin_family = AF_INET;
_socket_addr->sin_port = htons(portHigh);
_socket_addr->sin_addr.s_addr = htonl(INADDR_ANY);
// chopper une socket
if (bind(_socket_desc,(struct sockaddr *)_socket_addr,sizeof(sockaddr_in)) < 0) {
//FIXME : throw something
perror("Attachement de la socket impossible");
fprintf(stderr,"BOUM at %s:%d",__FILE__,__LINE__);
exit(-1);
}
}
HighReceiver::~HighReceiver(){
delete _socket_addr;
}
void HighReceiver::run(){
while(1){
sleep(1);
printf("HighReceiver::run -- \n");
}
}

17
src/highreceiver.h Normal file
View file

@ -0,0 +1,17 @@
#ifndef _GYR_HIGH_RECEIVER_H
#define _GYR_HIGH_RECEIVER_H
class HighReceiver {
private:
int _socket_desc;
struct sockaddr_in * _socket_addr;
protected:
public:
HighReceiver(short int portHigh);
~HighReceiver();
void run(); // thread part
};
#endif // _GYR_HIGH_RECEIVER_H

144
src/lowreceiver.cc Normal file
View file

@ -0,0 +1,144 @@
#include "lowreceiver.h"
#include "messagecell_ab.h"
LowReceiver::LowReceiver(short port_low,
short port_high,
Group &grp,
Clock &clk) :
_port_low(port_low), _port_high(port_high), _group(grp), _clock(clk)
{
printf("LowReceiver::LowReceiver --\n");
_socket_desc = socket(AF_INET, SOCK_DGRAM, 0);
/* et l'autre variante : AF_UNIX */
if (_socket_desc < 0){
/* error */
perror("Creation de la socket impossible\n");
fprintf(stderr,"BOUM at %s:%d\n",__FILE__,__LINE__);
// FIXME: throw something
exit(-1);
}
_socket_addr = new sockaddr_in;
// port_low = externe
// port_high = interne
bzero(_socket_addr,sizeof(sockaddr_in));
_socket_addr->sin_family = AF_INET;
_socket_addr->sin_port = htons(_port_low);
_socket_addr->sin_addr.s_addr = htonl(INADDR_ANY);
// chopper une socket
if (bind(_socket_desc,
(struct sockaddr *)_socket_addr,
sizeof(sockaddr_in)) < 0) {
//FIXME : throw something
perror("Attachement de la socket impossible\n");
fprintf(stderr,"BOUM at %s:%d\n",__FILE__,__LINE__);
exit(-1);
}
}
LowReceiver::~LowReceiver() {
delete _socket_addr;
}
void LowReceiver::run(){
int buffer_len = 1500;
char * buffer = new char[buffer_len];
struct sockaddr_in repaddr;
socklen_t sockaddr_len = sizeof(struct sockaddr_in);
Message * mesg = NULL;
while(1){
// recevoir les données
memset(buffer, '\0', buffer_len);
bzero(&repaddr,sizeof(struct sockaddr_in));
int read_buffer_len = recvfrom(
_socket_desc,
buffer,
buffer_len,
0,
(struct sockaddr*)&repaddr,
&sockaddr_len);
mesg = new Message(buffer, read_buffer_len);
printf("LowReceiver::run -- READ\n");
char * str = new char[mesg->getDataSize() + 1];
strncpy(str, mesg->getData(), mesg->getDataSize());
str[mesg->getDataSize()] = '\0';
printf("LowReceiver::run -- READ '%s'\n", str);
this->manage(mesg);
delete(mesg);
mesg = NULL;
// on dispatche les données en fonction du protocole...
}
}
void LowReceiver::manage(Message * mesg){
switch(mesg->getType()){
case Protocol::TYPE_TEST :
{
printf("LowReceiver::manage -- NOT IMPLEMENTED\n");
}
break;
case Protocol::TYPE_ABCAST :
{
this->manage_abcast(mesg);
}
break;
case Protocol::TYPE_CBCAST :
{
this->manage_cbcast(mesg);
}
break;
default:
printf("LowReceiver::manage -- ERROR\n");
break;
}
}
void LowReceiver::manage_abcast(Message * mesg) {
static std::list<MessageCellAb *> fifo;
std::list<MessageCellAb *>::iterator iter;
printf("LowReceiver::manage_abcast -- init\n");
// FIXME: on suppose ne pas etre l'emetteur
// identifiant = horloge + id_site_emeteur
bool firstSeenMessage = true;
for (iter = fifo.begin(); iter != fifo.end(); iter++){
MessageCellAb * cur = *iter;
if (cur->message == mesg) {
printf("LowReceiver::manage_abcast -- message seen\n");
firstSeenMessage = false;
break;
}
}
if (firstSeenMessage){
// si le message est vu pour la premiere fois:
// - on l'ajoute dans la liste d'attente
MessageCellAb * cell = new MessageCellAb();
cell->message = new Message(*mesg); //FIXME: make a copy;
cell->type = MessageCellAb::TYPE_TEMPORARY;
// - on retourne une estampille(reception) a l'emeteur
} else {
// sinon
// - l'estampille du message est mise a jour
// - le message est marqué comme final
// - on défile les estampille finale la
}
}
void LowReceiver::manage_cbcast(Message * mesg) {
printf("LowReceiver::manage_cbcast -- init\n");
}

30
src/lowreceiver.h Normal file
View file

@ -0,0 +1,30 @@
#ifndef _GYR_LOW_RECEIVER
#define _GYR_LOW_RECEIVER
#include "group.h"
#include "clock.h"
#include "message.h"
class LowReceiver {
private:
Group & _group;
Clock & _clock;
int _port_low;
int _port_high;
int _socket_desc;
struct sockaddr_in * _socket_addr;
protected:
public:
LowReceiver(short port_low, short port_high, Group & grp, Clock & clk);
~LowReceiver();
void run(); // thread part
void manage(Message * mesg);
void manage_abcast(Message * mesg);
void manage_cbcast(Message * mesg);
};
#endif

26
src/lowsender.cc Normal file
View file

@ -0,0 +1,26 @@
#include "lowsender.h"
#define MESSAGE "AB or CB cast from LowSender ? hehe...."
LowSender::LowSender(Group &grp, Clock &clk, Protocol::Type type) : _group(grp), _clock(clk), _type(type) {
}
void LowSender::run(){
// thread part
while(1){
sleep(3);
printf("LowSender -- Broadcasting '%s'\n", MESSAGE);
TimeStamp ts = _clock.inc();
printf("LowSender::run -- Timestamp done\n");
Message msg(_type, ts, MESSAGE, strlen(MESSAGE));
printf("LowSender::run -- Mesg done and ready to send\n");
_group.broadcast(msg);
printf("LowSender::run -- Mesg sent\n");
}
}

22
src/lowsender.h Normal file
View file

@ -0,0 +1,22 @@
#ifndef _GYR_LOW_SENDER_H
#define _GYR_LOW_SENDER_H
#include "protocol.h"
#include "group.h"
#include "clock.h"
#include "message.h"
class LowSender{
private:
Group & _group;
Clock & _clock;
Protocol::Type _type;
protected:
public:
LowSender::LowSender(Group &grp, Clock &clk, Protocol::Type type);
void run(); // thread part
};
#endif

244
src/message.cc Normal file
View file

@ -0,0 +1,244 @@
#include <netinet/in.h>
#include "message.h"
#define DEBUG_INPUT 0
#define DEBUG_OUTPUT 0
Message::Message(const Message & original){
printf("Message::Message -- copy\n");
_size = original._size;
_raw = new char[_size];
memcpy(_raw, original._raw, _size);
_data_size = original._data_size;
_data = new char[_data_size];
memcpy(_data, original._data, _data_size);
_type = original._type;
_stamp = new TimeStamp(*(original._stamp));
}
Message::Message(void * data, int len) {
_raw = new char[len];
memcpy(_raw, data, len);
_size = len;
if (DEBUG_INPUT)
printf("Message::Message(void* , int) -- length %d\n",len);
int index = 0;
// découper le type
_type = (Protocol::Type)(_raw[index]);
index += 1;
/*
short stamp_index = -1;
short stamp_len = -1;
switch(_type){
case Protocol::TYPE_ABCAST :
{
memcpy(&stamp_index, (_raw + index), 2);
stamp_index = ntohs(stamp_index);
stamp_len = 1; // 2 * 2 octets;
index += 2;
}
break;
case Protocol::TYPE_CBCAST :
{
// on est super embétés...
// on regarde d'abord l'index...
// stamp_len = 4i
memcpy(&stamp_index, (_raw + index), 2);
stamp_index = ntohs(stamp_index);
index += 2;
memcpy(&stamp_len, (_raw + index), 2);
index += 2;
}
break;
case Protocol::TYPE_TEST :
{
// taille = 0;
stamp_len = 0;
}
break;
case Protocol::TYPE_UNKNOWN :
break;
default :
break;
}
if (DEBUG_INPUT) {
printf("Message::Message(void *, int) -- stamp_index %d\n", stamp_index);
printf("Message::Message(void *, int) -- stamp_len %d\n", stamp_len);
}
if (stamp_len < 0){
fprintf(stderr, "Longueur du timestamp inconnue\n !");
exit(-1);
}
_stamp = new TimeStamp(_type, stamp_index);
// on itere stamp_len fois sur 2 octets
for (int i = 0; i< stamp_len; i++){
short site_value;
memcpy(&site_value, (_raw + index), 2);
site_value = ntohs(site_value);
if (DEBUG_INPUT)
printf("Message::Message -- horloge[%d] = %d\n", i, site_value);
index += 2;
_stamp->push_back(site_value);
}
*/
_stamp = new TimeStamp(_type, (_raw + index), len - index);
index += _stamp->getRawSize();
// initialise message from the following data
_data_size = -1;
memcpy(&_data_size, (_raw + index), 2);
index += 2;
if (DEBUG_INPUT)
printf("Message::Message -- message length %d\n", _data_size);
_data = new char[_data_size];
memcpy(_data, (_raw+index), _data_size);
// if (DEBUG_INPUT)
// printf("Message::Message -- message data '%s'\n",_data); //FIXME (buffer overflow)
}
Message::Message(Protocol::Type type, TimeStamp &timestamp, char * mesg, short mesg_size) :
_type(type)
{
_data = new char[mesg_size];
_data_size = mesg_size;
strncpy(_data, mesg, _data_size);
_stamp = new TimeStamp(timestamp);
if (DEBUG_OUTPUT)
printf("Message::Message -- constructing from type, stamp, string\n");
// construire le message qui va bien a partir des parametres
unsigned char dgram_type;
unsigned int dgram_size;
char * dgram_stamp;
unsigned int dgram_type_idx;
unsigned int dgram_stamp_idx;
unsigned int dgram_messagesize_idx;
unsigned int dgram_messagedata_idx;
printf("Message::Message -- initialising\n");
dgram_type_idx = 0;
dgram_stamp_idx = 1;
if (DEBUG_OUTPUT)
printf("Message::Message -- computing message length\n");
dgram_messagesize_idx = dgram_stamp_idx + _stamp->getRawSize();
if (DEBUG_OUTPUT)
printf("Message::Message -- computing message length %d\n",dgram_messagesize_idx);
dgram_messagedata_idx = dgram_messagesize_idx + 2;
if (DEBUG_OUTPUT)
printf("Message::Message -- computing message length %d\n",dgram_messagedata_idx);
dgram_size = dgram_messagedata_idx + _data_size;
_size = dgram_size;
if (DEBUG_OUTPUT)
printf("Message::Message -- creating datagram (%d)\n", dgram_size);
// we create the dgram
if (_raw != NULL){ free(_raw); }
_raw = new char[dgram_size];
// fill the dgram with type
_raw[0] = _type;
// fill the dgram with timestamp
dgram_stamp = _stamp->getRaw();
memcpy((_raw + dgram_stamp_idx),
dgram_stamp, _stamp->getRawSize());
free(dgram_stamp);
// fill the dgram with message
short ds = htons(_data_size);
memcpy((_raw + dgram_messagesize_idx),
&ds,
2);
memcpy((_raw + dgram_messagedata_idx),
_data,
_data_size);
if (DEBUG_OUTPUT)
printf("Message::raw -- MESSAGE(%c,\n\t",type);
for (int i=0; i < _size; i++){
if (DEBUG_OUTPUT)
printf("%#x ", _raw[i]);
}
if (DEBUG_OUTPUT) printf(")\n");
}
char * Message::getRaw(){
char * result = new char[_size];
memcpy(result, _raw, _size);
printf("Message::getRaw -- hex: \n");
for (int i=0; i < _size; i++){
printf("%#x ", result[i]);
}
printf("\n");
return result;
}
size_t Message::getRawSize(){
printf("Message::getRawSize -- %d\n",_size);
return _size;
}
char * Message::getData(){
char * result = new char[_data_size];
memcpy(result, _data, _data_size);
return result;
}
short Message::getDataSize(){
return _data_size;
}
TimeStamp Message::getStamp(){
return *_stamp;
}
Protocol::Type Message::getType(){
return _type;
}
bool Message::operator==(Message &message) {
printf("Message::operator== -- \n");
return (message.getStamp() == this->getStamp());
}
#undef DEBUG_INPUT
#undef DEBUG_OUTPUT

35
src/message.h Normal file
View file

@ -0,0 +1,35 @@
#ifndef _GYR_MESSAGE_H
#define _GYR_MESSAGE_H
#include <string>
#include "protocol.h"
#include "clock.h"
class Message {
public:
private:
char * _raw;
size_t _size;
char * _data;
short _data_size;
TimeStamp * _stamp; // construit dynamiquement
Protocol::Type _type;
protected:
public:
Message(Protocol::Type type, TimeStamp &ts, char * data, short data_size);
Message(void * data, int len);
Message(const Message & mesg);
char * getRaw();
size_t getRawSize();
char * getData();
short getDataSize();
Protocol::Type getType();
TimeStamp getStamp();
bool operator==(Message &message);
};
#endif

8
src/messagecell_ab.cc Normal file
View file

@ -0,0 +1,8 @@
#include "messagecell_ab.h"
MessageCellAb::MessageCellAb(){
printf("MessageCellAb::MessageCellAb -- constructor\n");
this->message = NULL;
this->type = MessageCellAb::TYPE_UNDEF;
}

21
src/messagecell_ab.h Normal file
View file

@ -0,0 +1,21 @@
#ifndef _MESSAGE_CELL_AB
#define _MESSAGE_CELL_AB
#include "message.h"
class MessageCellAb {
public:
typedef enum {
TYPE_UNDEF,
TYPE_TEMPORARY,
TYPE_DEFINITIVE
} Type;
Message * message;
MessageCellAb::Type type;
MessageCellAb();
};
#endif

14
src/protocol.h Normal file
View file

@ -0,0 +1,14 @@
#ifndef _GYR_PROTOCOL_H
#define _GYR_PROTOCOL_H
class Protocol {
public:
typedef enum {
TYPE_UNKNOWN='#',
TYPE_TEST='T',
TYPE_ABCAST='A',
TYPE_CBCAST='C'
} Type;
};
#endif // _GYR_PROTOCOL_H

175
src/timestamp.cc Normal file
View file

@ -0,0 +1,175 @@
#include <netinet/in.h>
#include "timestamp.h"
#define DEBUG 1
TimeStamp::TimeStamp(Protocol::Type type, short index){
_index = index;
_type = type;
}
TimeStamp::TimeStamp(Protocol::Type type, char * raw, short raw_size){
int pos_idx = 0;
short stamp_len = -1;
_index = -1;
_type = type;
switch(_type){
case Protocol::TYPE_ABCAST :
{
memcpy(&_index, raw , 2);
_index = ntohs(_index);
pos_idx += 2;
stamp_len = 1;
}
break;
case Protocol::TYPE_CBCAST :
{
// on est super embétés...
// on regarde d'abord l'index...
// stamp_len = 4i
memcpy(&_index, raw , 2);
_index = ntohs(_index);
pos_idx += 2;
memcpy(&stamp_len, (raw + pos_idx), 2);
stamp_len = ntohs(stamp_len);
pos_idx += 2;
}
break;
case Protocol::TYPE_TEST :
{
// taille = 0;
stamp_len = 0;
}
break;
case Protocol::TYPE_UNKNOWN :
break;
default :
break;
}
if (DEBUG) {
printf("TimeStamp::TimeStamp(Protocol::Type, void *, int) -- stamp_index %d\n", _index);
printf("TimeStamp::TimeStamp(Protocol::Type, void *, int) -- stamp_len %d\n", stamp_len);
}
if (stamp_len < 0){
fprintf(stderr, "TimeStamp::TimeStamp -- Longueur du timestamp inconnue\n !");
exit(-1);
}
//_stamp = new TimeStamp(_type, stamp_index);
// on itere stamp_len fois sur 2 octets
for (int i = 0; i< stamp_len; i++){
short net_site_value, host_site_value;
memcpy(&net_site_value, (raw + pos_idx), 2);
host_site_value = ntohs(net_site_value);
if (DEBUG)
printf("TimeStamp::TimeStamp -- index %d horloge[%d] = %d -> %d\n",
pos_idx, i, net_site_value, host_site_value);
pos_idx += 2;
this->push_back(host_site_value);
}
}
Protocol::Type TimeStamp::getType(){
return _type;
}
short TimeStamp::getIndex(){
return _index;
}
char * TimeStamp::getRaw(){
int result_len = 0;
char * result = NULL;
switch(_type){
case Protocol::TYPE_TEST :
{
result = NULL;
}
break;
case Protocol::TYPE_ABCAST :
{
result_len = 2; // 2 bytes for site index
result_len += 2; // 2 bytes for clock value
result = new char[result_len];
short index_value = htons(_index);
if (DEBUG)
printf("TimeStamp::raw -- index_value %d -> %d\n", _index, index_value);
memcpy(result, &index_value, 2); // on fixe l'index
short host_clock_value, net_clock_value;
host_clock_value = (*this)[0];
net_clock_value = htons(host_clock_value);
if (DEBUG)
printf("TimeStamp::raw -- clock_value %d -> %d\n",host_clock_value, net_clock_value);
memcpy((result + 2),
&net_clock_value, 2); // on fixe l'index
}
break;
case Protocol::TYPE_CBCAST :
{
}
break;
default:
printf("TimeStamp::raw -- undef type\n");
break;
}
return result;
}
unsigned short TimeStamp::getRawSize(){
int result = -1;
switch(_type){
case Protocol::TYPE_TEST :
{
result = 0;
}
break;
case Protocol::TYPE_ABCAST :
{
result = 4;
}
break;
case Protocol::TYPE_CBCAST :
{
result = 4; // FIXME: plus la taille du vecteur * 2;
}
break;
default:
printf("TimeStamp::raw_len -- undef type\n");
break;
}
return result;
}
bool TimeStamp::operator==(TimeStamp &stamp){
printf("TimeStamp::operator== -- \n");
bool ident = true;
if (this->_index != stamp._index){
ident = false;
}
if (this->size() != stamp.size()){
ident = false;
} else {
for (int i = 0; i < this->size(); i++){
if ((*this)[i] != stamp[i]){
ident = false;
break;
}
}
}
return ident;
}
#undef DEBUG

25
src/timestamp.h Normal file
View file

@ -0,0 +1,25 @@
#ifndef _GYR_TIMESTAMP
#define _GYR_TIMESTAMP
#include "protocol.h"
#include <vector>
class TimeStamp : public std::vector<short int> {
private:
short _index;
Protocol::Type _type;
public:
TimeStamp(Protocol::Type t, short idx);
TimeStamp::TimeStamp(Protocol::Type type, char * raw, short raw_size);
char * getRaw();
unsigned short getRawSize();
Protocol::Type getType();
short getIndex();
bool operator==(TimeStamp &stamp);
};
#endif