From ce772256821732e18cb2956e58e7fab20011360e Mon Sep 17 00:00:00 2001 From: glenux Date: Sun, 5 Feb 2006 17:43:00 +0000 Subject: [PATCH] --- Makefile.am | 5 + autogen.sh | 11 ++ configure.in | 152 ++++++++++++++++++++++++++ src/Makefile.am | 56 ++++++++++ src/TODO | 2 + src/clock.cc | 38 +++++++ src/clock.h | 29 +++++ src/clock_ab.cc | 41 +++++++ src/clock_ab.h | 19 ++++ src/clock_cb.cc | 12 +++ src/clock_cb.h | 19 ++++ src/clock_test.cc | 15 +++ src/clock_test.h | 20 ++++ src/config.cc | 139 ++++++++++++++++++++++++ src/config.h | 30 ++++++ src/dabroadcast.cc | 84 +++++++++++++++ src/dabroadcast.h | 0 src/group.cc | 113 +++++++++++++++++++ src/group.h | 39 +++++++ src/groupmember.h | 22 ++++ src/highreceiver.cc | 53 +++++++++ src/highreceiver.h | 17 +++ src/lowreceiver.cc | 144 +++++++++++++++++++++++++ src/lowreceiver.h | 30 ++++++ src/lowsender.cc | 26 +++++ src/lowsender.h | 22 ++++ src/message.cc | 244 ++++++++++++++++++++++++++++++++++++++++++ src/message.h | 35 ++++++ src/messagecell_ab.cc | 8 ++ src/messagecell_ab.h | 21 ++++ src/protocol.h | 14 +++ src/timestamp.cc | 175 ++++++++++++++++++++++++++++++ src/timestamp.h | 25 +++++ 33 files changed, 1660 insertions(+) create mode 100644 Makefile.am create mode 100755 autogen.sh create mode 100644 configure.in create mode 100644 src/Makefile.am create mode 100644 src/TODO create mode 100644 src/clock.cc create mode 100644 src/clock.h create mode 100644 src/clock_ab.cc create mode 100644 src/clock_ab.h create mode 100644 src/clock_cb.cc create mode 100644 src/clock_cb.h create mode 100644 src/clock_test.cc create mode 100644 src/clock_test.h create mode 100644 src/config.cc create mode 100644 src/config.h create mode 100644 src/dabroadcast.cc create mode 100644 src/dabroadcast.h create mode 100644 src/group.cc create mode 100644 src/group.h create mode 100644 src/groupmember.h create mode 100644 src/highreceiver.cc create mode 100644 src/highreceiver.h create mode 100644 src/lowreceiver.cc create mode 100644 src/lowreceiver.h create mode 100644 src/lowsender.cc create mode 100644 src/lowsender.h create mode 100644 src/message.cc create mode 100644 src/message.h create mode 100644 src/messagecell_ab.cc create mode 100644 src/messagecell_ab.h create mode 100644 src/protocol.h create mode 100644 src/timestamp.cc create mode 100644 src/timestamp.h diff --git a/Makefile.am b/Makefile.am new file mode 100644 index 0000000..bcb0fba --- /dev/null +++ b/Makefile.am @@ -0,0 +1,5 @@ +SUBDIRS = src + +EXTRA_DIST = doc INSTALL README Doxyfile autogen.sh rapport + + diff --git a/autogen.sh b/autogen.sh new file mode 100755 index 0000000..b6037f1 --- /dev/null +++ b/autogen.sh @@ -0,0 +1,11 @@ +echo "libtoolize..." +libtoolize +echo "aclocal..." +aclocal +echo "autoheader..." +autoheader +echo "autoconf..." +autoconf +echo "automake..." +automake -a +echo "ok." diff --git a/configure.in b/configure.in new file mode 100644 index 0000000..25e6ac9 --- /dev/null +++ b/configure.in @@ -0,0 +1,152 @@ +dnl Copyright (C) 2004-2005 Glenn ROLLAND. +dnl +dnl This program is free software; you can redistribute it and/or modify +dnl it under the terms of the GNU General Public License as published by +dnl the Free Software Foundation; either version 2 of the License, or +dnl (at your option) any later version. +dnl +dnl This program is distributed in the hope that it will be useful, +dnl but WITHOUT ANY WARRANTY; without even the implied warranty of +dnl MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +dnl GNU General Public License for more details. +dnl +dnl You should have received a copy of the GNU General Public License +dnl along with this program; if not, write to the Free Software +dnl Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +dnl + +## Autoconf requirements +AC_INIT([dabroadcast], [0.0.1], [glenux@fr.st]) +AC_PREREQ(2.50) + +AC_CANONICAL_HOST +AC_CANONICAL_TARGET + + +AM_INIT_AUTOMAKE([dabroadcast],[0.0.1]) + +#AC_CANONICAL_SYSTEM + + + +AM_CONFIG_HEADER(config.h) +AC_CONFIG_SRCDIR([src/dabroadcast.h]) +AC_PROG_INSTALL + +## information on the package +## ## checks for programs +## checks for libraries +## checks for header files +## checks for types +## checks for structures +## checks for compiler characteristics +## checks for library functions +## checks for system services + +# AM_CONFIG_HEADER(config.h) +## required for c sources + +AC_ISC_POSIX +AC_PROG_CC +## required for c++ sources +AC_PROG_CXX +AC_PROG_CPP +# AC_PROG_RANLIB obsoleted bu LIBTOOL + +AC_LANG_SAVE +AC_LANG_CPLUSPLUS + +# AC_PROG_YACC +# AC_PROG_LEX +# AM_PROG_LEX + +dnl Checks for library functions. +AC_HEADER_STDC([]) + +#TODO: uncomment +AC_CHECK_HEADERS([vector set map iostream sstream string algorithm list memory iterator exception utility],, + AC_MSG_ERROR([You need to have the libstdc++ headers installed])) + + + +dnl *************************************************************************** +dnl Gettext stuff. +dnl *************************************************************************** + +#GETTEXT_PACKAGE=yalag-0.1 +#AC_SUBST(GETTEXT_PACKAGE) +#AC_DEFINE_UNQUOTED(GETTEXT_PACKAGE,"$GETTEXT_PACKAGE", [Gettext package.]) + +#ALL_LINGUAS="fr" +#AM_GLIB_GNU_GETTEXT + + +dnl Check for programs + +dnl Check for build configuration. +#AC_PROG_INTLTOOL + +AM_PROG_LIBTOOL + +dnl Checks for header files. + +#AC_MSG_NOTICE([checking presence of SQLite]) +#AC_CHECK_HEADERS([sqlite.h],[], +# [ +# AC_MSG_NOTICE([*** Please install/upgrade SQLite before proceeding]) +# AC_MSG_NOTICE([*** with Unlost Memories installation.]) +# AC_MSG_ERROR([cannot find sqlite.h]) +# ]) + +#sqlite_deps="sqlite3 >= 3.0" +#PKG_CHECK_MODULES(SQLITE3,$sqlite_deps,build_sqlite=yes,build_sqlite=no) +#AC_SUBST(SQLITE3_CFLAGS) +#AC_SUBST(SQLITE3_LIBS) + +#glib_deps="glib-2.0 >= 2.0" +#PKG_CHECK_MODULES(GLIB2,$glib_deps, build_glib=yes, build_glib=no) +#AC_SUBST(GLIB2_CFLAGS) +#AC_SUBST(GLIB2_LIBS) + +glibmm_deps="glibmm-2.4 >= 2.4" +PKG_CHECK_MODULES(GLIBMM,$glibmm_deps, build_glibmm=yes, build_glibmm=no) +AC_SUBST(GLIBMM_CFLAGS) +AC_SUBST(GLIBMM_LIBS) + +if test "$build_glibmm" = "no"; then + AC_MSG_NOTICE([*** Please install/upgrade libglibmm-2.4-dev before proceeding]) + AC_MSG_NOTICE([*** with installation.]) + AC_MSG_ERROR([cannot find glibmm-2.4]) +fi + +gthread_deps="gthread-2.0 >= 2.8" +PKG_CHECK_MODULES(GTHREAD,$gthread_deps, build_gthread=yes, build_gthread=no) +AC_SUBST(GTHREAD_CFLAGS) +AC_SUBST(GTHREAD_LIBS) + +if test "$build_gthread" = "no"; then + AC_MSG_NOTICE([*** Please install/upgrade libgthread-dev before proceeding]) + AC_MSG_NOTICE([*** with installation.]) + AC_MSG_ERROR([cannot find gthread]) +fi + +#xsock_deps="xsock >= 0.9.0" +#PKG_CHECK_MODULES(XSOCK,$xsock_deps, build_xsock=yes, build_xsock=no) +#AC_SUBST(XSOCK_CFLAGS) +#AC_SUBST(XSOCK_LIBS) + +#if test "$build_xsock" = "no"; then +# AC_MSG_NOTICE([*** Please install/upgrade libxsock-dev before proceeding]) +# AC_MSG_NOTICE([*** with installation.]) +# AC_MSG_ERROR([cannot find xsock]) +#fi + + + +AC_CONFIG_FILES([ + Makefile + src/Makefile + ]) +AC_OUTPUT + +dnl configure.in ends here diff --git a/src/Makefile.am b/src/Makefile.am new file mode 100644 index 0000000..136662e --- /dev/null +++ b/src/Makefile.am @@ -0,0 +1,56 @@ +#AM_YFLAGS = -d -v + +SUBDIRS = . + +bin_PROGRAMS = dabcast + +dabcast_SOURCES = \ + config.cc \ + dabroadcast.cc \ + group.cc \ + lowreceiver.cc \ + lowsender.cc \ + highreceiver.cc \ + message.cc \ + clock.cc \ + clock_test.cc \ + clock_ab.cc \ + clock_cb.cc \ + timestamp.cc \ + messagecell_ab.cc + +dabcast_SOURCES += \ + groupmember.h \ + config.h \ + dabroadcast.h \ + group.h \ + lowreceiver.h \ + lowsender.h \ + highreceiver.h \ + message.h \ + clock.h \ + clock_test.h \ + clock_ab.h \ + clock_cb.h \ + protocol.h \ + timestamp.h \ + messagecell_ab.h + +INCLUDES = -I./ @GLIBMM_CFLAGS@ @GTHREAD_CFLAGS@ + +#BUILT_SOURCES = source_parser.h source_lexer.cc + +# LEXLIB@ +# libyalag.la +# miniftpc_INCLUDE = a + +#ytem_LDFLAGS = -L../lib -L../lib/.libs @LDFLAGS@ @GLIBMM_LDFLAGS@ @GDKMM_LDFLAGS@ @GTKMM_LDFLAGS@ +#ytem_LDADD = \ +# ${top_builddir}/src/lib/ytembase/libytembase.la \ +# ${top_builddir}/src/lib/ytemgrapher/libytemgrapher.la \ +# ${top_builddir}/src/lib/ytempert/libytempert.la + +dabcast_LDFLAGS = @LDFLAGS@ @GLIBMM_LIBS@ @GTHREAD_LIBS@ + + + diff --git a/src/TODO b/src/TODO new file mode 100644 index 0000000..0e65ad0 --- /dev/null +++ b/src/TODO @@ -0,0 +1,2 @@ +dans la RFC, indiquer si le temps commence à 0 ou à 1 +indiquer si l'index de la machine commence à 0 ou à 1 diff --git a/src/clock.cc b/src/clock.cc new file mode 100644 index 0000000..adb3925 --- /dev/null +++ b/src/clock.cc @@ -0,0 +1,38 @@ + +#include "clock.h" + + +//Clock::Clock(){ +//} + +/*int numb, int myindex){ + // lock jusqu'a la fin de la fonction + if ((myindex < 0) || (myindex > numb)) { + // throw ClockInitError(); + // throw exception + } + + + _myidx = myindex; + for (int idx = 0; idx < numb; idx++){ + //printf("ext vector %d\n",idx); + _ticks.push_back(0); + } +}*/ + +TimeStamp Clock::inc(){ +} + +/* +// lock jusqu'a la fin de la fonction +Glib::Mutex::Lock lock(_mutex); + +_ticks[_myidx] = _ticks.at(_myidx) + 1; + +TimeStamp ts; +for(int i=0; i<_ticks.size(); i++){ +ts.push_back(_ticks.at(i)); +} +return ts; +}*/ + diff --git a/src/clock.h b/src/clock.h new file mode 100644 index 0000000..be06acd --- /dev/null +++ b/src/clock.h @@ -0,0 +1,29 @@ +#ifndef _GYR_CLOCK +#define _GYR_CLOCK + +#include + +#include +#include + +#include "timestamp.h" + +class ClockUnadjustable : public std::exception { }; +class ClockInitError : public std::exception { }; + + +class Clock { + private: + + protected: + size_t _size; + size_t _cur_index; + Glib::Mutex _mutex; + + public: + + virtual TimeStamp inc() = 0; + virtual void adjust(TimeStamp timestamp) = 0; +}; + +#endif diff --git a/src/clock_ab.cc b/src/clock_ab.cc new file mode 100644 index 0000000..75a1592 --- /dev/null +++ b/src/clock_ab.cc @@ -0,0 +1,41 @@ + +#include "clock_ab.h" + +ClockAb::ClockAb(size_t myindex){ + Glib::Mutex::Lock lock(_mutex); + printf("ClockAb::ClockAb -- constructor\n"); + _cur_index = myindex; + _ticks = 0; +} + +/*int numb, int myindex){ + // lock jusqu'a la fin de la fonction + if ((myindex < 0) || (myindex > numb)) { + throw ClockInitError(); + // throw exception + } + + Glib::Mutex::Lock lock(_mutex); + + _myidx = myindex; + for (int idx = 0; idx < numb; idx++){ + //printf("ext vector %d\n",idx); + _ticks.push_back(0); + } +}*/ + +TimeStamp ClockAb::inc(){ + // lock jusqu'a la fin de la fonction + Glib::Mutex::Lock lock(_mutex); + + printf("ClockAb::inc -- creating timestamp\n"); + _ticks++; + + TimeStamp ts(Protocol::TYPE_ABCAST, _cur_index); + ts.push_back(_ticks); + return ts; +} + +void ClockAb::adjust(TimeStamp ts){ + printf("ClockAb::adjust -- NOT IMPLEMENTED\n"); +} diff --git a/src/clock_ab.h b/src/clock_ab.h new file mode 100644 index 0000000..55656e7 --- /dev/null +++ b/src/clock_ab.h @@ -0,0 +1,19 @@ +#ifndef _GYR_CLOCK_AB +#define _GYR_CLOCK_AB + +#include "clock.h" + +class ClockAb : public Clock { + private: + int _ticks; + + protected: + + public: + ClockAb(size_t index); + + virtual TimeStamp inc(); + virtual void adjust(TimeStamp stamp); +}; + +#endif diff --git a/src/clock_cb.cc b/src/clock_cb.cc new file mode 100644 index 0000000..a160166 --- /dev/null +++ b/src/clock_cb.cc @@ -0,0 +1,12 @@ + +#include "clock_cb.h" + +ClockCb::ClockCb(size_t size, size_t index){ + printf("ClockCb::ClockCb -- create\n"); +} + +void ClockCb::adjust(TimeStamp ts){ +} + +TimeStamp ClockCb::inc(){ +} diff --git a/src/clock_cb.h b/src/clock_cb.h new file mode 100644 index 0000000..47e6e4d --- /dev/null +++ b/src/clock_cb.h @@ -0,0 +1,19 @@ +#ifndef _GYR_CLOCK_CB +#define _GYR_CLOCK_CB + +#include "clock.h" + +class ClockCb : public Clock { + private: + std::vector _ticks; + + protected: + + public: + ClockCb(size_t size, size_t index); + + virtual TimeStamp inc(); + virtual void adjust(TimeStamp stamp); +}; + +#endif diff --git a/src/clock_test.cc b/src/clock_test.cc new file mode 100644 index 0000000..f38977c --- /dev/null +++ b/src/clock_test.cc @@ -0,0 +1,15 @@ + +#include "clock_test.h" + +ClockTest::ClockTest(){ + printf("ClockTest::ClockTest -- create\n"); +} + +TimeStamp ClockTest::inc(){ + //send an empty timestamp + //prin +} + +void ClockTest::adjust(TimeStamp ts){ + //do nothing +} diff --git a/src/clock_test.h b/src/clock_test.h new file mode 100644 index 0000000..2feb759 --- /dev/null +++ b/src/clock_test.h @@ -0,0 +1,20 @@ +#ifndef _GYR_CLOCK_TEST +#define _GYR_CLOCK_TEST + +#include "clock.h" + +class ClockTest : public Clock { + private: + int _ticks; + //std::vector _ticks; + Glib::Mutex _mutex; + int _myidx; + + protected: + public: + ClockTest(); //int truc, int truc); + virtual TimeStamp inc(); + virtual void adjust(TimeStamp stamp); +}; + +#endif diff --git a/src/config.cc b/src/config.cc new file mode 100644 index 0000000..e91b2b7 --- /dev/null +++ b/src/config.cc @@ -0,0 +1,139 @@ +#include +#include + +#include "config.h" +#include "group.h" +#include "message.h" +using namespace std; + +Config::Config(int argc, char **argv) { + _port = -1; + _mode = Protocol::TYPE_UNKNOWN; + + int groupPort; + + while (1) { + static struct option long_options[] = { + // + //{"compress1", no_argument, 0, 'c'}, + {"test", no_argument, 0, 'T'}, + {"abcast", no_argument, 0, 'A'}, + {"cbcast", no_argument, 0, 'C'}, + + {"group", required_argument, 0, 'g'}, + {"port", required_argument, 0, 'p'}, + + {0, 0, 0, 0} + }; + + int option_index = 0; + + int c = getopt_long(argc, argv, "TACg:p:", + long_options, &option_index); + + /* detect the end of options */ + if (c == -1) { + break; + } + + switch (c) { + case 0: + printf("case NULL\n"); + break; + case 'A': + { + _mode = Protocol::TYPE_ABCAST; + break; + } + case 'C': + { + _mode = Protocol::TYPE_CBCAST; + break; + } + case 'T': + { + _mode = Protocol::TYPE_TEST; + break; + } + case 'p': + { + stringstream s; + printf("Port -> %s\n",optarg); + + s << string(optarg); + s >> _port; + } + break; + case 'g': + { + HostId g_host; + string optstr(optarg); + stringstream s_host, s_port; + + int idx = optstr.find(":"); + if (idx > 0){ + cout << "Group -> "<< optstr <<" (: at "<< idx <<")\n"; + + // on oblige la forme XXXXXX:YY + + s_host << optstr.substr(0,idx); + s_host >> (g_host.host); + + s_port << optstr.substr(idx+1, optstr.size()-idx-1); + s_port >> (g_host.port); + } else { + cerr <<"Invalid host : '"<< optstr <<"'\n"; + } + _group_hosts.push_back(g_host); + } + break; + case '?': + printf("unknow\n"); + break; + default: + printf("default\n"); + return; + } + } +} + +bool Config::isValid() { + int score = 0; + int valid = 0; + + if (_group_hosts.size() > 0) { + score++; + } + valid++; + + if (_port > 0) { + score++; + } + valid++; + + return (valid == score); +} + +std::list Config::getGroupHosts(){ + return _group_hosts; +} + +Protocol::Type Config::getMode(){ + return _mode; +} + +int Config::getPort(){ + return _port; +} + +void Config::usage() { + printf("Usage: webreducer [options]\n"); + printf("\n"); + printf("Modes (mutualy exclusive):\n"); + printf("-T, -test Test mode (simple broadcast)\n"); + printf("-A, -abcast ABcast mode\n"); + printf("-C, -cbcast CBcast mode\n"); + printf("Mandatory options:\n"); + printf("-g, -group Add an host to the group\n"); + printf("-p, -port Use this port on localhost\n"); +} diff --git a/src/config.h b/src/config.h new file mode 100644 index 0000000..e32aed8 --- /dev/null +++ b/src/config.h @@ -0,0 +1,30 @@ +#ifndef _GYR_CONFIG_H +#define _GYR_CONFIG_H + +#include + +#include +#include + +#include "protocol.h" +#include "group.h" + +class Config { + private: + std::list _group_hosts; + int _port; + Protocol::Type _mode; + + protected: + + public: + + Config(int argc, char **argv); + + void usage(); + bool isValid(); + std::list getGroupHosts(); + int getPort(); + Protocol::Type getMode(); +}; +#endif // _GYR_CONFIG_H diff --git a/src/dabroadcast.cc b/src/dabroadcast.cc new file mode 100644 index 0000000..69c91f8 --- /dev/null +++ b/src/dabroadcast.cc @@ -0,0 +1,84 @@ + +#include +#include +#include + +#include "config.h" +// #include "groupmember.h" + +#include "lowreceiver.h" +#include "highreceiver.h" +#include "lowsender.h" + +#include "clock.h" +#include "clock_test.h" +#include "clock_ab.h" +#include "clock_cb.h" + + +/* + * - on doit passer en parametre les gars du groupe... + * - + */ + +int main(int argc, char ** argv){ + Config config(argc, argv); + if (config.isValid()){ + Glib::thread_init(); + + Group grp(config.getGroupHosts()); + Clock * clk; + + //FIXME non-dynamic port ! + int portHigh = 2310; + + switch(config.getMode()){ + case Protocol::TYPE_TEST: + clk = new ClockTest(); + printf("Test mode!\n"); + break; + case Protocol::TYPE_ABCAST: + clk = new ClockAb(1); + printf("AbCast mode!\n"); + break; + case Protocol::TYPE_CBCAST: + clk = new ClockCb(4,1); + printf("CbCast mode!\n"); + break; + case Protocol::TYPE_UNKNOWN: + clk = NULL; + printf("Unknow mode!\n"); + exit(-1); + break; + default: + clk = NULL; + printf("Unknow(2) mode!\n"); + exit(-1); + break; + } + HighReceiver high_receiver(portHigh); + + LowReceiver low_receiver(config.getPort(), + portHigh, + grp, + *clk); + LowSender low_sender(grp,*clk,config.getMode()); + + Glib::Thread *const high_receiver_thr = Glib::Thread::create( + sigc::mem_fun(high_receiver, &HighReceiver::run), true); + Glib::Thread *const low_sender_thr = Glib::Thread::create( + sigc::mem_fun(low_sender, &LowSender::run), true); + Glib::Thread *const low_receiver_thr = Glib::Thread::create( + sigc::mem_fun(low_receiver, &LowReceiver::run), true); + + low_sender_thr->join(); + low_receiver_thr->join(); + high_receiver_thr->join(); + + std::cout <<"Happy end"< + +#define DEBUG 0 + +Group::Group(std::list group){ + _hosts = group; + _socket_desc = socket(AF_INET, SOCK_DGRAM, 0); + + /* et l'autre variante : AF_UNIX */ + if (_socket_desc < 0){ + /* error */ + perror("Creation de la socket impossible"); + fprintf(stderr,"BOUM at %s:%d",__FILE__,__LINE__); + // FIXME: throw something + exit(-1); + + } + + struct sockaddr_in * myaddr = new sockaddr_in; + + bzero(myaddr,sizeof(sockaddr_in)); + myaddr->sin_family = AF_INET; + myaddr->sin_port = htons(0); + myaddr->sin_addr.s_addr = htonl(INADDR_ANY); + + if (bind(_socket_desc,(struct sockaddr *)myaddr,sizeof(sockaddr_in)) < 0) { + //FIXME : throw something + perror("Attachement de la socket impossible"); + fprintf(stderr,"BOUM at %s:%d",__FILE__,__LINE__); + exit(-1); + } + + //FIXME: définir une liste avec les structures... + std::list::iterator hiIter; + for (hiIter = _hosts.begin(); + hiIter != _hosts.end(); + hiIter++){ + // pour chacun des hostId, on ouvre une socket + HostId &hid = *hiIter; + std::cout << "HOST: " <sin_family = AF_INET; + addr->sin_addr.s_addr = ((struct in_addr *) (hp->h_addr))->s_addr; + addr->sin_port = htons(hid.port); + + _addrs.push_back(addr); + } +} + + +void Group::broadcast(Message & msg){ + if (DEBUG) + printf("Group::broadcast -- enter\n"); + std::vector::iterator saIter; + + for (int index = 0; index < _addrs.size(); index++){ + this->sendto(msg, index); + } + + if (DEBUG) + printf("Group::broadcast -- exit\n"); +} + + +void Group::sendto(Message &msg, int index){ + sockaddr_in * addr = _addrs[index]; + + if (DEBUG) + printf("Group::sendto -- for\n"); + + int message_len = msg.getRawSize(); + char * message = msg.getRaw(); + + int en; + + if (DEBUG) + printf("Group::sendto -- got mesg\n"); + + if ((en = ::sendto(_socket_desc, + message, + message_len, + 0, + (struct sockaddr*)addr, + sizeof(struct sockaddr_in)) < 0)){ + + perror("sendto failed\n"); + /* error */ + exit(-2); + } else { + if (DEBUG) + printf("Group::sendto -- done\n"); + } +} + +#undef DEBUG diff --git a/src/group.h b/src/group.h new file mode 100644 index 0000000..b062f62 --- /dev/null +++ b/src/group.h @@ -0,0 +1,39 @@ +#ifndef _GYR_GROUP_HOST_H +#define _GYR_GROUP_HOST_H + +#include +#include + +#include +#include +#include +#include +#include +#include + + +#include "message.h" + +class HostId { + public: + std::string host; + int port; +}; + +class Group { + public: + + private: + std::list _hosts; + std::vector _addrs; + int _socket_desc; + + protected: + public: + Group(std::list group); + + void sendto(Message &msg, int index); + void broadcast(Message &msg); +}; + +#endif diff --git a/src/groupmember.h b/src/groupmember.h new file mode 100644 index 0000000..37b9fa4 --- /dev/null +++ b/src/groupmember.h @@ -0,0 +1,22 @@ +#ifndef _GYR_GROUP_MEMBER_HH +#define _GYR_GROUP_MEMBER_HH + +#include "lowreceiver.h" +#include "highreceiver.h" +#include "lowsender.h" + +class GroupMember { + private: + std::list _group; + + HighReceiver * _high_receiver; // high level + LowReceiver * _low_receiver; // low level + LowSender * _sender; // low level + + protected: + public: + void addToGroup(int port); + +}; + +#endif // _GYR_COMPONENT_HH diff --git a/src/highreceiver.cc b/src/highreceiver.cc new file mode 100644 index 0000000..3ce9e13 --- /dev/null +++ b/src/highreceiver.cc @@ -0,0 +1,53 @@ + +#include "highreceiver.h" + +#include +#include +#include +#include +#include + +HighReceiver::HighReceiver(short int portHigh){ + + printf("LowReceiver::LowReceiver --"); + + _socket_desc = socket(AF_INET, SOCK_DGRAM, 0); + + /* et l'autre variante : AF_UNIX */ + if (_socket_desc < 0){ + /* error */ + perror("Creation de la socket impossible"); + fprintf(stderr,"BOUM at %s:%d",__FILE__,__LINE__); + // FIXME: throw something + exit(-1); + + } + + struct sockaddr_in * _socket_addr = new sockaddr_in; + + // port_low = externe + // port_high = interne + bzero(_socket_addr,sizeof(sockaddr_in)); + _socket_addr->sin_family = AF_INET; + _socket_addr->sin_port = htons(portHigh); + _socket_addr->sin_addr.s_addr = htonl(INADDR_ANY); + + // chopper une socket + if (bind(_socket_desc,(struct sockaddr *)_socket_addr,sizeof(sockaddr_in)) < 0) { + //FIXME : throw something + perror("Attachement de la socket impossible"); + fprintf(stderr,"BOUM at %s:%d",__FILE__,__LINE__); + exit(-1); + } +} + +HighReceiver::~HighReceiver(){ + delete _socket_addr; +} + +void HighReceiver::run(){ + while(1){ + sleep(1); + printf("HighReceiver::run -- \n"); + } +} diff --git a/src/highreceiver.h b/src/highreceiver.h new file mode 100644 index 0000000..f8a0913 --- /dev/null +++ b/src/highreceiver.h @@ -0,0 +1,17 @@ +#ifndef _GYR_HIGH_RECEIVER_H +#define _GYR_HIGH_RECEIVER_H + +class HighReceiver { + private: + int _socket_desc; + struct sockaddr_in * _socket_addr; + + protected: + public: + HighReceiver(short int portHigh); + ~HighReceiver(); + + void run(); // thread part +}; + +#endif // _GYR_HIGH_RECEIVER_H diff --git a/src/lowreceiver.cc b/src/lowreceiver.cc new file mode 100644 index 0000000..168db37 --- /dev/null +++ b/src/lowreceiver.cc @@ -0,0 +1,144 @@ + +#include "lowreceiver.h" +#include "messagecell_ab.h" + +LowReceiver::LowReceiver(short port_low, + short port_high, + Group &grp, + Clock &clk) : + _port_low(port_low), _port_high(port_high), _group(grp), _clock(clk) +{ + + printf("LowReceiver::LowReceiver --\n"); + + _socket_desc = socket(AF_INET, SOCK_DGRAM, 0); + + /* et l'autre variante : AF_UNIX */ + if (_socket_desc < 0){ + /* error */ + perror("Creation de la socket impossible\n"); + fprintf(stderr,"BOUM at %s:%d\n",__FILE__,__LINE__); + // FIXME: throw something + exit(-1); + + } + + _socket_addr = new sockaddr_in; + + + // port_low = externe + // port_high = interne + bzero(_socket_addr,sizeof(sockaddr_in)); + _socket_addr->sin_family = AF_INET; + _socket_addr->sin_port = htons(_port_low); + _socket_addr->sin_addr.s_addr = htonl(INADDR_ANY); + + // chopper une socket + if (bind(_socket_desc, + (struct sockaddr *)_socket_addr, + sizeof(sockaddr_in)) < 0) { + //FIXME : throw something + perror("Attachement de la socket impossible\n"); + fprintf(stderr,"BOUM at %s:%d\n",__FILE__,__LINE__); + exit(-1); + } +} + +LowReceiver::~LowReceiver() { + delete _socket_addr; +} + +void LowReceiver::run(){ + int buffer_len = 1500; + char * buffer = new char[buffer_len]; + struct sockaddr_in repaddr; + socklen_t sockaddr_len = sizeof(struct sockaddr_in); + Message * mesg = NULL; + while(1){ + // recevoir les données + memset(buffer, '\0', buffer_len); + bzero(&repaddr,sizeof(struct sockaddr_in)); + int read_buffer_len = recvfrom( + _socket_desc, + buffer, + buffer_len, + 0, + (struct sockaddr*)&repaddr, + &sockaddr_len); + + mesg = new Message(buffer, read_buffer_len); + printf("LowReceiver::run -- READ\n"); + char * str = new char[mesg->getDataSize() + 1]; + strncpy(str, mesg->getData(), mesg->getDataSize()); + str[mesg->getDataSize()] = '\0'; + printf("LowReceiver::run -- READ '%s'\n", str); + + this->manage(mesg); + + delete(mesg); + mesg = NULL; + // on dispatche les données en fonction du protocole... + } +} + + +void LowReceiver::manage(Message * mesg){ + switch(mesg->getType()){ + case Protocol::TYPE_TEST : + { + printf("LowReceiver::manage -- NOT IMPLEMENTED\n"); + } + break; + case Protocol::TYPE_ABCAST : + { + this->manage_abcast(mesg); + } + break; + case Protocol::TYPE_CBCAST : + { + this->manage_cbcast(mesg); + } + break; + default: + printf("LowReceiver::manage -- ERROR\n"); + break; + } +} + +void LowReceiver::manage_abcast(Message * mesg) { + static std::list fifo; + std::list::iterator iter; + printf("LowReceiver::manage_abcast -- init\n"); + + // FIXME: on suppose ne pas etre l'emetteur + // identifiant = horloge + id_site_emeteur + bool firstSeenMessage = true; + for (iter = fifo.begin(); iter != fifo.end(); iter++){ + MessageCellAb * cur = *iter; + if (cur->message == mesg) { + printf("LowReceiver::manage_abcast -- message seen\n"); + firstSeenMessage = false; + break; + } + } + + if (firstSeenMessage){ + // si le message est vu pour la premiere fois: + // - on l'ajoute dans la liste d'attente + MessageCellAb * cell = new MessageCellAb(); + cell->message = new Message(*mesg); //FIXME: make a copy; + cell->type = MessageCellAb::TYPE_TEMPORARY; + // - on retourne une estampille(reception) a l'emeteur + + } else { + // sinon + // - l'estampille du message est mise a jour + + // - le message est marqué comme final + // - on défile les estampille finale la + } +} + +void LowReceiver::manage_cbcast(Message * mesg) { + printf("LowReceiver::manage_cbcast -- init\n"); +} diff --git a/src/lowreceiver.h b/src/lowreceiver.h new file mode 100644 index 0000000..76aa5d4 --- /dev/null +++ b/src/lowreceiver.h @@ -0,0 +1,30 @@ +#ifndef _GYR_LOW_RECEIVER +#define _GYR_LOW_RECEIVER + +#include "group.h" +#include "clock.h" +#include "message.h" + +class LowReceiver { + private: + Group & _group; + Clock & _clock; + int _port_low; + int _port_high; + int _socket_desc; + struct sockaddr_in * _socket_addr; + + protected: + + public: + LowReceiver(short port_low, short port_high, Group & grp, Clock & clk); + ~LowReceiver(); + void run(); // thread part + + void manage(Message * mesg); + void manage_abcast(Message * mesg); + void manage_cbcast(Message * mesg); + +}; + +#endif diff --git a/src/lowsender.cc b/src/lowsender.cc new file mode 100644 index 0000000..b4f1a24 --- /dev/null +++ b/src/lowsender.cc @@ -0,0 +1,26 @@ + +#include "lowsender.h" + +#define MESSAGE "AB or CB cast from LowSender ? hehe...." + +LowSender::LowSender(Group &grp, Clock &clk, Protocol::Type type) : _group(grp), _clock(clk), _type(type) { + +} + +void LowSender::run(){ + // thread part + while(1){ + sleep(3); + printf("LowSender -- Broadcasting '%s'\n", MESSAGE); + + TimeStamp ts = _clock.inc(); + printf("LowSender::run -- Timestamp done\n"); + + Message msg(_type, ts, MESSAGE, strlen(MESSAGE)); + printf("LowSender::run -- Mesg done and ready to send\n"); + + _group.broadcast(msg); + printf("LowSender::run -- Mesg sent\n"); + } +} + diff --git a/src/lowsender.h b/src/lowsender.h new file mode 100644 index 0000000..c6f3d50 --- /dev/null +++ b/src/lowsender.h @@ -0,0 +1,22 @@ +#ifndef _GYR_LOW_SENDER_H +#define _GYR_LOW_SENDER_H + +#include "protocol.h" +#include "group.h" +#include "clock.h" +#include "message.h" + +class LowSender{ + private: + Group & _group; + Clock & _clock; + Protocol::Type _type; + + protected: + + public: + LowSender::LowSender(Group &grp, Clock &clk, Protocol::Type type); + void run(); // thread part +}; + +#endif diff --git a/src/message.cc b/src/message.cc new file mode 100644 index 0000000..706aa4d --- /dev/null +++ b/src/message.cc @@ -0,0 +1,244 @@ + +#include + +#include "message.h" + +#define DEBUG_INPUT 0 +#define DEBUG_OUTPUT 0 + +Message::Message(const Message & original){ + printf("Message::Message -- copy\n"); + + _size = original._size; + _raw = new char[_size]; + memcpy(_raw, original._raw, _size); + + _data_size = original._data_size; + _data = new char[_data_size]; + memcpy(_data, original._data, _data_size); + + _type = original._type; + + _stamp = new TimeStamp(*(original._stamp)); + +} + +Message::Message(void * data, int len) { + _raw = new char[len]; + memcpy(_raw, data, len); + _size = len; + + if (DEBUG_INPUT) + printf("Message::Message(void* , int) -- length %d\n",len); + + int index = 0; + // découper le type + _type = (Protocol::Type)(_raw[index]); + index += 1; + + /* + short stamp_index = -1; + short stamp_len = -1; + switch(_type){ + case Protocol::TYPE_ABCAST : + { + memcpy(&stamp_index, (_raw + index), 2); + stamp_index = ntohs(stamp_index); + stamp_len = 1; // 2 * 2 octets; + index += 2; + } + break; + case Protocol::TYPE_CBCAST : + { + // on est super embétés... + // on regarde d'abord l'index... + // stamp_len = 4i + memcpy(&stamp_index, (_raw + index), 2); + stamp_index = ntohs(stamp_index); + index += 2; + + memcpy(&stamp_len, (_raw + index), 2); + index += 2; + } + break; + case Protocol::TYPE_TEST : + { + // taille = 0; + stamp_len = 0; + } + break; + case Protocol::TYPE_UNKNOWN : + break; + default : + break; + } + + if (DEBUG_INPUT) { + printf("Message::Message(void *, int) -- stamp_index %d\n", stamp_index); + printf("Message::Message(void *, int) -- stamp_len %d\n", stamp_len); + } + if (stamp_len < 0){ + fprintf(stderr, "Longueur du timestamp inconnue\n !"); + exit(-1); + } + _stamp = new TimeStamp(_type, stamp_index); + // on itere stamp_len fois sur 2 octets + for (int i = 0; i< stamp_len; i++){ + short site_value; + memcpy(&site_value, (_raw + index), 2); + site_value = ntohs(site_value); + + if (DEBUG_INPUT) + printf("Message::Message -- horloge[%d] = %d\n", i, site_value); + + index += 2; + _stamp->push_back(site_value); + } + */ + _stamp = new TimeStamp(_type, (_raw + index), len - index); + index += _stamp->getRawSize(); + + // initialise message from the following data + _data_size = -1; + memcpy(&_data_size, (_raw + index), 2); + index += 2; + + if (DEBUG_INPUT) + printf("Message::Message -- message length %d\n", _data_size); + + _data = new char[_data_size]; + memcpy(_data, (_raw+index), _data_size); + +// if (DEBUG_INPUT) +// printf("Message::Message -- message data '%s'\n",_data); //FIXME (buffer overflow) +} + + +Message::Message(Protocol::Type type, TimeStamp ×tamp, char * mesg, short mesg_size) : + _type(type) +{ + _data = new char[mesg_size]; + _data_size = mesg_size; + strncpy(_data, mesg, _data_size); + + _stamp = new TimeStamp(timestamp); + + if (DEBUG_OUTPUT) + printf("Message::Message -- constructing from type, stamp, string\n"); + + // construire le message qui va bien a partir des parametres + unsigned char dgram_type; + unsigned int dgram_size; + char * dgram_stamp; + unsigned int dgram_type_idx; + unsigned int dgram_stamp_idx; + unsigned int dgram_messagesize_idx; + unsigned int dgram_messagedata_idx; + + printf("Message::Message -- initialising\n"); + + dgram_type_idx = 0; + dgram_stamp_idx = 1; + + if (DEBUG_OUTPUT) + printf("Message::Message -- computing message length\n"); + + dgram_messagesize_idx = dgram_stamp_idx + _stamp->getRawSize(); + + if (DEBUG_OUTPUT) + printf("Message::Message -- computing message length %d\n",dgram_messagesize_idx); + + dgram_messagedata_idx = dgram_messagesize_idx + 2; + + if (DEBUG_OUTPUT) + printf("Message::Message -- computing message length %d\n",dgram_messagedata_idx); + + dgram_size = dgram_messagedata_idx + _data_size; + _size = dgram_size; + + if (DEBUG_OUTPUT) + printf("Message::Message -- creating datagram (%d)\n", dgram_size); + + // we create the dgram + if (_raw != NULL){ free(_raw); } + _raw = new char[dgram_size]; + + // fill the dgram with type + _raw[0] = _type; + + // fill the dgram with timestamp + dgram_stamp = _stamp->getRaw(); + memcpy((_raw + dgram_stamp_idx), + dgram_stamp, _stamp->getRawSize()); + free(dgram_stamp); + + // fill the dgram with message + short ds = htons(_data_size); + memcpy((_raw + dgram_messagesize_idx), + &ds, + 2); + memcpy((_raw + dgram_messagedata_idx), + _data, + _data_size); + + if (DEBUG_OUTPUT) + printf("Message::raw -- MESSAGE(%c,\n\t",type); + for (int i=0; i < _size; i++){ + if (DEBUG_OUTPUT) + printf("%#x ", _raw[i]); + } + if (DEBUG_OUTPUT) printf(")\n"); + +} + + +char * Message::getRaw(){ + char * result = new char[_size]; + memcpy(result, _raw, _size); + + printf("Message::getRaw -- hex: \n"); + for (int i=0; i < _size; i++){ + printf("%#x ", result[i]); + } + printf("\n"); + + return result; +} + + +size_t Message::getRawSize(){ + printf("Message::getRawSize -- %d\n",_size); + return _size; +} + + +char * Message::getData(){ + char * result = new char[_data_size]; + memcpy(result, _data, _data_size); + + return result; +} + +short Message::getDataSize(){ + return _data_size; +} + + + +TimeStamp Message::getStamp(){ + return *_stamp; +} + + +Protocol::Type Message::getType(){ + return _type; +} + + +bool Message::operator==(Message &message) { + printf("Message::operator== -- \n"); + return (message.getStamp() == this->getStamp()); +} + +#undef DEBUG_INPUT +#undef DEBUG_OUTPUT diff --git a/src/message.h b/src/message.h new file mode 100644 index 0000000..ca560b2 --- /dev/null +++ b/src/message.h @@ -0,0 +1,35 @@ +#ifndef _GYR_MESSAGE_H +#define _GYR_MESSAGE_H + +#include + +#include "protocol.h" +#include "clock.h" + +class Message { + public: + + private: + char * _raw; + size_t _size; + char * _data; + short _data_size; + TimeStamp * _stamp; // construit dynamiquement + Protocol::Type _type; + protected: + public: + + Message(Protocol::Type type, TimeStamp &ts, char * data, short data_size); + Message(void * data, int len); + Message(const Message & mesg); + + char * getRaw(); + size_t getRawSize(); + char * getData(); + short getDataSize(); + Protocol::Type getType(); + TimeStamp getStamp(); + bool operator==(Message &message); +}; + +#endif diff --git a/src/messagecell_ab.cc b/src/messagecell_ab.cc new file mode 100644 index 0000000..21a53c5 --- /dev/null +++ b/src/messagecell_ab.cc @@ -0,0 +1,8 @@ + +#include "messagecell_ab.h" + +MessageCellAb::MessageCellAb(){ + printf("MessageCellAb::MessageCellAb -- constructor\n"); + this->message = NULL; + this->type = MessageCellAb::TYPE_UNDEF; +} diff --git a/src/messagecell_ab.h b/src/messagecell_ab.h new file mode 100644 index 0000000..e6e6dd2 --- /dev/null +++ b/src/messagecell_ab.h @@ -0,0 +1,21 @@ +#ifndef _MESSAGE_CELL_AB +#define _MESSAGE_CELL_AB + +#include "message.h" + +class MessageCellAb { + public: + typedef enum { + TYPE_UNDEF, + TYPE_TEMPORARY, + TYPE_DEFINITIVE + } Type; + + Message * message; + MessageCellAb::Type type; + + MessageCellAb(); +}; + +#endif + diff --git a/src/protocol.h b/src/protocol.h new file mode 100644 index 0000000..f0bb5c1 --- /dev/null +++ b/src/protocol.h @@ -0,0 +1,14 @@ +#ifndef _GYR_PROTOCOL_H +#define _GYR_PROTOCOL_H + +class Protocol { + public: + typedef enum { + TYPE_UNKNOWN='#', + TYPE_TEST='T', + TYPE_ABCAST='A', + TYPE_CBCAST='C' + } Type; +}; + +#endif // _GYR_PROTOCOL_H diff --git a/src/timestamp.cc b/src/timestamp.cc new file mode 100644 index 0000000..285685a --- /dev/null +++ b/src/timestamp.cc @@ -0,0 +1,175 @@ + +#include + +#include "timestamp.h" + +#define DEBUG 1 + +TimeStamp::TimeStamp(Protocol::Type type, short index){ + _index = index; + _type = type; +} + +TimeStamp::TimeStamp(Protocol::Type type, char * raw, short raw_size){ + int pos_idx = 0; + short stamp_len = -1; + + _index = -1; + _type = type; + switch(_type){ + case Protocol::TYPE_ABCAST : + { + memcpy(&_index, raw , 2); + _index = ntohs(_index); + pos_idx += 2; + stamp_len = 1; + } + break; + case Protocol::TYPE_CBCAST : + { + // on est super embétés... + // on regarde d'abord l'index... + // stamp_len = 4i + memcpy(&_index, raw , 2); + _index = ntohs(_index); + pos_idx += 2; + + memcpy(&stamp_len, (raw + pos_idx), 2); + stamp_len = ntohs(stamp_len); + pos_idx += 2; + } + break; + case Protocol::TYPE_TEST : + { + // taille = 0; + stamp_len = 0; + } + break; + case Protocol::TYPE_UNKNOWN : + break; + default : + break; + } + + if (DEBUG) { + printf("TimeStamp::TimeStamp(Protocol::Type, void *, int) -- stamp_index %d\n", _index); + printf("TimeStamp::TimeStamp(Protocol::Type, void *, int) -- stamp_len %d\n", stamp_len); + } + if (stamp_len < 0){ + fprintf(stderr, "TimeStamp::TimeStamp -- Longueur du timestamp inconnue\n !"); + exit(-1); + } + //_stamp = new TimeStamp(_type, stamp_index); + // on itere stamp_len fois sur 2 octets + for (int i = 0; i< stamp_len; i++){ + short net_site_value, host_site_value; + memcpy(&net_site_value, (raw + pos_idx), 2); + host_site_value = ntohs(net_site_value); + + if (DEBUG) + printf("TimeStamp::TimeStamp -- index %d horloge[%d] = %d -> %d\n", + pos_idx, i, net_site_value, host_site_value); + + pos_idx += 2; + this->push_back(host_site_value); + } + +} + +Protocol::Type TimeStamp::getType(){ + return _type; +} + +short TimeStamp::getIndex(){ + return _index; +} + +char * TimeStamp::getRaw(){ + int result_len = 0; + char * result = NULL; + switch(_type){ + case Protocol::TYPE_TEST : + { + result = NULL; + } + break; + case Protocol::TYPE_ABCAST : + { + result_len = 2; // 2 bytes for site index + result_len += 2; // 2 bytes for clock value + result = new char[result_len]; + + short index_value = htons(_index); + if (DEBUG) + printf("TimeStamp::raw -- index_value %d -> %d\n", _index, index_value); + + memcpy(result, &index_value, 2); // on fixe l'index + + short host_clock_value, net_clock_value; + host_clock_value = (*this)[0]; + net_clock_value = htons(host_clock_value); + + if (DEBUG) + printf("TimeStamp::raw -- clock_value %d -> %d\n",host_clock_value, net_clock_value); + + memcpy((result + 2), + &net_clock_value, 2); // on fixe l'index + + } + break; + case Protocol::TYPE_CBCAST : + { + } + break; + default: + printf("TimeStamp::raw -- undef type\n"); + break; + } + return result; +} + +unsigned short TimeStamp::getRawSize(){ + int result = -1; + switch(_type){ + case Protocol::TYPE_TEST : + { + result = 0; + } + break; + case Protocol::TYPE_ABCAST : + { + result = 4; + } + break; + case Protocol::TYPE_CBCAST : + { + result = 4; // FIXME: plus la taille du vecteur * 2; + } + break; + default: + printf("TimeStamp::raw_len -- undef type\n"); + break; + } + return result; +} + +bool TimeStamp::operator==(TimeStamp &stamp){ + printf("TimeStamp::operator== -- \n"); + bool ident = true; + if (this->_index != stamp._index){ + ident = false; + } + if (this->size() != stamp.size()){ + ident = false; + } else { + for (int i = 0; i < this->size(); i++){ + if ((*this)[i] != stamp[i]){ + ident = false; + break; + } + } + } + return ident; +} + +#undef DEBUG diff --git a/src/timestamp.h b/src/timestamp.h new file mode 100644 index 0000000..37ffb27 --- /dev/null +++ b/src/timestamp.h @@ -0,0 +1,25 @@ +#ifndef _GYR_TIMESTAMP +#define _GYR_TIMESTAMP + +#include "protocol.h" + +#include + +class TimeStamp : public std::vector { + private: + short _index; + Protocol::Type _type; + public: + TimeStamp(Protocol::Type t, short idx); + TimeStamp::TimeStamp(Protocol::Type type, char * raw, short raw_size); + + char * getRaw(); + unsigned short getRawSize(); + + Protocol::Type getType(); + short getIndex(); + bool operator==(TimeStamp &stamp); + +}; + +#endif