GHS
Leader Election Based on GHS Minimum Spanning Tree
|
Go to the documentation of this file.
95 int do_main(
int argc,
char**argv);
99 template<
size_t AN,
size_t QN>
102 std::vector<Edge> edges;
105 if (i==cfg.
my_id){
continue; }
108 if (c.
kbps_to(i)==0){
continue; }
121 printf(
"[info] Set edge: %d from %d (check=%d)\n",
127 printf(
"[info] (%d<--%d, %d %lu)\n",
136 printf(
"[info] running connectivity test after %fs!\n",config.
wait_s);
142 printf(
"[info]================= measured\n");
145 printf(
"[info]================= exchanging \n");
148 printf(
"[info]================= post-exchange\n");
151 printf(
"[info]================= again\n");
161 static const size_t COMMS_Q_SZ=256;
166 printf(
"[info] Done configuring for id=%d... \n",config.
my_id);
186 printf(
"[error] Cannot create comms from config\n");
194 static bool wegood=
true;
197 signal(SIGINT,[](
int s){
198 printf(
"...... Received shutdown, joining / killing all threads ..... \n");
203 printf(
"[info] sleeping for %f seconds\n",config.
wait_s);
221 ghsp = demo::initialize_ghs<MAX_N,COMMS_Q_SZ>(config,comms);
223 auto ret = ghsp.start_round(ghs_buf, sent);
225 printf(
"[error] could not start ghs! (%d)\n", ret);
257 std::stringstream ss;
259 printf(
"[info] received GHS msg: %s\n",ss.str().c_str());
261 le::Errno retval = ghsp.process(payload_msg,ghs_buf, new_msg_ct);
263 printf(
"[error] could not call ghsp.process():%s",
le::strerror(retval));
266 printf(
"[info] # response msgs: %zu\n", new_msg_ct);
267 printf(
"[info] GHS waiting: %zu, delayed: %zu, leader: %d, parent: %d, level: %d\n",
268 ghsp.waiting_count(),
269 ghsp.delayed_count(),
270 ghsp.get_leader_id(),
271 ghsp.get_parent_id(),
273 printf(
"[info] Edges: %s\n",
277 default: { printf(
"[error] unknown payload type: %d\n", in.
header.
type); wegood=
false;
break;}
287 while(ghs_buf.
size()>0){
291 printf(
"[info] Have %u msgs to send\n", ghs_buf.
size());
300 size_t bsz =
sizeof(out_pld);
301 to_bytes(out_pld,out.
bytes,bsz);
302 assert(bsz==
sizeof(out_pld));
306 std::stringstream ss;
308 printf(
"[info] Sent: %s\n",ss.str().c_str());
311 printf(
"[error] Could not send, will retry: %d\n",retval);
312 ghs_buf.
push(out_pld);
315 printf(
"[error] Could not send, assuming gone: %d\n",retval);
318 printf(
"[error] demo error. We may have populated a message incorreclty %d\n",-retval);
322 if (ghsp.is_converged()){
323 printf(
"Converged!\n");
329 printf(
"[info] waiting a bit for cleanup ... \n");
332 printf(
"[info] Comms stopped ... Exiting\n");
Provides some convenience functions for dealing with demo::WireMessage buffers.
int agent_t
problems for GhsState
Definition: agent.h:51
void little_iperf()
Definition: ghs-demo-comms.cpp:381
A wire-ready message structure that can encapsulate a variety of payloads for sending across the wire...
Definition: ghs-demo-comms.h:168
le::Errno get_edge(const agent_t &to, Edge &out) const
Definition: ghs_impl.hpp:855
GhsState< AN, QN > initialize_ghs(Config &cfg, Comms &c)
Helper function to build edges from configuration information.
Definition: ghs-demo.h:100
Comms & with_config(Config &)
Definition: ghs-demo-comms.cpp:133
unsigned long metric_t
Definition: edge.h:63
contains all the wire-level data structures and the Comms object to manage them.
Errno
return codes for the Comms object
Definition: ghs-demo-comms.h:72
int test
A bool to determine if we should do a test-and-die routine or not.
Definition: ghs-demo-config.h:76
enum demo::Config::@0 command
A special command enumeration.
bool cfg_is_ok(Config config)
Definition: ghs-demo-inireader.cpp:184
void print_iperf()
Definition: ghs-demo-comms.cpp:407
Errno
Definition: errno.h:49
a static-sized single-ended queue for use in GhsState
void exchange_iperf()
Definition: ghs-demo-comms.cpp:355
const char * strerror(const Errno e)
Definition: errno.cpp:44
An aggregate type containing all the data to exchange with to/from information.
Definition: msg.h:143
@ START
Start MST construction after specified delay.
Definition: ghs-demo-config.h:85
Definition: ghs-demo-clireader.cpp:44
@ ERR_HANGUP
Connection was lost.
Definition: ghs-demo-comms.h:79
The main demo logic for executing le::ghs::GhsState across a network
Definition: ghs-demo.h:70
agent_t root
The root is the "from" side of the edge.
Definition: edge.h:106
a message passing class that uses nng, suitable for testing GhsState
Definition: ghs-demo-comms.h:199
@ ERR_NNG
NNG returned an error code, please see logging output.
Definition: ghs-demo-comms.h:80
Header header
Msg meta information.
Definition: ghs-demo-comms.h:175
std::string dump_edges(const le::ghs::GhsState< N, A > &)
Will dump edges in a readable format.
int my_id
This agent's id.
Definition: ghs-demo-config.h:79
provides the implementation of std::ostream operations for le::ghs objects
@ PAYLOAD_TYPE_GHS
message was intended for GHS, don't process, just send it
Definition: ghs-demo-comms.h:93
agent_t peer
The peer is the "to" side of the edge.
Definition: edge.h:104
a static-sized single-ended queue for use in GhsState
Definition: static_queue.h:60
void start_receiver()
Definition: ghs-demo-comms.cpp:472
@ UNKNOWN
We have not probed this edge for information yet, or have not received a response.
Definition: edge.h:84
bool retry_connections
If we fail to dial up an agent, should we retry later (true) or drop the message (false)
Definition: ghs-demo-config.h:90
Edge structure definition and functions.
status_t status
The status of this edge, starting with UNKNOWN.
Definition: edge.h:108
uint8_t bytes[PAYLOAD_MAX_SZ]
The actual payload to send. Note only the first N=demo::WireMessageHeader::payload_size will be sent ...
Definition: ghs-demo-comms.h:177
A struct that holds the union of all configuration variables.
Definition: ghs-demo-config.h:64
contains all the Config objects and methods for configuring demo::Comms
some convenience implementations to print le::ghs::GhsState objects with std::ostream
void read_cfg_stdin(Config *c)
Definition: ghs-demo-inireader.cpp:211
int do_main(int argc, char **argv)
A single entry point function that should be called from main()
Definition: ghs-demo.h:157
void stop_receiver()
Definition: ghs-demo-comms.cpp:477
provides le::ghs::agent_t definition
CommsEdgeMetric unique_link_metric_to(const uint16_t agent_id) const
This function is really important for the working of your system
Definition: ghs-demo-comms.cpp:483
bool has_edge(const agent_t to) const
Definition: ghs_impl.hpp:848
int do_test_and_die(Comms &comms, Config &config)
Run a quick little_iperf() round to check connectivity.
Definition: ghs-demo.h:135
bool get_next(demo::WireMessage &)
Definition: ghs-demo-comms.cpp:300
le::Errno pop()
Definition: static_queue_impl.hpp:96
metric_t metric_val
By default, this edge has metric_val = WORST_METRIC.
Definition: edge.h:110
Kbps kbps_to(const uint16_t agent_id) const
Definition: ghs-demo-comms.cpp:489
int n_agents
The number of agents currently loaded.
Definition: ghs-demo-config.h:70
A struct to hold all the communication edge information.
Definition: edge.h:98
le::Errno push(const T item)
Definition: static_queue_impl.hpp:69
void read_cfg_cli(int argc, char **argv, Config *config)
Definition: ghs-demo-clireader.cpp:94
@ PAYLOAD_TYPE_CONTROL
Control messages are for resetting internal state or commanding.
Definition: ghs-demo-comms.h:90
Errno send(demo::WireMessage &, demo::OptMask=0)
Definition: ghs-demo-comms.cpp:336
@ OK
The operation was successful.
Definition: errno.h:50
unsigned int size() const
Definition: static_queue_impl.hpp:64
@ OK
No error.
Definition: ghs-demo-comms.h:73
The main state machine for the GHS algorithm
Definition: ghs.h:82
float wait_s
The time to wait before starting the main algorithm.
Definition: ghs-demo-config.h:67