GHS
Leader Election Based on GHS Minimum Spanning Tree
ghs_impl.hpp
Go to the documentation of this file.
1 
41 #include "ghs.h"
42 #include "msg.h"
43 
// ghs_fatal(s): report a condition that should be unreachable (callers in
// this file pass ERR_IMPL and then return ERR_IMPL themselves).
// With NDEBUG undefined it throws std::runtime_error tagged "__FILE__:__LINE__";
// with NDEBUG defined it only prints "[fatal] ..." and control returns to the
// caller.
// NOTE(review): strerror(s) is applied to an le::Errno value; presumably this
// resolves to a project overload for le::Errno rather than the C-library
// ::strerror (which expects an errno code) -- confirm. <string>, <cstring>
// and <cstdio> are assumed to be pulled in via ghs.h / msg.h.
44 #ifndef NDEBUG
45 #include <stdexcept>
46 #define ghs_fatal(s) throw std::runtime_error(std::string(__FILE__)+":"+std::to_string(__LINE__)+" "+strerror(s))
47 #else
48 #define ghs_fatal(s) printf("[fatal] %s",strerror(s))
49 #endif
50 
51 using std::max;
52 
53 using namespace le::ghs::msg;
54 
55 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
56 GhsState<MAX_AGENTS, BUF_SZ>::GhsState(agent_t my_id, Edge* edges, size_t num_edges) {
57  this->my_id = my_id;
58  reset();
59  for (size_t idx=0;idx<num_edges;idx++){
60  Edge e=edges[idx];
61  if (! is_valid(e)){
62  continue;
63  }
64  auto err = set_edge(e);
65  if (OK==err){ continue; }
66  if (TOO_MANY_AGENTS==err){ break; }
67  }
68 }
69 
// NOTE(review): the definition that followed this template header (source
// line 71) is not present in this rendering, so nothing can be said about it.
70 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
72 
76 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
77 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::reset() {
78  n_peers=0;
79  set_leader_id(my_id);
80  set_level(LEVEL_START);
81  set_parent_id(my_id);
82 
83  for (size_t i=0;i<MAX_AGENTS;i++){
84  peers[i] = NO_AGENT;
85  outgoing_edges[i].status=UNKNOWN;
86  waiting_for_response[i]=false;
87  response_required[i]=false;
88  response_prompt[i]={};
89  }
90  this->best_edge = worst_edge();
91  this->algorithm_converged = false;
92 
93  return OK;
94 }
95 
96 
// start_round (its signature, source line 102, is omitted from this
// rendering): begin a search round.
// Only the partition leader (get_leader_id() == get_id()) acts. It runs
// process_srch() on itself, passing its own id as sender and its own
// leader/level as the SRCH payload. Every other agent emits nothing
// (qsz = 0) and waits for a SRCH from its leader.
101 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
103  //If I'm leader, then I need to start the process. Otherwise wait.
104  if (get_leader_id() == get_id()){
105  //nobody tells us what to do but ourselves
106  return process_srch(get_id(), {get_leader_id(), get_level()}, outgoing_buffer, qsz);
107  }
108  qsz=0;
109  return OK;
110 }
111 
// mwoe (source line 113, omitted here): return a copy of best_edge, the best
// minimum-weight-outgoing-edge candidate recorded during the current search.
112 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
114  return best_edge;
115 }
116 
// process (source line 118, omitted here): entry point for one incoming Msg.
// The message is rejected if:
//  - it comes from ourselves (PROCESS_SELFMSG);
//  - it is not addressed to us (PROCESS_NOTME);
//  - it comes from an agent we have no edge to (PROCESS_NO_EDGE_FOUND).
// Otherwise it is dispatched on msg.type() to the matching process_* handler.
// That handler appends any replies to outgoing_buffer and reports their
// count in qsz. Unknown types yield PROCESS_INVALID_TYPE.
// NOTE(review): qsz is left untouched on the rejection paths.
117 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
119 
120  if (msg.from()==my_id){
121  //ghs_debug_crash("Received msg.from() self!");
122  return PROCESS_SELFMSG;
123  }
124 
125  if (msg.to() != my_id){
126  return PROCESS_NOTME;
127  }
128 
129  if (! has_edge(msg.from()) ){
130  return PROCESS_NO_EDGE_FOUND;
131  }
132 
133  switch (msg.type()){
134  case (msg::Type::SRCH):{ return process_srch( msg.from(), msg.data().srch, outgoing_buffer, qsz); }
135  case (msg::Type::SRCH_RET):{ return process_srch_ret( msg.from(), msg.data().srch_ret, outgoing_buffer, qsz); }
136  case (msg::Type::IN_PART):{ return process_in_part( msg.from(), msg.data().in_part, outgoing_buffer, qsz); }
137  case (msg::Type::ACK_PART):{ return process_ack_part( msg.from(), msg.data().ack_part, outgoing_buffer, qsz); }
138  case (msg::Type::NACK_PART):{ return process_nack_part( msg.from(), msg.data().nack_part, outgoing_buffer, qsz); }
139  case (msg::Type::JOIN_US):{ return process_join_us( msg.from(), msg.data().join_us, outgoing_buffer, qsz); }
140  case (msg::Type::NOOP):{ return process_noop( outgoing_buffer , qsz); }
141  default:{ return PROCESS_INVALID_TYPE; }
142  }
// Unreachable: every case of the switch above returns.
143  return OK;
144 }
145 
// process_srch (the signature, source line 147, is omitted from this
// rendering; the name comes from the calls in start_round() and process()).
// Handles a SRCH: start one search for the minimum-weight outgoing edge (MWOE)
// in the MST subtree rooted at this agent.
//  - `from` is our MST parent, or my_id when the leader starts a round itself.
//  - Refuses with SRCH_STILL_WAITING while replies to a previous search are
//    still outstanding.
//  - Adopts the leader/level carried in `data`, makes `from` our parent, and
//    resets best_edge.
//  - Relays SRCH to MST children and sends IN_PART over every UNKNOWN edge.
//    Each recipient is marked as awaited.
//  - If nothing was sent and no IN_PART queries are deferred, it reports
//    "no MWOE" to the parent immediately.
//  - Finally, it answers deferred IN_PART queries that our level now permits.
// qsz = number of messages appended to `buf`.
146 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
148 {
149 
150  //this msg is weird, in that we sometimes trigger internally with from==my_id
151 
152  if (from!=my_id){//must be MST edge then
153  Edge to_them;
154  auto ret = get_edge(from, to_them);
155  if (ret != OK)
156  {
157  return ret;
158  }
159  //HOWEVER. it may not be an MST_PARENT at the moment, because in fact the
160  //parent-status is set by the SRCH messages propagating (see set_parent_id
161  //a few lines below).
162  //
163  //SO DONT DO THIS:
164  //if (to_them.status!=MST_PARENT )
165  //{
166  // return PROCESS_REQ_MST;
167  //}
168  }
169 
170  if (waiting_count() != 0 )
171  {
172  return SRCH_STILL_WAITING;
173  }
174 
175  //grab the new partition information, since only one node / partition sends srch() msgs.
176  agent_t leader = data.your_leader;
177  level_t level = data.your_level;
178  my_leader = leader;
179  my_level = level;
180  //also note our parent may have changed
181  auto err = set_parent_id(from);
182  if (OK!=err){return err;}
183 
184  //initialize the best edge to a bad value for comparisons
185  best_edge = worst_edge();
186  best_edge.root = my_id;
187 
188  //we'll cache outgoing messages temporarily
189  StaticQueue<Msg,BUF_SZ> srchbuf;
190 
191  //first broadcast the SRCH down the tree
192  msg::Data to_send;
193  to_send.srch = SrchPayload{my_leader, my_level};
194  size_t srch_sent=0;
195  le::Errno srch_ret = mst_broadcast(msg::Type::SRCH, to_send, srchbuf,srch_sent);
196  if (srch_ret!=OK){
197  return srch_ret;
198  }
199 
200  //then ping unknown edges
201  //OPTIMIZATION: Ping neighbors in sorted order, rather than flooding
202 
203  to_send.in_part = InPartPayload{my_leader, my_level};
204  size_t part_sent=0;
205  le::Errno part_ret = typecast(status_t::UNKNOWN, msg::Type::IN_PART, to_send, srchbuf, part_sent);
206  if (part_ret!=OK){
207  return part_ret;
208  }
209 
210  //remember who we sent to so we can wait for them:
211  size_t srchbuf_sz = srchbuf.size();
212  if (srchbuf_sz != srch_sent + part_sent){
213  return ERR_QUEUE_MSGS;
214  }
215 
216  //at this point, we may not have sent any msgs, because:
217  //1) There are no unknown outgoing edges
218  //2) There are no children to relay the srch to
219  //
220  //If that's the case, we can safely respond with "No MWOE" and that's it.
// NOTE(review): if srchbuf_sz == 0 but delayed_count() != 0, execution falls
// through, yet no peer is marked as awaited. check_search_status() is only
// reached from SRCH_RET/ACK_PART/NACK_PART handlers that require an awaited
// sender, so it never runs for this search. No SRCH_RET is sent to the
// parent, which keeps waiting on us. Confirm whether a "no MWOE" reply is
// also needed in that case.
221  if (srchbuf_sz == 0 && delayed_count() ==0){
222  return respond_no_mwoe(buf,qsz);
223  }
224 
225  //past here, we know we either had srchbuf msgs, or a delayed msg.
226 
227  //first handle srchbuf msgs
228  //push temporarily cached messages to outgoing buf, and note the receiver
229  //ID so we can track their response later.
230  size_t buf_sz_before = buf.size();
231  for (size_t i=0;i<srchbuf_sz;i++){
232  Msg m;
233  srchbuf.pop(m);
234  set_waiting_for(m.to(), true);
235  buf.push(m);
236  }
237 
238  if ( (buf.size() - buf_sz_before != srch_sent + part_sent ) )
239  {
240  return ERR_QUEUE_MSGS;
241  }
242 
243  //make sure to check_new_level, since our level may have changed, above,
244  //which will handle delayed_count != 0;
245  size_t old_msgs_processed=0;
246  le::Errno lvl_err = check_new_level(buf,old_msgs_processed);
247  if (lvl_err != OK ){
248  return lvl_err;
249  }
250 
251  //notify hunky dory
252  qsz = srch_sent + part_sent + old_msgs_processed;
253  return OK;
254 }
255 
256 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
257 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::respond_no_mwoe( StaticQueue<Msg, BUF_SZ> &buf, size_t & qsz)
258 {
259  msg::Data pld;
260  pld.srch_ret.to=0;
261  pld.srch_ret.from=0;
262  pld.srch_ret.metric = WORST_METRIC;
263  return mst_convergecast(msg::Type::SRCH_RET, pld, buf,qsz);
264 }
265 
266 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
267 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::process_srch_ret( agent_t from, const SrchRetPayload &data, StaticQueue<Msg,BUF_SZ>&buf, size_t & qsz)
268 {
269 
270 
271  if (this->waiting_count() == 0){
272  return UNEXPECTED_SRCH_RET;
273  }
274 
275  bool wf=false;
276  auto wfr = is_waiting_for(from,wf);
277  if (OK!=wfr){
278  return wfr;
279  }
280  if ( !wf ){
281  return UNEXPECTED_SRCH_RET;
282  }
283 
284  auto swfr = set_waiting_for(from,false);
285  if (OK != swfr ){
286  return swfr;
287  }
288 
289  //compare our best edge to their best edge
290  //first, get their best edge
291  Edge theirs;
292  theirs.peer = data.to;
293  theirs.root = data.from;
294  theirs.metric_val = data.metric;
295 
296  if (theirs.metric_val < best_edge.metric_val){
297  best_edge.root = theirs.root;
298  best_edge.peer = theirs.peer;
299  best_edge.metric_val = theirs.metric_val;
300  }
301 
302  return check_search_status(buf,qsz);
303 }
304 
305 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
306 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::process_in_part( agent_t from, const InPartPayload& data, StaticQueue<Msg,BUF_SZ>&buf, size_t & qsz)
307 {
308  //let them know if we're in their partition or not. Easy.
309  agent_t part_id = data.leader;
310 
311  //except if they are *ahead* of us in the execution of their algorithm. That is, what if we
312  //don't actually know if we are in their partition or not? This is detectable if their level > ours.
313  level_t their_level = data.level;
314  level_t our_level = my_level;
315 
316  if (their_level <= our_level){
317  //They aren't behind, so we can respond
318  if (part_id == this->my_leader){
319  Msg to_send (from, my_id, AckPartPayload{});
320  buf.push( to_send );
321  //do not do this:
322  //waiting_for.erase(from);
323  //set_edge_status(from,DELETED);
324  //(breaks the contract of IN_PART messages, because now we don't need a
325  //response to the one we must have sent to them.
326  qsz=1;
327  return OK;
328  } else {
329  Msg to_send (from, my_id, NackPartPayload{});
330  buf.push (to_send);
331  qsz=1;
332  return OK;
333  }
334  } else {
335  respond_later(from, data);
336  qsz=0;
337  return OK;
338  }
339 }
340 
341 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
342 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::process_ack_part( agent_t from, const AckPartPayload& data, StaticQueue<Msg,BUF_SZ>&buf, size_t & qsz)
343 {
344  //is this the right time to receive this msg?
345  bool wf=false;
346  auto wfr = is_waiting_for(from,wf);
347  if (OK!=wfr){
348  return wfr;
349  }
350  if (!wf)
351  {
352  return ACK_NOT_WAITING;
353  }
354 
355  //we now know that the sender is in our partition. Mark their edge as deleted
356  auto sesr = set_edge_status(from, DELETED);
357  if (OK != sesr){
358  return sesr;
359  }
360 
361  auto swfr = set_waiting_for(from, false);
362  if (OK != swfr){
363  return swfr;
364  }
365 
366  return check_search_status(buf,qsz);
367 }
368 
369 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
370 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::process_nack_part( agent_t from, const NackPartPayload &data, StaticQueue<Msg,BUF_SZ>&buf, size_t & qsz)
371 {
372  //is this the right time to receive this msg?
373  bool wf=false;
374  auto wfr = is_waiting_for(from,wf);
375  if (OK!=wfr){
376  return wfr;
377  }
378  if (!wf)
379  {
380  return ACK_NOT_WAITING;
381  }
382 
383  Edge their_edge;
384  //although has_edge was checked, we are careful anyway
385  auto ger = get_edge(from, their_edge);
386  if (OK != ger)
387  {
388  return PROCESS_NO_EDGE_FOUND;
389  }
390 
391  if (best_edge.metric_val > their_edge.metric_val){
392  best_edge = their_edge;
393  }
394 
395  auto swfr = set_waiting_for(from,false);
396  if (OK != swfr){
397  return swfr;
398  }
399 
400  return check_search_status(buf,qsz);
401 }
402 
403 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
404 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::check_search_status(StaticQueue<Msg,BUF_SZ> &buf, size_t & qsz){
405 
406  if (waiting_count() == 0)
407  {
408  auto e = mwoe();
409  bool am_leader = (my_leader == my_id);
410  bool found_new_edge = (e.metric_val < WORST_METRIC);
411  bool its_my_edge = (mwoe().root == my_id);
412 
413  if (!am_leader){
414  //pass on results, no matter how bad
415  msg::Data send_data;
416  send_data.srch_ret = SrchRetPayload{e.peer, e.root, e.metric_val};
417  return mst_convergecast( msg::Type::SRCH_RET, send_data, buf, qsz);
418  }
419 
420  if (am_leader && found_new_edge && its_my_edge){
421  if (e.peer == e.root){
422  return BAD_MSG;
423  }
424  //just start the process to join up, rather than sending messages
425  return process_join_us(my_id, {e.peer, e.root, get_leader_id(), get_level()}, buf, qsz);
426  }
427 
428  if (am_leader && !found_new_edge ){
429  //I'm leader, no new edge, let's move on b/c we're done here
430  return process_noop( buf, qsz);
431  }
432 
433  if (am_leader && found_new_edge && !its_my_edge){
434  //inform the crew to add the edge
435  //This is a bit awkward ...
436  msg::Data data;
437  data.join_us = JoinUsPayload{e.peer, e.root, get_leader_id(), get_level()};
438  return mst_broadcast( JOIN_US, data, buf, qsz);
439  }
440  }
441 
442  //nothing to do
443  qsz=0;
444  return OK;
445 }
446 
447 
448 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
449 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::check_new_level( StaticQueue<Msg,BUF_SZ> &buf, size_t & qsz){
450 
451  qsz=0;
452  for (size_t idx=0;idx<n_peers;idx++){
453  if (response_required[idx]){
454  agent_t who = peers[idx];
455  InPartPayload &m = response_prompt[idx];
456  level_t their_level = m.level;
457  if (their_level <= get_level() )
458  {
459  size_t sentsz=0;
460  //ok to answer, they were waiting for us to catch up
461  le::Errno ret=process_in_part(who, m, buf, sentsz);
462  if (ret!=OK){
463  //some error, propegate up
464  return ret;
465  }
466  qsz+=sentsz;
467  response_required[idx]=false;
468  }
469  }
470  }
471  return OK;
472 }
473 
474 
475 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
476 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::process_join_us( agent_t from, const JoinUsPayload &data, StaticQueue<Msg,BUF_SZ>&buf, size_t & qsz)
477 {
478 
479  auto join_peer = data.join_peer; // the side of the edge that is in the other partition
480  auto join_root = data.join_root; // the side of the edge that is in the partition initiating join
481  auto join_lead = data.proposed_leader; // the leader, as declared during search, of the peer
482  auto join_level = data.proposed_level; // the level of the peer as declared during search
483 
484  bool not_involved = (join_root != my_id && join_peer != my_id);
485  bool in_initiating_partition = (join_root == my_id);
486 
487  if (not_involved){
488  if (join_lead != my_leader){
489  return JOIN_BAD_LEADER;
490  }
491  if (join_level != my_level){
492  return JOIN_BAD_LEVEL;
493  }
494 
495  msg::Data to_send;
496  to_send.join_us = data;
497  return mst_broadcast(msg::Type::JOIN_US, to_send, buf, qsz);
498  }
499 
500  //let's find the edge to the other partition
501  Edge edge_to_other_part;
502 
503  if (in_initiating_partition){
504  //leader CAN be different, even though we're initating, IF we're on an MST
505  //link to them
506  le::Errno retcode;
507  Edge join_peer_edge;
508  retcode = get_edge(join_peer, join_peer_edge);
509  if (retcode != OK){
510  return retcode;
511  }
512 
513  //check join_peer status (better be MST)
514  if (join_lead != my_leader && join_peer_edge.status != MST){
515  return JOIN_INIT_BAD_LEADER;
516  }
517  if (join_level != my_level){
518  return JOIN_INIT_BAD_LEVEL;
519  }
520  //found the correct edge
521  retcode = get_edge(join_peer, edge_to_other_part);
522  if (OK != retcode)
523  {
524  return retcode;
525  }
526  } else {
527  //we aren't in initiating partition, and yet it includes our partition, this is a problem!
528  if (join_lead == my_leader){
529  return JOIN_MY_LEADER;
530  }
531  //level can be same, lower (from another partition), but not higher (we shouldn't have replied)
532  if (join_level > my_level){
533  return JOIN_UNEXPECTED_REPLY;
534  }
535  //found the correct edge
536  auto ger = get_edge(join_root, edge_to_other_part);
537  if (OK != ger)
538  {
539  return ger;
540  }
541  }
542 
543  //after all that, we found the edge
544 
545  if ( edge_to_other_part.status == MST){
546  //we already absorbed once, so now we merge()
547  auto leader_id = max(join_peer, join_root);
548  my_leader = leader_id;
549  my_level++;
550  if (leader_id == my_id){
551  //In this case, we already sent JOIN_US (since it's an MST link), and if
552  //they have not procssed that, they will soon enough. At that time, they
553  //will see the link to us as MST (after all they sent JOIN_US -- this
554  //msg), AND they will recognize our leader-hood. We advance in faith that
555  //they will march along.
556  return start_round(buf, qsz);
557  } else {
558  //In this case, we already sent JOIN_US (b/c it's an MST link), meaning
559  //this came from THEM. If they rec'd ours first, then they know if they
560  //are leader or not. if not, AND we're not leader, we're at deadlock
561  //until they process our JOIN_US and see the MST link from their JOIN_US
562  //request and recognize their own leader-hood. We wait.
563  qsz=0;
564  return OK;
565  }
566  } else if (edge_to_other_part.status == UNKNOWN) {
567  if (in_initiating_partition ){
568  //In this case, we're sending a JOIN_US without hearing from them yet.
569  //We may not be their MWOE, which would make this an "absorb" case.
570  //just send it, see what they say (see next one). btw, because we were
571  //able to find a MWOE, we know that their level >= ours. Otherwise,
572  //they would not have responded to our search (see process_in_part). So
573  //this absorb request is valid and setting their link as MST is OK.
574  set_edge_status(join_peer, MST);
575  auto payload = JoinUsPayload{ data.join_peer, data.join_root,
576  data.proposed_leader, data.proposed_level };
577  Msg to_send = Msg( join_peer, my_id, payload );
578  buf.push(to_send);
579  qsz=1;
580  return OK;
581  } else {
582 
583  //In this case, we received a JOIN_US from another partition, one that
584  //we have not yet recognized or marked as our own MWOE. This means they
585  //are a prime candidate to absorb into our partition.
586  //NOTE, if we were waiting for them, they would not respond until their
587  //level is == ours, so this should never fail:
588  if (my_level < join_level){
589  return JOIN_UNEXPECTED_REPLY;
590  }
591 
592  //Since we know they are prime absorbtion material, we just do it and
593  //mark them as children. There's one subtlety here: We may have to
594  //revise our search status once they absorb!! That's TODO: test to see
595  //if we adequately handle premature absorb requests. After all, our
596  //MWOE might now be in their subtree.
597  auto sesr = set_edge_status(join_root, MST);
598  if (OK != sesr){
599  return sesr;
600  }
601 
602  //Anyway, because we aren't in the initiating partition, the other guy
603  //already has us marked MST, so we don't need to do anything. -- a
604  //leader somewhere else will send the next round start (or perhaps it
605  //will be one of us when we do a merge()
606  qsz=0;
607  return OK;
608  }
609  } else {
610  ghs_fatal(ERR_IMPL);
611  return ERR_IMPL;
612  }
613 
614  ghs_fatal(ERR_IMPL);
615  return ERR_IMPL;
616 }
617 
618 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
619 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::process_noop(StaticQueue<Msg,BUF_SZ> &buf, size_t &qsz){
620  algorithm_converged=true;
621  return mst_broadcast(msg::Type::NOOP, {},buf, qsz);
622 }
623 
624 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
625 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::typecast(const status_t status, const msg::Type m, const msg::Data &data, StaticQueue<Msg,BUF_SZ> &buf, size_t &qsz)const {
626  size_t sent=0;
627  for (size_t idx=0;idx<n_peers;idx++){
628  const Edge &e = outgoing_edges[idx];
629  if (e.root!=my_id){
630  return CAST_INVALID_EDGE;
631  }
632  if ( e.status == status ){
633  sent++;
634  Msg to_send (e.peer, my_id, m, data);
635  buf.push( to_send );
636  }
637  }
638  qsz=sent;
639  return OK;
640 }
641 
642 
// mst_broadcast (source line 644, omitted here): send message type `m` with
// payload `data` across every edge whose status is exactly MST. MST_PARENT
// edges are skipped, so nothing goes back up to the parent.
// qsz = number of messages pushed. Returns CAST_INVALID_EDGE if an edge is
// not rooted at my_id; messages pushed before that point remain in `buf`.
// NOTE(review): the le::Errno returned by buf.push() is ignored, so a full
// queue is not reported here and qsz can over-count.
643 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
645  size_t sent =0;
646  for (size_t idx =0;idx<n_peers;idx++){
647  const Edge&e = outgoing_edges[idx];
648  if (e.root!=my_id){
649  return CAST_INVALID_EDGE;
650  }
651  if (e.status == MST ){
652  sent++;
653  Msg to_send( e.peer, my_id, m, data);
654  buf.push( to_send );
655  }
656  }
657  qsz=sent;
658  return OK;
659 }
660 
// mst_convergecast (source line 662, omitted here): send message type `m`
// with payload `data` across the MST_PARENT edge (there is at most one,
// since set_parent_id demotes any previous parent). qsz = number of messages
// pushed, which is 0 when we have no parent. Returns CAST_INVALID_EDGE if an
// edge is not rooted at my_id.
// NOTE(review): the le::Errno returned by buf.push() is ignored.
661 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
663  size_t sent=0;
664  for (size_t idx =0;idx<n_peers;idx++){
665  const Edge&e = outgoing_edges[idx];
666  if (e.root!=my_id){
667  return CAST_INVALID_EDGE;
668  }
669  if (e.status == MST_PARENT ){
670  sent++;
671  Msg to_send ( e.peer, my_id, m, data);
672  buf.push( to_send );
673  }
674  }
675  qsz=sent;
676  return OK;
677 }
678 
679 
// set_parent_id (the signature, source line 681, is omitted here; the name
// comes from the calls in reset() and process_srch()): make `id` our MST
// parent.
// Any current MST_PARENT edge is first demoted to MST. Passing our own id
// leaves us without a parent (OK). Otherwise `id` must be a known peer
// (else PARENT_UNRECOGNIZED) whose edge is currently MST (else
// PARENT_REQ_MST). That edge is then promoted to MST_PARENT.
// NOTE(review): on the error paths the previous parent has already been
// demoted, so we are left with no parent.
680 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
682 
683  //clear old id
684  for (size_t idx=0;idx<n_peers;idx++){
685  if (outgoing_edges[idx].status==MST_PARENT){
686  outgoing_edges[idx].status=MST;
687  }
688  }
689 
690  //self loop ok
691  if (id==get_id()){
692  return OK;
693  }
694 
695  if (!has_edge(id)){
696  return PARENT_UNRECOGNIZED;
697  }
698 
699  Edge peer;
700  auto ger = get_edge(id,peer);
701  if (OK != ger){ return ger; }
702 
703  if (peer.status == MST ){
704  auto err = set_edge_status(id,MST_PARENT);
705  return err;
706  }
707  return PARENT_REQ_MST;
708 }
709 
710 
// get_parent_id (source line 712, omitted here): the peer on our MST_PARENT
// edge, or my_id when no such edge exists (e.g. the leader, which sets
// itself as parent when starting a round).
711 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
713  for (size_t idx=0;idx<n_peers;idx++){
714  Edge e = outgoing_edges[idx];
715  if (e.status == MST_PARENT){
716  return e.peer;
717  }
718  }
719  return my_id;
720 }
721 
// get_leader_id (source line 723, omitted here): current partition leader.
722 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
724  return my_leader;
725 }
726 
// Presumably set_leader_id (source line 729 omitted; name taken from the call
// in reset()): overwrite my_leader. Always OK.
727 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
728 
730  my_leader = leader;
731  return OK;
732 }
733 
// get_level (source line 735, omitted here): current partition level.
734 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
736  return my_level;
737 }
738 
// Presumably set_level (source line 741 omitted; name taken from the call in
// reset()): overwrite my_level. Always OK.
739 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
740 
742  my_level = l;
743  return OK;
744 }
745 
// is_converged (source line 747, omitted here): true once process_noop() has
// run on this agent, i.e. a leader found no outgoing edge.
746 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
748  return algorithm_converged;
749 }
750 
// checked_index_of (source line 753, omitted here): find the peers[] slot
// holding `who` by linear scan over the first n_peers entries.
// Returns IMPL_REQ_PEER_MY_ID for our own id, or NO_SUCH_PEER if `who` is
// unknown. `idx` is written only on success.
751 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
752 
754  if (who == my_id){
755  return IMPL_REQ_PEER_MY_ID;
756  }
757  for (size_t i=0;i<n_peers;i++){
758  if (peers[i] == who){
759  idx = i;
760  return OK;
761  }
762  }
763  return NO_SUCH_PEER;
764 }
765 
// Presumably set_waiting_for (source line 768 omitted): record whether we
// await a reply from `who`. Errors come from checked_index_of().
766 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
767 
769 
770  size_t idx;
771  le::Errno retcode=checked_index_of(who,idx);
772  if (retcode!=OK){return retcode;}
773 
774  waiting_for_response[idx]=waiting;
775  return OK;
776 }
777 
// is_waiting_for (source line 780, omitted here): read that flag into the
// out-parameter.
778 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
779 
781  size_t idx;
782  le::Errno retcode=checked_index_of(who,idx);
783  if (retcode!=OK){return retcode;}
784 
785  waiting_for = waiting_for_response[idx];
786  return OK;
787 }
788 
// Presumably set_response_required (source line 791 omitted): set or clear
// the "deferred IN_PART query pending" flag for `who`.
789 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
790 
792 {
793  size_t idx;
794  le::Errno retcode=checked_index_of(who,idx);
795  if (retcode!=OK){return retcode;}
796 
797  response_required[idx]=resp;
798  return OK;
799 }
800 
// is_response_required (source line 803, omitted here): read that flag.
801 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
802 
804  size_t idx;
805  le::Errno retcode=checked_index_of(who,idx);
806  if (retcode!=OK){return retcode;}
807 
808  res_req = response_required[idx];
809  return OK;
810 }
811 
// Presumably set_response_prompt (source line 814 omitted): store the
// deferred IN_PART payload for `who` without touching the flag.
812 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
813 
815  size_t idx;
816  le::Errno retcode=checked_index_of(who,idx);
817  if (retcode!=OK){return retcode;}
818 
819  response_prompt[idx]=m;
820  return OK;
821 }
822 
// get_response_prompt (source line 825, omitted here): copy the stored
// payload for `who` into `out`.
823 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
824 
826  size_t idx;
827  le::Errno retcode=checked_index_of(who,idx);
828  if (retcode!=OK){return retcode;}
829 
830  out = response_prompt[idx];
831  return OK;
832 }
833 
// respond_later (source line 836 omitted; name taken from the call in
// process_in_part()): defer an IN_PART query from `from` by setting its flag
// and storing its payload. check_new_level() answers it later.
834 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
835 
837 {
838  size_t idx;
839  le::Errno retcode=checked_index_of(from,idx);
840  if (retcode!=OK){return retcode;}
841 
842  response_required[idx]=true;
843  response_prompt[idx] =m;
844  return OK;
845 }
846 
// has_edge (source line 848, omitted here): true iff checked_index_of(to)
// succeeds, i.e. `to` is a known peer other than ourselves.
847 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
849  size_t idx;
850  return OK==checked_index_of(to,idx);
851 }
852 
// Edge accessors. Each looks up `to` with checked_index_of() and returns its
// error (IMPL_REQ_PEER_MY_ID / NO_SUCH_PEER) if the peer is not found.

