46 #define ghs_fatal(s) throw std::runtime_error(std::string(__FILE__)+":"+std::to_string(__LINE__)+" "+strerror(s))
48 #define ghs_fatal(s) printf("[fatal] %s",strerror(s))
53 using namespace le::ghs::msg;
55 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
59 for (
size_t idx=0;idx<num_edges;idx++){
64 auto err = set_edge(e);
65 if (
OK==err){
continue; }
70 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
76 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
77 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::reset() {
83 for (
size_t i=0;i<MAX_AGENTS;i++){
85 outgoing_edges[i].status=
UNKNOWN;
86 waiting_for_response[i]=
false;
87 response_required[i]=
false;
88 response_prompt[i]={};
91 this->algorithm_converged =
false;
101 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
104 if (get_leader_id() == get_id()){
106 return process_srch(get_id(), {get_leader_id(), get_level()}, outgoing_buffer, qsz);
112 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
117 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
120 if (msg.from()==my_id){
125 if (msg.to() != my_id){
129 if (! has_edge(msg.from()) ){
134 case (msg::Type::SRCH):{
return process_srch( msg.from(), msg.data().srch, outgoing_buffer, qsz); }
135 case (msg::Type::SRCH_RET):{
return process_srch_ret( msg.from(), msg.data().srch_ret, outgoing_buffer, qsz); }
136 case (msg::Type::IN_PART):{
return process_in_part( msg.from(), msg.data().in_part, outgoing_buffer, qsz); }
137 case (msg::Type::ACK_PART):{
return process_ack_part( msg.from(), msg.data().ack_part, outgoing_buffer, qsz); }
138 case (msg::Type::NACK_PART):{
return process_nack_part( msg.from(), msg.data().nack_part, outgoing_buffer, qsz); }
139 case (msg::Type::JOIN_US):{
return process_join_us( msg.from(), msg.data().join_us, outgoing_buffer, qsz); }
140 case (msg::Type::NOOP):{
return process_noop( outgoing_buffer , qsz); }
146 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
154 auto ret = get_edge(from, to_them);
170 if (waiting_count() != 0 )
176 agent_t leader = data.your_leader;
177 level_t level = data.your_level;
181 auto err = set_parent_id(from);
182 if (
OK!=err){
return err;}
186 best_edge.root = my_id;
195 le::Errno srch_ret = mst_broadcast(msg::Type::SRCH, to_send, srchbuf,srch_sent);
211 size_t srchbuf_sz = srchbuf.
size();
212 if (srchbuf_sz != srch_sent + part_sent){
221 if (srchbuf_sz == 0 && delayed_count() ==0){
222 return respond_no_mwoe(buf,qsz);
230 size_t buf_sz_before = buf.
size();
231 for (
size_t i=0;i<srchbuf_sz;i++){
234 set_waiting_for(m.to(),
true);
238 if ( (buf.
size() - buf_sz_before != srch_sent + part_sent ) )
245 size_t old_msgs_processed=0;
246 le::Errno lvl_err = check_new_level(buf,old_msgs_processed);
252 qsz = srch_sent + part_sent + old_msgs_processed;
256 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
263 return mst_convergecast(msg::Type::SRCH_RET, pld, buf,qsz);
266 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
271 if (this->waiting_count() == 0){
276 auto wfr = is_waiting_for(from,wf);
284 auto swfr = set_waiting_for(from,
false);
292 theirs.peer = data.to;
293 theirs.root = data.from;
294 theirs.metric_val = data.metric;
296 if (theirs.metric_val < best_edge.metric_val){
297 best_edge.root = theirs.root;
298 best_edge.peer = theirs.peer;
299 best_edge.metric_val = theirs.metric_val;
302 return check_search_status(buf,qsz);
305 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
313 level_t their_level = data.level;
316 if (their_level <= our_level){
318 if (part_id == this->my_leader){
335 respond_later(from, data);
341 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
346 auto wfr = is_waiting_for(from,wf);
356 auto sesr = set_edge_status(from,
DELETED);
361 auto swfr = set_waiting_for(from,
false);
366 return check_search_status(buf,qsz);
369 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
374 auto wfr = is_waiting_for(from,wf);
385 auto ger = get_edge(from, their_edge);
391 if (best_edge.metric_val > their_edge.metric_val){
392 best_edge = their_edge;
395 auto swfr = set_waiting_for(from,
false);
400 return check_search_status(buf,qsz);
403 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
406 if (waiting_count() == 0)
409 bool am_leader = (my_leader == my_id);
411 bool its_my_edge = (mwoe().root == my_id);
416 send_data.srch_ret =
SrchRetPayload{e.peer, e.root, e.metric_val};
417 return mst_convergecast( msg::Type::SRCH_RET, send_data, buf, qsz);
420 if (am_leader && found_new_edge && its_my_edge){
421 if (e.peer == e.root){
425 return process_join_us(my_id, {e.peer, e.root, get_leader_id(), get_level()}, buf, qsz);
428 if (am_leader && !found_new_edge ){
430 return process_noop( buf, qsz);
433 if (am_leader && found_new_edge && !its_my_edge){
437 data.join_us =
JoinUsPayload{e.peer, e.root, get_leader_id(), get_level()};
438 return mst_broadcast( JOIN_US, data, buf, qsz);
448 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
452 for (
size_t idx=0;idx<n_peers;idx++){
453 if (response_required[idx]){
457 if (their_level <= get_level() )
461 le::Errno ret=process_in_part(who, m, buf, sentsz);
467 response_required[idx]=
false;
475 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
479 auto join_peer = data.join_peer;
480 auto join_root = data.join_root;
481 auto join_lead = data.proposed_leader;
482 auto join_level = data.proposed_level;
484 bool not_involved = (join_root != my_id && join_peer != my_id);
485 bool in_initiating_partition = (join_root == my_id);
488 if (join_lead != my_leader){
491 if (join_level != my_level){
496 to_send.join_us = data;
497 return mst_broadcast(msg::Type::JOIN_US, to_send, buf, qsz);
501 Edge edge_to_other_part;
503 if (in_initiating_partition){
508 retcode = get_edge(join_peer, join_peer_edge);
514 if (join_lead != my_leader && join_peer_edge.status !=
MST){
517 if (join_level != my_level){
521 retcode = get_edge(join_peer, edge_to_other_part);
528 if (join_lead == my_leader){
532 if (join_level > my_level){
536 auto ger = get_edge(join_root, edge_to_other_part);
545 if ( edge_to_other_part.status ==
MST){
547 auto leader_id = max(join_peer, join_root);
548 my_leader = leader_id;
550 if (leader_id == my_id){
556 return start_round(buf, qsz);
566 }
else if (edge_to_other_part.status ==
UNKNOWN) {
567 if (in_initiating_partition ){
574 set_edge_status(join_peer,
MST);
575 auto payload =
JoinUsPayload{ data.join_peer, data.join_root,
576 data.proposed_leader, data.proposed_level };
577 Msg to_send = Msg( join_peer, my_id, payload );
588 if (my_level < join_level){
597 auto sesr = set_edge_status(join_root,
MST);
618 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
620 algorithm_converged=
true;
621 return mst_broadcast(msg::Type::NOOP, {},buf, qsz);
624 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
627 for (
size_t idx=0;idx<n_peers;idx++){
628 const Edge &e = outgoing_edges[idx];
632 if ( e.status == status ){
634 Msg to_send (e.peer, my_id, m, data);
643 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
646 for (
size_t idx =0;idx<n_peers;idx++){
647 const Edge&e = outgoing_edges[idx];
651 if (e.status ==
MST ){
653 Msg to_send( e.peer, my_id, m, data);
661 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
664 for (
size_t idx =0;idx<n_peers;idx++){
665 const Edge&e = outgoing_edges[idx];
671 Msg to_send ( e.peer, my_id, m, data);
680 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
684 for (
size_t idx=0;idx<n_peers;idx++){
686 outgoing_edges[idx].status=
MST;
700 auto ger = get_edge(
id,peer);
701 if (
OK != ger){
return ger; }
703 if (peer.status ==
MST ){
711 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
713 for (
size_t idx=0;idx<n_peers;idx++){
714 Edge e = outgoing_edges[idx];
722 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
727 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
734 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
739 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
746 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
748 return algorithm_converged;
751 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
757 for (
size_t i=0;i<n_peers;i++){
758 if (peers[i] == who){
766 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
771 le::Errno retcode=checked_index_of(who,idx);
772 if (retcode!=
OK){
return retcode;}
774 waiting_for_response[idx]=waiting;
778 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
782 le::Errno retcode=checked_index_of(who,idx);
783 if (retcode!=
OK){
return retcode;}
785 waiting_for = waiting_for_response[idx];
789 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
794 le::Errno retcode=checked_index_of(who,idx);
795 if (retcode!=
OK){
return retcode;}
797 response_required[idx]=resp;
801 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
805 le::Errno retcode=checked_index_of(who,idx);
806 if (retcode!=
OK){
return retcode;}
808 res_req = response_required[idx];
812 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
816 le::Errno retcode=checked_index_of(who,idx);
817 if (retcode!=
OK){
return retcode;}
819 response_prompt[idx]=m;
823 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
827 le::Errno retcode=checked_index_of(who,idx);
828 if (retcode!=
OK){
return retcode;}
830 out = response_prompt[idx];
834 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
839 le::Errno retcode=checked_index_of(from,idx);
840 if (retcode!=
OK){
return retcode;}
842 response_required[idx]=
true;
843 response_prompt[idx] =m;
847 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
850 return OK==checked_index_of(to,idx);
853 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
858 le::Errno retcode=checked_index_of(to,idx);
859 if (retcode!=
OK){
return retcode;}
861 out = outgoing_edges[idx];
865 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
870 le::Errno retcode=checked_index_of(to,idx);
871 if (retcode!=
OK){
return retcode;}
873 out = outgoing_edges[idx].status;
877 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
883 le::Errno retcode=checked_index_of(to,idx);
884 if (retcode!=
OK){
return retcode;}
886 outgoing_edges[idx].status=status;
890 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
895 le::Errno retcode=checked_index_of(to,idx);
896 if (retcode!=
OK){
return retcode;}
898 out = outgoing_edges[idx].metric_val;
902 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
907 le::Errno retcode=checked_index_of(to,idx);
908 if (retcode!=
OK){
return retcode;}
910 outgoing_edges[idx].metric_val=m;
915 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
916 le::Errno GhsState<MAX_AGENTS, BUF_SZ>::set_edge(
const Edge &e) {
926 if (e.root != my_id){
931 if (e.peer == my_id){
937 le::Errno er = checked_index_of(who,idx);
941 outgoing_edges[idx].metric_val = e.metric_val;
942 outgoing_edges[idx].status = e.status;
948 if (n_peers>=MAX_AGENTS){
951 peers[n_peers]=e.peer;
952 outgoing_edges[n_peers] = e;
965 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
970 for (
size_t i=0;i<n_peers;i++){
971 if (waiting_for_response[i]){
978 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>
982 for (
size_t i=0;i<n_peers;i++){
983 if (response_required[i]){
990 template <std::
size_t MAX_AGENTS, std::
size_t BUF_SZ>