GHS
Leader Election Based on GHS Minimum Spanning Tree
ghs-demo.h
Go to the documentation of this file.
1 
39 #include <signal.h>
40 #include <unistd.h> //sleep
41 #include <sstream> //better than iostream
42 #include <cassert>
43 
44 #include "ghs-demo-config.h"
45 #include "ghs-demo-msgutils.h"
46 #include "ghs-demo-comms.h"
47 
48 #include "ghs/ghs.h"
49 #include "ghs/ghs_printer.h" //dump_edges
50 #include "ghs/msg_printer.h" //for printing GHS msgs.
51 #include "ghs/agent.h"
52 #include "ghs/edge.h"
53 #include "seque/static_queue.h"
54 
55 using le::ghs::GhsState;
56 using le::ghs::metric_t;
57 using le::ghs::agent_t;
58 using le::ghs::Edge;
59 using le::ghs::Msg;
60 
62 namespace demo{
63 
70  class GhsDemoExec{
71  public:
95  int do_main(int argc,char**argv);
96  };
97 
99  template<size_t AN, size_t QN>
101  {
102  std::vector<Edge> edges;
103  for (int i=0;i<cfg.n_agents;i++){
104  //don't add self links
105  if (i==cfg.my_id){ continue; }
106 
107  //dont' add dead links
108  if (c.kbps_to(i)==0){ continue; }
109 
110  //OK, link is fine
111  metric_t link_wt = (metric_t) c.unique_link_metric_to(i);
112  edges.push_back( { (agent_t)i, (agent_t)cfg.my_id, UNKNOWN, link_wt, } );
113  }
114 
115  //construct with all live links
116  GhsState<AN,QN> ghs(cfg.my_id,edges.data(),edges.size());
117 
118  //and eyeball-verify the links were added
119  for(int i=0;i<cfg.n_agents;i++){
120  if (i!=cfg.my_id){
121  printf("[info] Set edge: %d from %d (check=%d)\n",
122  i,
123  cfg.my_id,
124  ghs.has_edge(i));
125  Edge e;
126  ghs.get_edge(i,e);
127  printf("[info] (%d<--%d, %d %lu)\n",
128  e.peer,e.root,e.status,e.metric_val);
129  }
130  }
131  return ghs;
132  }
133 
135  int do_test_and_die(Comms& comms, Config &config){
136  printf("[info] running connectivity test after %fs!\n",config.wait_s);
137  sleep(config.wait_s);
138  comms.start_receiver();
139  sleep(1);
140  comms.little_iperf();
141  sleep(1);
142  printf("[info]================= measured\n");
143  comms.print_iperf();
144  sleep(1);
145  printf("[info]================= exchanging \n");
146  comms.exchange_iperf();
147  sleep(1);
148  printf("[info]================= post-exchange\n");
149  comms.print_iperf();
150  sleep(1);
151  printf("[info]================= again\n");
152  comms.print_iperf();
153  comms.stop_receiver();
154  return 0;
155  }
156 
157  int GhsDemoExec::do_main(int argc, char** argv)
158  {
159 
160  demo::Config config;
161  static const size_t COMMS_Q_SZ=256;
162 
163  demo::read_cfg_stdin(&config);
164  demo::read_cfg_cli(argc,argv,&config);
165 
166  printf("[info] Done configuring for id=%d... \n",config.my_id);
167 
168  if (!demo::cfg_is_ok(config)){
169  return 1;
170  }
171 
172 
173  //initialize the buffer used for input & output to get response messages from
174  //those state machines.
175  //Use your own here ...
177 
178 
179  //here's the queue to/from ghs TODO: unify message types.
181 
182  demo::Comms comms;
183  comms.with_config(config);
184 
185  if (!comms.ok()){
186  printf("[error] Cannot create comms from config\n");
187  return 1;
188  }
189 
190  if (config.test==1){
191  return demo::do_test_and_die(comms,config);
192  }
193 
194  static bool wegood=true;
195 
196  //stop loop on sigint
197  signal(SIGINT,[](int s){
198  printf("...... Received shutdown, joining / killing all threads ..... \n");
199  wegood=false;
200  });
201 
202  if (config.wait_s>0){
203  printf("[info] sleeping for %f seconds\n",config.wait_s);
204  sleep(config.wait_s);
205  }
206 
207  comms.start_receiver();
208  comms.little_iperf();
209  comms.print_iperf();
210  sleep(1);
211  comms.exchange_iperf();
212  comms.print_iperf();
213  sleep(1);
214  comms.print_iperf();
215 
216  GhsState<MAX_N,COMMS_Q_SZ> ghsp(-1,{},0);
217 
218  if (config.command==demo::Config::START){
219  //initialize all the message-driven state machines that need msg callbacks.
220  //In this case. Just GHS...
221  ghsp = demo::initialize_ghs<MAX_N,COMMS_Q_SZ>(config,comms);
222  size_t sent;
223  auto ret = ghsp.start_round(ghs_buf, sent);
224  if (ret != le::OK){
225  printf("[error] could not start ghs! (%d)\n", ret);
226  return 1;
227  }
228  }
229 
230 
231  while (wegood){
232 
234  //retrieve next message from background reader.
235  bool ok = comms.get_next(in);
236 
237  if (ok){
238  printf("[info] recv'd msg from %d to %d\n",in.header.agent_from, in.header.agent_to);
239 
240  //no shenanegans plz
241  assert(in.header.agent_to==config.my_id);
242 
243  //there might be other types, like link metrics, control messages, or
244  //other subsystems to prod. etc
245  switch (in.header.type){
247  {
248  break;
249  }
251  {
252  //with static size checking:
253  //Msg payload_msg=from_bytes<MAX_MSG_SZ>(in.bytes);
254  //or with compression / variable sizes
255  Msg payload_msg = from_bytes(in.bytes, in.header.payload_size);
256  //push msg to the subsystem
257  std::stringstream ss;
258  ss<<payload_msg;
259  printf("[info] received GHS msg: %s\n",ss.str().c_str());
260  size_t new_msg_ct=0;
261  le::Errno retval = ghsp.process(payload_msg,ghs_buf, new_msg_ct);
262  if (retval != le::OK){
263  printf("[error] could not call ghsp.process():%s",le::strerror(retval));
264  return 1;
265  }
266  printf("[info] # response msgs: %zu\n", new_msg_ct);
267  printf("[info] GHS waiting: %zu, delayed: %zu, leader: %d, parent: %d, level: %d\n",
268  ghsp.waiting_count(),
269  ghsp.delayed_count(),
270  ghsp.get_leader_id(),
271  ghsp.get_parent_id(),
272  ghsp.get_level());
273  printf("[info] Edges: %s\n",
274  dump_edges(ghsp).c_str());
275  break;
276  }
277  default: { printf("[error] unknown payload type: %d\n", in.header.type); wegood=false; break;}
278  }
279 
280 
281  }
282 
283 
284  //now process the outgoing msgs
285  //ghs example, others are similar:
286  //You don't really *need* to wrap messages like this ...
287  while(ghs_buf.size()>0){
288  demo::WireMessage out;
289  le::ghs::Msg out_pld;
290 
291  printf("[info] Have %u msgs to send\n", ghs_buf.size());
292  if (seque::OK!=ghs_buf.pop(out_pld)){
293  wegood=false;
294  break;
295  }
296  out.header.agent_to=out_pld.to();
297  out.header.agent_from=out_pld.from();
299  out.header.payload_size=sizeof(out_pld);
300  size_t bsz = sizeof(out_pld);
301  to_bytes(out_pld,out.bytes,bsz);
302  assert(bsz==sizeof(out_pld));
303  demo::Errno retval=comms.send(out);
304 
305  if (retval==demo::OK){
306  std::stringstream ss;
307  ss<<out_pld;
308  printf("[info] Sent: %s\n",ss.str().c_str());
309  } else if (retval == demo::ERR_NNG || retval == demo::ERR_HANGUP){
310  if (config.retry_connections){
311  printf("[error] Could not send, will retry: %d\n",retval);
312  ghs_buf.push(out_pld);
313  break;
314  } else {
315  printf("[error] Could not send, assuming gone: %d\n",retval);
316  }
317  } else {
318  printf("[error] demo error. We may have populated a message incorreclty %d\n",-retval);
319  }
320  }
321 
322  if (ghsp.is_converged()){
323  printf("Converged!\n");
324  wegood=false;
325  }
326 
327  }
328 
329  printf("[info] waiting a bit for cleanup ... \n");
330  sleep(3);
331  comms.stop_receiver();
332  printf("[info] Comms stopped ... Exiting\n");
333 
334  return 0;
335  }
336 }
ghs-demo-msgutils.h
Provides some convenience functions for dealing with demo::WireMessage buffers.
le::ghs::agent_t
int agent_t
problems for GhsState
Definition: agent.h:51
demo::Comms::little_iperf
void little_iperf()
Definition: ghs-demo-comms.cpp:381
demo::WireMessage
A wire-ready message structure that can encapsulate a variety of payloads for sending across the wire...
Definition: ghs-demo-comms.h:168
le::ghs::GhsState::get_edge
le::Errno get_edge(const agent_t &to, Edge &out) const
Definition: ghs_impl.hpp:855
demo::Header::type
PayloadType type
What payload type do we carry?
Definition: ghs-demo-comms.h:141
demo::Header::agent_from
uint16_t agent_from
Which agent id is this from?
Definition: ghs-demo-comms.h:139
demo::initialize_ghs
GhsState< AN, QN > initialize_ghs(Config &cfg, Comms &c)
Helper function to build edges from configuration information.
Definition: ghs-demo.h:100
demo::Comms::with_config
Comms & with_config(Config &)
Definition: ghs-demo-comms.cpp:133
le::ghs::metric_t
unsigned long metric_t
Definition: edge.h:63
ghs-demo-comms.h
contains all the wire-level data structures and the Comms object to manage them.
demo::Errno
Errno
return codes for the Comms object
Definition: ghs-demo-comms.h:72
demo::Config::test
int test
A bool to determine if we should do a test-and-die routine or not.
Definition: ghs-demo-config.h:76
demo::Config::command
enum demo::Config::@0 command
A special command enumeration.
demo::cfg_is_ok
bool cfg_is_ok(Config config)
Definition: ghs-demo-inireader.cpp:184
demo::Comms::print_iperf
void print_iperf()
Definition: ghs-demo-comms.cpp:407
le::Errno
Errno
Definition: errno.h:49
static_queue.h
a static-sized single-ended queue for use in GhsState
demo::Comms::exchange_iperf
void exchange_iperf()
Definition: ghs-demo-comms.cpp:355
le::strerror
const char * strerror(const Errno e)
Definition: errno.cpp:44
le::ghs::Msg
An aggregate type containing all the data to exchange with to/from information.
Definition: msg.h:143
demo::Config::START
@ START
Start MST construction after specified delay.
Definition: ghs-demo-config.h:85
demo
Definition: ghs-demo-clireader.cpp:44
demo::ERR_HANGUP
@ ERR_HANGUP
Connection was lost.
Definition: ghs-demo-comms.h:79
demo::GhsDemoExec
The main demo logic for executing le::ghs::GhsState across a network
Definition: ghs-demo.h:70
le::ghs::Edge::root
agent_t root
The root is the "from" side of the edge.
Definition: edge.h:106
demo::Comms
a message passing class that uses nng, suitable for testing GhsState
Definition: ghs-demo-comms.h:199
demo::ERR_NNG
@ ERR_NNG
NNG returned an error code, please see logging output.
Definition: ghs-demo-comms.h:80
demo::Header::payload_size
uint16_t payload_size
how big is the payload we carry?
Definition: ghs-demo-comms.h:143
demo::WireMessage::header
Header header
Msg meta information.
Definition: ghs-demo-comms.h:175
dump_edges
std::string dump_edges(const le::ghs::GhsState< N, A > &)
Will dump edges in a readable format.
demo::Config::my_id
int my_id
This agent's id.
Definition: ghs-demo-config.h:79
msg_printer.h
provides the implementation of std::ostream operations for lg::ghs objects
demo::PAYLOAD_TYPE_GHS
@ PAYLOAD_TYPE_GHS
message was intended for GHS, don't process, just send it
Definition: ghs-demo-comms.h:93
le::ghs::Edge::peer
agent_t peer
The peer is the "to" side of the edge.
Definition: edge.h:104
ghs.h
The main GhsState object
seque::StaticQueue
a static-sized single-ended queue for use in GhsState
Definition: static_queue.h:60
demo::Comms::start_receiver
void start_receiver()
Definition: ghs-demo-comms.cpp:472
le::ghs::UNKNOWN
@ UNKNOWN
We have not probed this edge for information yet, or have not recieved a reponse.
Definition: edge.h:84
demo::Config::retry_connections
bool retry_connections
If we fail to dial up an agent, should we retry later (true) or drop the message (false)
Definition: ghs-demo-config.h:90
edge.h
Edge structure definition and functions.
le::ghs::Edge::status
status_t status
The status of this edge, starting with UNKNOWN.
Definition: edge.h:108
demo::WireMessage::bytes
uint8_t bytes[PAYLOAD_MAX_SZ]
The actual payload to send. Note only the first N=demo::WireMessageHeader::payload_size will be sent ...
Definition: ghs-demo-comms.h:177
demo::Config
A struct that holds the union of all configuration variables.
Definition: ghs-demo-config.h:64
ghs-demo-config.h
contains all the Config objects and methods for configuring demo::Comms
ghs_printer.h
some conveneience implementations to print le::ghs::GhsState objects with std::ostream
demo::read_cfg_stdin
void read_cfg_stdin(Config *c)
Definition: ghs-demo-inireader.cpp:211
demo::GhsDemoExec::do_main
int do_main(int argc, char **argv)
A single entry point function that should be called from main()
Definition: ghs-demo.h:157
demo::Comms::stop_receiver
void stop_receiver()
Definition: ghs-demo-comms.cpp:477
agent.h
provides le::ghs::agent_t defintion
demo::Comms::unique_link_metric_to
CommsEdgeMetric unique_link_metric_to(const uint16_t agent_id) const
This function is really important for the working of your system
Definition: ghs-demo-comms.cpp:483
le::ghs::GhsState::has_edge
bool has_edge(const agent_t to) const
Definition: ghs_impl.hpp:848
demo::do_test_and_die
int do_test_and_die(Comms &comms, Config &config)
Run a quick little_iperf() round to check connectivity.
Definition: ghs-demo.h:135
demo::Comms::get_next
bool get_next(demo::WireMessage &)
Definition: ghs-demo-comms.cpp:300
seque::StaticQueue::pop
le::Errno pop()
Definition: static_queue_impl.hpp:96
le::ghs::Edge::metric_val
metric_t metric_val
By default, this edge has metric_val = WORST_METRIC.
Definition: edge.h:110
demo::Comms::kbps_to
Kbps kbps_to(const uint16_t agent_id) const
Definition: ghs-demo-comms.cpp:489
demo::Config::n_agents
int n_agents
The number of agents currently loaded.
Definition: ghs-demo-config.h:70
le::ghs::Edge
A struct to hold all the communication edge information.
Definition: edge.h:98
seque::StaticQueue::push
le::Errno push(const T item)
Definition: static_queue_impl.hpp:69
demo::read_cfg_cli
void read_cfg_cli(int argc, char **argv, Config *config)
Definition: ghs-demo-clireader.cpp:94
demo::PAYLOAD_TYPE_CONTROL
@ PAYLOAD_TYPE_CONTROL
Control message are for resetting internal state or commanding.
Definition: ghs-demo-comms.h:90
demo::Comms::send
Errno send(demo::WireMessage &, demo::OptMask=0)
Definition: ghs-demo-comms.cpp:336
le::OK
@ OK
The operation was successful.
Definition: errno.h:50
seque::StaticQueue::size
unsigned int size() const
Definition: static_queue_impl.hpp:64
demo::Header::agent_to
Destination agent_to
Which agent id is this sent to?
Definition: ghs-demo-comms.h:137
demo::OK
@ OK
No error.
Definition: ghs-demo-comms.h:73
le::ghs::GhsState
The main state machine for the GHS algorithm
Definition: ghs.h:82
demo::Config::wait_s
float wait_s
The time to wait before starting the main algorithm.
Definition: ghs-demo-config.h:67