GHS
Leader Election Based on GHS Minimum Spanning Tree
ghs-demo-comms.h
Go to the documentation of this file.
1 
41 #ifndef GHS_DEMO_COMMS
42 #define GHS_DEMO_COMMS
43 #include "ghs-demo-config.h"
44 #include "ghs-demo-msgutils.h"
45 #include "ghs-demo-edgemetrics.h"
46 #include "seque/static_queue.h"
47 #include <nng/nng.h>//req_s, rep_s, msg
48 #include <cstring> //memcpy, memset
49 #include <unordered_map>
50 #include <vector>
51 #include <thread>
52 #include <mutex>
53 
54 #ifndef COMMS_DEMO_MAX_N
55 #define COMMS_DEMO_MAX_N 8
57 #endif
58 
59 #ifndef PAYLOAD_MAX_SZ
60 #define PAYLOAD_MAX_SZ 1024
62 #endif
63 
67 namespace demo{
68 
72  enum Errno{
73  OK = 0,
81  };
82 
88  {
94  };
95 
96 //typedef uint8_t ControlCommand;
97 //#define CONTROL_TEST_MSG 1
98 //#define CONTROL_LATENCY 2
99 //#define CONTROL_START_GHS 12
100 //#define CONTROL_RESET_GHS 13
101 
105  typedef uint64_t OptMask;
106 
111  typedef uint16_t Destination;
112 #define MESSAGE_DEST_UNSET -1
113 
114 
122  typedef size_t SequenceCounter;
123 
132  struct
133  //__attribute__((__packed__,aligned(1))) <-- messes with doxygen
134  Header
135  {
137  Destination agent_to=MESSAGE_DEST_UNSET;
139  uint16_t agent_from;
143  uint16_t payload_size=0;
144  };
145 
151  struct
152  //__attribute__((__packed__,aligned(1))) <-- messes with doxygen
153  Control
154  {
157  };
158 
168  struct
169  //__attribute__((__packed__,aligned(1))) <-- messes with doxygen
171  {
178 
182  size_t size() const
183  {
184  return sizeof(demo::Header)
185  +sizeof(demo::Control)
187  }
188  };
189 
199  class Comms
200  {
201  public:
202  Comms();
203  ~Comms();
204  bool ok();
205 
211  static Comms& inst();
212 
223 
231 
241  void start_receiver();
247  void stop_receiver();
248 
254  bool has_msg();
255 
260 
265  void little_iperf();
271  void exchange_iperf();
272 
276  void print_iperf();
277 
281  Kbps kbps_to(const uint16_t agent_id) const;
282 
298  CommsEdgeMetric unique_link_metric_to(const uint16_t agent_id) const;
299 
300  private:
301 
302  void validate_sockets();
303  void read_loop();
304  int recv(demo::WireMessage& buf);
305  demo::Errno internal_send(demo::WireMessage &m, const char* endpoint, long &us_rt);
306 
308  bool read_continues;
309  Config ghs_cfg;
310  nng_listener ctr_listener = NNG_LISTENER_INITIALIZER;
311  nng_listener ghs_listener = NNG_LISTENER_INITIALIZER;
312  nng_socket incoming=NNG_SOCKET_INITIALIZER;
313  nng_socket outgoing=NNG_SOCKET_INITIALIZER;
314  size_t outgoing_seq;
315  std::array<size_t,COMMS_DEMO_MAX_N> sequence_counters;
316  std::array<Kbps,COMMS_DEMO_MAX_N> kbps;
317  std::mutex q_mut;
318  std::mutex send_mut;
319  std::thread reader_thread;
320 
321  };
322 }
323 
324 
325 #endif
demo::WireMessage::control
Control control
Msg control information.
Definition: ghs-demo-comms.h:173
ghs-demo-msgutils.h
Provides some convenience functions for dealing with demo::WireMessage buffers.
demo::Comms::little_iperf
void little_iperf()
Definition: ghs-demo-comms.cpp:381
demo::WireMessage
A wire-ready message structure that can encapsulate a variety of payloads for sending across the wire...
Definition: ghs-demo-comms.h:168
demo::Header::type
PayloadType type
What payload type do we carry?
Definition: ghs-demo-comms.h:141
demo::Header::agent_from
uint16_t agent_from
Which agent id is this from?
Definition: ghs-demo-comms.h:139
demo::ERR_BAD_PAYLOAD_SZ
@ ERR_BAD_PAYLOAD_SZ
Incorrect or no payload size.
Definition: ghs-demo-comms.h:74
ghs-demo-edgemetrics.h
provides the main comms metric calculation.
demo::Comms::with_config
Comms & with_config(Config &)
Definition: ghs-demo-comms.cpp:133
demo::ERR_DEST_UNSET
@ ERR_DEST_UNSET
Bad or unspecified Destination.
Definition: ghs-demo-comms.h:78
demo::OptMask
uint64_t OptMask
Definition: ghs-demo-comms.h:105
demo::Errno
Errno
return codes for the Comms object
Definition: ghs-demo-comms.h:72
Kbps
uint32_t Kbps
a 32-bit throughput measurement. 32 bits is important!
Definition: ghs-demo-edgemetrics.h:50
demo::Comms::print_iperf
void print_iperf()
Definition: ghs-demo-comms.cpp:407
static_queue.h
a static-sized single-ended queue for use in GhsState
demo::Comms::exchange_iperf
void exchange_iperf()
Definition: ghs-demo-comms.cpp:355
demo::Control::sequence
SequenceCounter sequence
The sequence counter used to avoid duplicates or detect lost messages.
Definition: ghs-demo-comms.h:156
demo
Definition: ghs-demo-clireader.cpp:44
demo::ERR_HANGUP
@ ERR_HANGUP
Connection was lost.
Definition: ghs-demo-comms.h:79
demo::Comms
a message passing class that uses nng, suitable for testing GhsState
Definition: ghs-demo-comms.h:199
demo::Header
a structure that defines source and destination for WireMessage objects
Definition: ghs-demo-comms.h:132
demo::ERR_NNG
@ ERR_NNG
NNG returned an error code, please see logging output.
Definition: ghs-demo-comms.h:80
demo::Header::payload_size
uint16_t payload_size
how big is the payload we carry?
Definition: ghs-demo-comms.h:143
demo::WireMessage::header
Header header
Msg meta information.
Definition: ghs-demo-comms.h:175
demo::PAYLOAD_TYPE_GHS
@ PAYLOAD_TYPE_GHS
message was intended for GHS, don't process, just send it
Definition: ghs-demo-comms.h:93
seque::StaticQueue< demo::WireMessage, 1024 >
demo::Comms::start_receiver
void start_receiver()
Definition: ghs-demo-comms.cpp:472
PAYLOAD_MAX_SZ
#define PAYLOAD_MAX_SZ
The max size of the payload to send over the wire.
Definition: ghs-demo-comms.h:61
demo::WireMessage::bytes
uint8_t bytes[PAYLOAD_MAX_SZ]
The actual payload to send. Note only the first N=demo::Header::payload_size will be sent ...
Definition: ghs-demo-comms.h:177
demo::Config
A struct that holds the union of all configuration variables.
Definition: ghs-demo-config.h:64
demo::PAYLOAD_TYPE_METRICS
@ PAYLOAD_TYPE_METRICS
Metrics messages are for exchanging data about links.
Definition: ghs-demo-comms.h:91
ghs-demo-config.h
contains all the Config objects and methods for configuring demo::Comms
demo::Comms::stop_receiver
void stop_receiver()
Definition: ghs-demo-comms.cpp:477
demo::SequenceCounter
size_t SequenceCounter
Definition: ghs-demo-comms.h:122
demo::WireMessage::size
size_t size() const
Definition: ghs-demo-comms.h:182
demo::ERR_UNRECOGNIZED_PAYLOAD_TYPE
@ ERR_UNRECOGNIZED_PAYLOAD_TYPE
Payload type unrecognized.
Definition: ghs-demo-comms.h:76
demo::Destination
uint16_t Destination
Definition: ghs-demo-comms.h:111
demo::Comms::unique_link_metric_to
CommsEdgeMetric unique_link_metric_to(const uint16_t agent_id) const
This function is really important for the working of your system
Definition: ghs-demo-comms.cpp:483
demo::PAYLOAD_TYPE_NOT_SET
@ PAYLOAD_TYPE_NOT_SET
Not set (produces error)
Definition: ghs-demo-comms.h:89
demo::Comms::has_msg
bool has_msg()
Definition: ghs-demo-comms.cpp:295
demo::Comms::get_next
bool get_next(demo::WireMessage &)
Definition: ghs-demo-comms.cpp:300
demo::ERR_NO_PAYLOAD_TYPE
@ ERR_NO_PAYLOAD_TYPE
Payload type not specified.
Definition: ghs-demo-comms.h:75
demo::Control
a structure that contains control information for WireMessage objects
Definition: ghs-demo-comms.h:151
demo::Comms::kbps_to
Kbps kbps_to(const uint16_t agent_id) const
Definition: ghs-demo-comms.cpp:489
demo::PAYLOAD_TYPE_PING
@ PAYLOAD_TYPE_PING
Ping messages are used to benchmark links to gather metrics.
Definition: ghs-demo-comms.h:92
demo::PAYLOAD_TYPE_CONTROL
@ PAYLOAD_TYPE_CONTROL
Control messages are for resetting internal state or commanding.
Definition: ghs-demo-comms.h:90
demo::Comms::send
Errno send(demo::WireMessage &, demo::OptMask=0)
Definition: ghs-demo-comms.cpp:336
demo::ERR_NULL_SRC
@ ERR_NULL_SRC
Bad from field or not set.
Definition: ghs-demo-comms.h:77
demo::PayloadType
PayloadType
the payload type
Definition: ghs-demo-comms.h:87
demo::Header::agent_to
Destination agent_to
Which agent id is this sent to?
Definition: ghs-demo-comms.h:137
demo::OK
@ OK
No error.
Definition: ghs-demo-comms.h:73
demo::Comms::inst
static Comms & inst()
CommsEdgeMetric
uint64_t CommsEdgeMetric
a 64-bit edge metric. 64 bits is important!
Definition: ghs-demo-edgemetrics.h:45