// get_edge (source line 855, omitted here): copy of the stored Edge to `to`.
853 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
854 
856 {
857  size_t idx;
858  le::Errno retcode=checked_index_of(to,idx);
859  if (retcode!=OK){return retcode;}
860 
861  out = outgoing_edges[idx];
862  return OK;
863 }
864 
// get_edge_status (source line 867, omitted here): status of the edge to `to`.
865 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
866 
868 {
869  size_t idx;
870  le::Errno retcode=checked_index_of(to,idx);
871  if (retcode!=OK){return retcode;}
872 
873  out = outgoing_edges[idx].status;
874  return OK;
875 }
876 
// Presumably set_edge_status (source line 880 omitted): overwrite the status
// of the edge to `to`.
877 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
878 
879 
881 {
882  size_t idx;
883  le::Errno retcode=checked_index_of(to,idx);
884  if (retcode!=OK){return retcode;}
885 
886  outgoing_edges[idx].status=status;
887  return OK;
888 }
889 
// get_edge_metric (source line 892, omitted here): metric of the edge to `to`.
890 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
891 
893 {
894  size_t idx;
895  le::Errno retcode=checked_index_of(to,idx);
896  if (retcode!=OK){return retcode;}
897 
898  out = outgoing_edges[idx].metric_val;
899  return OK;
900 }
901 
// Presumably set_edge_metric (source line 904 omitted): overwrite the metric
// of the edge to `to`.
902 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
903 
905 {
906  size_t idx;
907  le::Errno retcode=checked_index_of(to,idx);
908  if (retcode!=OK){return retcode;}
909 
910  outgoing_edges[idx].metric_val=m;
911  return OK;
912 }
913 
914 
// set_edge: add a new edge, or update the metric and status of an existing
// one. The edge must have no NO_AGENT endpoint, a metric other than
// WORST_METRIC, root == my_id, and peer != my_id.
// A known peer has its metric/status overwritten. An unknown peer is appended
// unless MAX_AGENTS peers are already stored (TOO_MANY_AGENTS). Any other
// lookup error is returned as-is.
// NOTE(review): the return statements on source lines 919, 923, 927 and 932
// are omitted from this rendering. Judging by the errno descriptions they
// presumably return SET_INVALID_EDGE_NO_AGENT, SET_INVALID_EDGE_METRIC,
// SET_INVALID_EDGE_NOT_ROOT and SET_INVALID_EDGE_SELF_LOOP -- confirm
// against the real source.
915 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
916 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::set_edge(const Edge &e) {
917 
918  if (e.root == NO_AGENT || e.peer == NO_AGENT){
920  }
921 
922  if (e.metric_val == WORST_METRIC){
924  }
925 
926  if (e.root != my_id){
928  }
929 
930  //no self loops
931  if (e.peer == my_id){
933  }
934 
935  agent_t who = e.peer;
936  size_t idx;
937  le::Errno er = checked_index_of(who,idx);
938  if (OK == er)
939  {
940  //found em
941  outgoing_edges[idx].metric_val = e.metric_val;
942  outgoing_edges[idx].status = e.status;
943  return OK;
944  }
945  else if (NO_SUCH_PEER == er)
946  {
947  //don't have em (yet)
948  if (n_peers>=MAX_AGENTS){
949  return TOO_MANY_AGENTS;
950  }
951  peers[n_peers]=e.peer;
952  outgoing_edges[n_peers] = e;
953  n_peers++;
954  return OK;
955  }
956  else
957  {
958  //something else happened
959  return er;
960  }
// Unreachable: every branch of the if/else chain above returns.
961  ghs_fatal(ERR_IMPL);
962  return ERR_IMPL;
963 }
964 
// waiting_count (source line 966, omitted here): number of peers we still
// await a reply from. Zero means the current search has no outstanding
// replies.
965 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
967 {
968  size_t waiting =0;
969  //count them up.
970  for (size_t i=0;i<n_peers;i++){
971  if (waiting_for_response[i]){
972  waiting++;
973  }
974  }
975  return waiting;
976 }
977 
// delayed_count (source line 979, omitted here): number of peers with a
// deferred IN_PART query still awaiting our answer.
978 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
980 {
981  size_t delayed =0;
982  for (size_t i=0;i<n_peers;i++){
983  if (response_required[i]){
984  delayed++;
985  }
986  }
987  return delayed;
988 }
989 
// get_id (source line 991, omitted here): this agent's id.
990 template <std::size_t MAX_AGENTS, std::size_t BUF_SZ>
992  return my_id;
993 }
994 
le::SET_INVALID_EDGE_METRIC
@ SET_INVALID_EDGE_METRIC
add- or set edge failed because metric_val==WORST_METRIC
Definition: errno.h:71
le::PROCESS_NO_EDGE_FOUND
@ PROCESS_NO_EDGE_FOUND
Unable to process the message when we don't have an edge to that agent.
Definition: errno.h:58
le::ghs::agent_t
int agent_t
problems for GhsState
Definition: agent.h:51
le::ghs::GhsState::get_edge_metric
le::Errno get_edge_metric(const agent_t &to, metric_t &m) const
Definition: ghs_impl.hpp:892
le::ghs::GhsState::get_response_prompt
le::Errno get_response_prompt(const agent_t &who, msg::InPartPayload &m)
Definition: ghs_impl.hpp:825
le::ghs::GhsState::get_edge
le::Errno get_edge(const agent_t &to, Edge &out) const
Definition: ghs_impl.hpp:855
le::ghs::MST
@ MST
We have added this edge as an MST link.
Definition: edge.h:86
le::ghs::GhsState::waiting_count
size_t waiting_count() const
Definition: ghs_impl.hpp:966
le::ghs::MST_PARENT
@ MST_PARENT
We have added this edge as parent MST link.
Definition: edge.h:88
le::ghs::GhsState::typecast
le::Errno typecast(const status_t status, const msg::Type, const msg::Data &, StaticQueue< Msg, MSG_Q_SIZE > &buf, size_t &) const
Definition: ghs_impl.hpp:625
le::SET_INVALID_EDGE_NO_AGENT
@ SET_INVALID_EDGE_NO_AGENT
add- or set edge failed because of NO_AGENT as peer/root
Definition: errno.h:73
le::ghs::GhsState::mst_broadcast
le::Errno mst_broadcast(const msg::Type, const msg::Data &, StaticQueue< Msg, MSG_Q_SIZE > &buf, size_t &) const
Definition: ghs_impl.hpp:644
le::ghs::GhsState::mst_convergecast
le::Errno mst_convergecast(const msg::Type, const msg::Data &, StaticQueue< Msg, MSG_Q_SIZE > &buf, size_t &) const
Definition: ghs_impl.hpp:662
le::ghs::metric_t
unsigned long metric_t
Definition: edge.h:63
le::ghs::status_t
status_t
A status enumeration, for the ghs edges.
Definition: edge.h:82
le::NO_SUCH_PEER
@ NO_SUCH_PEER
Cannot find peer idx – no edge or unrecognized ID?
Definition: errno.h:77
le::ghs::GhsState::checked_index_of
le::Errno checked_index_of(const agent_t &, size_t &) const
Definition: ghs_impl.hpp:753
le::ghs::GhsState::start_round
le::Errno start_round(StaticQueue< Msg, MSG_Q_SIZE > &outgoing_msgs, size_t &)
Definition: ghs_impl.hpp:102
le::Errno
Errno
Definition: errno.h:49
le::IMPL_REQ_PEER_MY_ID
@ IMPL_REQ_PEER_MY_ID
Cannot treat my_id as peer – bad message?
Definition: errno.h:78
le::ghs::DELETED
@ DELETED
We have decided not to further consider this edge, either it was "bad", or it is already part of our ...
Definition: edge.h:90
le::ghs::msg::Type
Type
Stores what type of Msg this is.
Definition: msg.h:58
le::ERR_QUEUE_MSGS
@ ERR_QUEUE_MSGS
Unable to enqueue messages, received seque::Retcode not OK.
Definition: errno.h:57
msg.h
provides the defs for the struct le::ghs::msg
le::ghs::Msg
An aggregate type containing all the data to exchange with to/from information.
Definition: msg.h:143
le::ghs::GhsState::get_level
level_t get_level() const
Definition: ghs_impl.hpp:735
le::PROCESS_NOTME
@ PROCESS_NOTME
Could not process a message not directed towards this agent.
Definition: errno.h:52
le::ghs::msg::SrchRetPayload
Returns an edge that represents the minimum weight outgoing edge.
Definition: msg.h:85
le::ghs::GhsState::process
le::Errno process(const Msg &msg, StaticQueue< Msg, MSG_Q_SIZE > &outgoing_buffer, size_t &sz)
Definition: ghs_impl.hpp:118
le::SET_INVALID_EDGE_NOT_ROOT
@ SET_INVALID_EDGE_NOT_ROOT
add- or set edge failed because edge was not rooted on my_id
Definition: errno.h:72
le::CAST_INVALID_EDGE
@ CAST_INVALID_EDGE
*cast operation failed because of bad edge
Definition: errno.h:69
le::ghs::is_valid
bool is_valid(const agent_t a)
Definition: agent.cpp:43
le::ghs::worst_edge
Edge worst_edge()
Definition: edge.cpp:45
le::ghs::GhsState::mwoe
Edge mwoe() const
Definition: ghs_impl.hpp:113
le::PARENT_UNRECOGNIZED
@ PARENT_UNRECOGNIZED
Cannot set parent ID to unrecognized node (no edge to them!)
Definition: errno.h:75
ghs.h
The main GhsState object
le::SET_INVALID_EDGE_SELF_LOOP
@ SET_INVALID_EDGE_SELF_LOOP
add- or set edge failed because Edge.peer == my_id
Definition: errno.h:74
seque::StaticQueue
a static-sized single-ended queue for use in GhsState
Definition: static_queue.h:60
le::ghs::msg::SrchPayload
Requests a search begin in the MST subtree rooted at the receiver, for the minimum weight outgoing ed...
Definition: msg.h:75
le::ghs::UNKNOWN
@ UNKNOWN
We have not probed this edge for information yet, or have not received a response.
Definition: edge.h:84
le::PARENT_REQ_MST
@ PARENT_REQ_MST
Cannot set parent ID to non-MST node (bad edge type)
Definition: errno.h:76
le::ERR_IMPL
@ ERR_IMPL
Implementation error: Reached branch that should not have been reachable.
Definition: errno.h:68
le::ghs::LEVEL_START
const level_t LEVEL_START
All levels start at 0.
Definition: level.h:54
le::ghs::msg::JoinUsPayload
Msgs to merge /absorb two partitions across a given edge.
Definition: msg.h:109
le::JOIN_INIT_BAD_LEVEL
@ JOIN_INIT_BAD_LEVEL
Told to init join to another partition, but level unrecognized.
Definition: errno.h:65
le::ghs::GhsState::get_edge_status
le::Errno get_edge_status(const agent_t &to, status_t &out) const
Definition: ghs_impl.hpp:867
le::ghs::msg::AckPartPayload
States "I am in your partition".
Definition: msg.h:98
le::ghs::GhsState::is_response_required
le::Errno is_response_required(const agent_t &who, bool &response_required)
Definition: ghs_impl.hpp:803
le::ghs::GhsState::get_id
agent_t get_id() const
Definition: ghs_impl.hpp:991
le::ghs::msg::InPartPayload
Asks "Are you in my partition".
Definition: msg.h:92
le::UNEXPECTED_SRCH_RET
@ UNEXPECTED_SRCH_RET
Unexpected srch_ret message at this time (not searching or not waiting for that agent)
Definition: errno.h:59
le::JOIN_MY_LEADER
@ JOIN_MY_LEADER
Other partition suggested we join our own partition.
Definition: errno.h:66
le::PROCESS_SELFMSG
@ PROCESS_SELFMSG
Could not process a message from self.
Definition: errno.h:51
le::ghs::GhsState::has_edge
bool has_edge(const agent_t to) const
Definition: ghs_impl.hpp:848
le::BAD_MSG
@ BAD_MSG
Likely malformed message.
Definition: errno.h:61
le::ghs::GhsState::delayed_count
size_t delayed_count() const
Definition: ghs_impl.hpp:979
le::ghs::msg::NackPartPayload
States "I am not in your partition".
Definition: msg.h:102
le::ghs::msg::Data
Definition: msg.h:116
le::JOIN_BAD_LEVEL
@ JOIN_BAD_LEVEL
Received join message with a level not our own, yet we are not on a partition boundary.
Definition: errno.h:63
le::ghs::GhsState::get_parent_id
agent_t get_parent_id() const
Definition: ghs_impl.hpp:712
le::ghs::NO_AGENT
const agent_t NO_AGENT
Definition: agent.h:56
seque::StaticQueue::pop
le::Errno pop()
Definition: static_queue_impl.hpp:96
le::SRCH_STILL_WAITING
@ SRCH_STILL_WAITING
There is no way to process a SRCH message while we're still executing the last search ( waiting_count(...
Definition: errno.h:56
le::ghs::WORST_METRIC
const metric_t WORST_METRIC
Definition: edge.h:74
le::ghs::Edge
A struct to hold all the communication edge information.
Definition: edge.h:98
le::ghs::GhsState::GhsState
GhsState(agent_t my_id, Edge *edges, size_t num_edges)
Definition: ghs_impl.hpp:56
le::JOIN_UNEXPECTED_REPLY
@ JOIN_UNEXPECTED_REPLY
received higher-level join message: Impossible since we should not have replied to their SRCH yet
Definition: errno.h:67
seque::StaticQueue::push
le::Errno push(const T item)
Definition: static_queue_impl.hpp:69
le::TOO_MANY_AGENTS
@ TOO_MANY_AGENTS
Set- or add edge failed, too many agents in static storage already.
Definition: errno.h:79
le::ghs::GhsState::is_waiting_for
le::Errno is_waiting_for(const agent_t &who, bool &out_waiting_for)
Definition: ghs_impl.hpp:780
le::ghs::level_t
int level_t
A "level" which is an internal item for GhsState to track how many times the MST has merged with anot...
Definition: level.h:49
le::ghs::GhsState::is_converged
bool is_converged() const
Definition: ghs_impl.hpp:747
le::OK
@ OK
The operation was successful.
Definition: errno.h:50
le::JOIN_INIT_BAD_LEADER
@ JOIN_INIT_BAD_LEADER
Told to init join to another partition, but leader unrecognized.
Definition: errno.h:64
le::ghs::GhsState::get_leader_id
agent_t get_leader_id() const
Definition: ghs_impl.hpp:723
seque::StaticQueue::size
unsigned int size() const
Definition: static_queue_impl.hpp:64
le::JOIN_BAD_LEADER
@ JOIN_BAD_LEADER
Received join message with a leader not our own, yet we are not on a partition boundary.
Definition: errno.h:62
le::ghs::GhsState
The main state machine for the GHS algorithm
Definition: ghs.h:82
le::ACK_NOT_WAITING
@ ACK_NOT_WAITING
We cannot process an ACK message if we aren't expecting one.
Definition: errno.h:60
le::PROCESS_INVALID_TYPE
@ PROCESS_INVALID_TYPE
Did not recognize or could not process this type of message.
Definition: errno.h:53