70 #ifdef PYTHON_BINDINGS
71 #include <pybind11/pybind11.h>
72 #include <pybind11/stl.h>
73 #include <pybind11/numpy.h>
74 namespace py = pybind11;
79 template<
typename CONFIG = default_config>
83 static const int D = CONFIG::D;
85 static const bool QUIET = CONFIG::QUIET;
87 using REAL =
typename CONFIG::REAL;
95 template<
typename T>
struct add_vp_ {
using type = std::vector<std::pair<point_type,T> >; };
96 template<
typename T>
struct def_storage_;
97 template<
typename... TYPES>
struct def_storage_<
type_list<TYPES...> >{
101 template<
typename T>
struct add_vi_ {
using type = std::vector<std::pair<size_t, T> >; };
102 template<
typename T>
struct def_storage_raw_;
103 template<
typename... TYPES>
struct def_storage_raw_<type_list<TYPES...> >{
104 using type = storage<typename add_vi_<TYPES>::type...>;
107 template<
typename T>
struct def_storage_single_;
108 template<
typename... TYPES>
struct def_storage_single_<type_list<TYPES...> >{
109 using type = storage<TYPES...>;
113 using storage_t =
typename def_storage_<data_types>::type;
114 using spatial_t = spatial_storage<bin_t<CONFIG>,storage_t,CONFIG>;
115 using frame_type = std::unordered_map<std::string, storage_t>;
116 using bin_frame_type = std::unordered_map<std::string, spatial_t>;
118 using storage_raw_t =
typename def_storage_raw_<data_types>::type;
119 using frame_raw_type = std::unordered_map<std::string, storage_raw_t>;
121 using storage_single_t =
typename def_storage_single_<data_types>::type;
124 peer_state() : disable_send(false), disable_recv(false), ss_stat_send(false), ss_stat_recv(false) {}
126 using spans_type = std::map<std::pair<time_type,time_type>,
span_t>;
129 return scan_spans_(t,s,recving_spans);
133 recving_spans.emplace(std::make_pair(start,end),std::move(s));
137 return scan_spans_(t,s,sending_spans);
141 sending_spans.emplace(std::make_pair(start,end), std::move(s));
144 void set_pts(std::vector<point_type>& pts) {
148 const std::vector<point_type>& pts()
const {
152 void set_send_disable() {
156 void set_recv_disable() {
160 bool is_send_disabled()
const {
164 bool is_recv_disabled()
const {
168 void set_ss_send_status(
bool status) {
169 ss_stat_send = status;
172 bool ss_send_status()
const {
176 void set_ss_recv_status(
bool status) {
177 ss_stat_recv = status;
180 bool ss_recv_status()
const {
184 time_type current_t()
const {
return latest_timestamp; }
186 time_type next_t()
const {
return next_timestamp; }
188 void set_current_t(
time_type t ) { latest_timestamp = t; }
189 void set_current_sub(
iterator_type i ) { latest_subiter = i; }
190 void set_next_t(
time_type t ) { next_timestamp = t; }
193 bool scan_spans_(
time_type t,
const span_t& s,
const spans_type& spans )
const {
194 bool prefetched =
false;
195 auto end = spans.lower_bound(std::make_pair(t,t));
196 if( spans.size() == 1 ) end = spans.end();
198 for(
auto itr = spans.begin(); itr != end; ++itr ) {
199 if( t < itr->first.second ||
almost_equal(t, itr->first.second) ) {
201 if(
collide(s,itr->second) )
return true;
210 time_type latest_timestamp = std::numeric_limits<time_type>::lowest();
211 iterator_type latest_subiter = std::numeric_limits<iterator_type>::lowest();
212 time_type next_timestamp = std::numeric_limits<time_type>::lowest();
213 iterator_type next_subiter = std::numeric_limits<iterator_type>::lowest();
214 spans_type recving_spans;
215 spans_type sending_spans;
216 std::vector<point_type> pts_;
217 std::unordered_map<std::string, storage_single_t> assigned_vals_;
225 std::unique_ptr<communicator> comm;
228 std::map<std::pair<time_type, iterator_type>, bin_frame_type> log;
230 frame_type push_buffer;
231 frame_raw_type push_buffer_raw;
232 std::vector<point_type> push_buffer_pts;
234 std::unordered_map<std::string, storage_single_t > assigned_values;
236 std::vector<peer_state> peers;
237 std::vector<bool> peer_is_sending;
238 bool smart_send_set_ =
true;
239 time_type span_start = std::numeric_limits<time_type>::lowest();
240 time_type span_timeout = std::numeric_limits<time_type>::lowest();
242 time_type recv_start = std::numeric_limits<time_type>::lowest();
243 time_type recv_timeout = std::numeric_limits<time_type>::lowest();
247 bool initialized_pts_;
248 size_t fixedPointCount_;
249 time_type fetch_t_hist_ = std::numeric_limits<time_type>::lowest();
250 iterator_type fetch_i_hist_ = std::numeric_limits<iterator_type>::lowest();
256 using namespace std::placeholders;
258 peers.resize(comm->remote_size());
259 peer_is_sending.resize(comm->remote_size(),
true);
261 readers.link(
"timestamp",
reader_variables<int32_t, std::pair<time_type,iterator_type> >(
262 std::bind(&uniface::on_recv_confirm,
this, _1, _2)));
263 readers.link(
"forecast",
reader_variables<int32_t, std::pair<time_type,iterator_type>>(
264 std::bind(&uniface::on_recv_forecast,
this, _1, _2)));
265 readers.link(
"data",
reader_variables<std::pair<time_type,iterator_type>, frame_type>(
266 std::bind(&uniface::on_recv_data,
this, _1, _2)));
267 readers.link(
"rawdata",
reader_variables<int32_t, std::pair<time_type,iterator_type>, frame_raw_type>(
268 std::bind(&uniface::on_recv_rawdata,
this, _1, _2, _3)));
270 std::bind(&uniface::on_recv_points,
this, _1, _2)));
272 std::bind(&uniface::on_recv_assignedVals,
this, _1, _2)));
274 std::bind(&uniface::on_recv_span,
this, _1, _2, _3, _4)));
276 std::bind(&uniface::on_send_span,
this, _1, _2, _3, _4)));
278 std::bind(&uniface::on_send_disable,
this, _1)));
280 std::bind(&uniface::on_recv_disable,
this, _1)));
290 template<
typename TYPE>
291 void push(
const std::string& attr,
const TYPE& value ) {
292 comm->send(
message::make(
"assignedVals", attr, storage_single_t(TYPE(value))));
299 template<
typename TYPE>
303 if( !initialized_pts_ ) push_buffer_pts.emplace_back( loc );
305 storage_raw_t& n = push_buffer_raw[attr];
306 if( !n ) n = storage_raw_t(std::vector<std::pair<size_t,TYPE> >());
307 storage_cast<std::vector<std::pair<size_t,TYPE> >&>(n).emplace_back( fixedPointCount_, value );
313 storage_t& n = push_buffer[attr];
314 if( !n ) n = storage_t(std::vector<std::pair<point_type,TYPE> >());
315 storage_cast<std::vector<std::pair<point_type,TYPE> >&>(n).emplace_back( loc, value );
319 #ifdef PYTHON_BINDINGS
320 template<
typename TYPE>
321 void push_many(
const std::string& attr,
const class py::array_t<REAL>& points,
322 const class py::array_t<TYPE>& values) {
325 auto points_arr = points.template unchecked<2>();
326 auto values_arr = values.template unchecked<1>();
327 assert(points_arr.shape(0) == values_arr.shape(0));
328 for (ssize_t i = 0; i < points_arr.shape(0); i++) {
329 for (ssize_t j = 0; j < points_arr.shape(1); j++)
330 p[j] = points_arr(i,j);
331 push<TYPE>(attr, p, values_arr(i));
335 template<
class SAMPLER,
class TIME_SAMPLER>
336 py::array_t<typename SAMPLER::OTYPE,py::array::c_style>
337 fetch_many(
const std::string& attr,
const py::array_t<REAL,py::array::c_style> points,
const time_type t,
338 const SAMPLER &sampler,
const TIME_SAMPLER &t_sampler,
bool barrier_enabled =
true) {
341 auto points_arr = points.template unchecked<2>();
342 py::array_t<typename SAMPLER::OTYPE,py::array::c_style> values(points_arr.shape(0));
343 auto values_arr = values.template mutable_unchecked<1>();
344 for (ssize_t i = 0; i < points_arr.shape(0); i++) {
345 for (ssize_t j = 0; j < points_arr.shape(1); j++)
346 p[j] = points_arr(i,j);
347 values_arr(i) =
fetch(attr, p, t, sampler, t_sampler, barrier_enabled);
352 template<
class SAMPLER,
class TIME_SAMPLER>
353 py::array_t<typename SAMPLER::OTYPE,py::array::c_style>
354 fetch_many(
const std::string& attr,
const py::array_t<REAL,py::array::c_style> points,
const time_type t,
355 const iterator_type it,
const SAMPLER &sampler,
const TIME_SAMPLER &t_sampler,
bool barrier_enabled =
true) {
358 auto points_arr = points.template unchecked<2>();
359 py::array_t<typename SAMPLER::OTYPE,py::array::c_style> values(points_arr.shape(0));
360 auto values_arr = values.template mutable_unchecked<1>();
361 for (ssize_t i = 0; i < points_arr.shape(0); i++) {
362 for (ssize_t j = 0; j < points_arr.shape(1); j++)
363 p[j] = points_arr(i,j);
364 values_arr(i) =
fetch(attr, p, t, it, sampler, t_sampler, barrier_enabled);
369 template<
class SAMPLER,
class TIME_SAMPLER,
class ALGORITHM>
370 py::array_t<typename SAMPLER::OTYPE,py::array::c_style>
371 fetch_many(
const std::string& attr,
const py::array_t<REAL,py::array::c_style> points,
const time_type t,
372 const SAMPLER &sampler,
const TIME_SAMPLER &t_sampler,
const ALGORITHM &algorithm,
bool barrier_enabled =
true) {
375 auto points_arr = points.template unchecked<2>();
376 py::array_t<typename SAMPLER::OTYPE,py::array::c_style> values(points_arr.shape(0));
377 auto values_arr = values.template mutable_unchecked<1>();
378 for (ssize_t i = 0; i < points_arr.shape(0); i++) {
379 for (ssize_t j = 0; j < points_arr.shape(1); j++)
380 p[j] = points_arr(i,j);
381 values_arr(i) =
fetch(attr, p, t, sampler, t_sampler, algorithm, barrier_enabled);
386 template<
class SAMPLER,
class TIME_SAMPLER,
class ALGORITHM>
387 py::array_t<typename SAMPLER::OTYPE,py::array::c_style>
388 fetch_many(
const std::string& attr,
const py::array_t<REAL,py::array::c_style> points,
const time_type t,
389 const iterator_type it,
const SAMPLER &sampler,
const TIME_SAMPLER &t_sampler,
const ALGORITHM &algorithm,
bool barrier_enabled =
true) {
392 auto points_arr = points.template unchecked<2>();
393 py::array_t<typename SAMPLER::OTYPE,py::array::c_style> values(points_arr.shape(0));
394 auto values_arr = values.template mutable_unchecked<1>();
395 for (ssize_t i = 0; i < points_arr.shape(0); i++) {
396 for (ssize_t j = 0; j < points_arr.shape(1); j++)
397 p[j] = points_arr(i,j);
398 values_arr(i) =
fetch(attr, p, t, it, sampler, t_sampler, algorithm, barrier_enabled);
404 template<
typename TYPE,
class TIME_SAMPLER>
405 py::array_t<REAL, py::array::c_style>
406 fetch_points_np(
const std::string& attr,
const time_type t,
const TIME_SAMPLER &t_sampler,
bool barrier_enabled =
true, TYPE test_value =
static_cast<TYPE
>(0)) {
407 std::vector<point_type> points = fetch_points<TYPE>(attr, t, t_sampler, barrier_enabled);
408 size_t n = points.size();
410 py::array_t<REAL, py::array::c_style> points_np({n,
static_cast<size_t>(
D)});
411 auto points_np_arr = points_np.template mutable_unchecked<2>();
412 for (std::size_t i = 0; i < n; i++)
413 for (std::size_t j = 0; j <
D; j++)
414 points_np_arr(i,j) = (points[i].data())[j];
418 template<
typename TYPE,
class TIME_SAMPLER>
419 py::array_t<REAL, py::array::c_style>
420 fetch_points_np(
const std::string& attr,
const time_type t,
const iterator_type it,
const TIME_SAMPLER &t_sampler,
bool barrier_enabled =
true, TYPE test_value =
static_cast<TYPE
>(0)) {
421 std::vector<point_type> points = fetch_points<TYPE>(attr, t, it, t_sampler, barrier_enabled);
422 size_t n = points.size();
424 py::array_t<REAL, py::array::c_style> points_np({n,
static_cast<size_t>(
D)});
425 auto points_np_arr = points_np.template mutable_unchecked<2>();
426 for (std::size_t i = 0; i < n; i++)
427 for (std::size_t j = 0; j <
D; j++)
428 points_np_arr(i,j) = (points[i].data())[j];
438 template<
typename TYPE>
439 TYPE
fetch(
const std::string& attr ) {
440 storage_single_t& n = assigned_values[attr];
441 if( !n )
return TYPE();
442 return storage_cast<TYPE&>(n);
447 template<
class SAMPLER,
class TIME_SAMPLER,
typename ... ADDITIONAL>
448 typename SAMPLER::OTYPE
450 SAMPLER& sampler,
const TIME_SAMPLER &t_sampler,
bool barrier_enabled =
true,
451 ADDITIONAL && ... additional ) {
453 if( fetch_t_hist_ != t && barrier_enabled )
454 barrier(t_sampler.get_upper_bound(t));
458 std::vector<std::pair<std::pair<time_type,iterator_type>,
typename SAMPLER::OTYPE> > v;
459 std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-
threshold(t),
460 std::numeric_limits<iterator_type>::lowest());
462 std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+
threshold(t),
463 std::numeric_limits<iterator_type>::lowest());
464 auto end = log.upper_bound(curr_time_upper);
466 if( log.size() == 1 ) end = log.end();
468 for(
auto start = log.lower_bound(curr_time_lower); start != end; ++start ) {
469 const auto& iter = start->second.find(attr);
470 if( iter == start->second.end() )
continue;
471 v.emplace_back( start->first, iter->second.build_and_query_ts( focus, sampler, additional... ) );
474 return t_sampler.filter(t, v);
479 template<
class SAMPLER,
class TIME_SAMPLER,
typename ... ADDITIONAL>
480 typename SAMPLER::OTYPE
482 SAMPLER& sampler,
const TIME_SAMPLER &t_sampler,
bool barrier_enabled =
true,
483 ADDITIONAL && ... additional ) {
485 if((fetch_t_hist_ != t || fetch_i_hist_ != it) && barrier_enabled)
486 barrier(t_sampler.get_upper_bound(t),t_sampler.get_upper_bound(it));
491 std::vector<std::pair<std::pair<time_type,iterator_type>,
typename SAMPLER::OTYPE> > v;
492 std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-
threshold(t),
493 t_sampler.get_lower_bound(it)-
threshold(it));
495 std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+
threshold(t),
496 t_sampler.get_upper_bound(it)+
threshold(it));
497 auto end = log.upper_bound(curr_time_upper);
499 if( log.size() == 1 ) end = log.end();
501 for(
auto start = log.lower_bound(curr_time_lower); start != end; ++start ) {
502 const auto& iter = start->second.find(attr);
503 if( iter == start->second.end() )
continue;
504 v.emplace_back( start->first, iter->second.build_and_query_ts( focus, sampler, additional... ) );
507 return t_sampler.filter(std::make_pair(t,it), v);
512 template<
class SAMPLER,
class TIME_SAMPLER,
class COUPLING_ALGO,
typename ... ADDITIONAL>
513 typename SAMPLER::OTYPE
515 SAMPLER& sampler,
const TIME_SAMPLER &t_sampler,
const COUPLING_ALGO &cpl_algo,
516 bool barrier_enabled =
true, ADDITIONAL && ... additional ) {
518 if( fetch_t_hist_ != t && barrier_enabled )
519 barrier(t_sampler.get_upper_bound(t));
523 std::vector<std::pair<std::pair<time_type,iterator_type>,
typename SAMPLER::OTYPE> > v;
524 std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-
threshold(t),
525 std::numeric_limits<iterator_type>::lowest());
527 std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+
threshold(t),
528 std::numeric_limits<iterator_type>::lowest());
529 auto end = log.upper_bound(curr_time_upper);
531 if( log.size() == 1 ) end = log.end();
533 for(
auto start = log.lower_bound(curr_time_lower); start != end; ++start ) {
534 const auto& iter = start->second.find(attr);
535 if( iter == start->second.end() )
continue;
536 v.emplace_back( start->first, iter->second.build_and_query_ts( focus, sampler, additional... ) );
539 return cpl_algo.relaxation(std::make_pair(std::numeric_limits<time_type>::lowest(),
static_cast<iterator_type>(t)), focus, t_sampler.filter(t, v));
544 template<
class SAMPLER,
class TIME_SAMPLER,
class COUPLING_ALGO,
typename ... ADDITIONAL>
545 typename SAMPLER::OTYPE
547 SAMPLER& sampler,
const TIME_SAMPLER &t_sampler,
const COUPLING_ALGO &cpl_algo,
548 bool barrier_enabled =
true, ADDITIONAL && ... additional ) {
550 if((fetch_t_hist_ != t || fetch_i_hist_ != it) && barrier_enabled)
551 barrier(t_sampler.get_upper_bound(t),t_sampler.get_upper_bound(it));
556 std::vector<std::pair<std::pair<time_type,iterator_type>,
typename SAMPLER::OTYPE> > v;
557 std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-
threshold(t),
558 t_sampler.get_lower_bound(it)-
threshold(it));
560 std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+
threshold(t),
561 t_sampler.get_upper_bound(it)+
threshold(it));
562 auto end = log.upper_bound(curr_time_upper);
564 if( log.size() == 1 ) end = log.end();
566 for(
auto start = log.lower_bound(curr_time_lower); start != end; ++start ) {
567 const auto& iter = start->second.find(attr);
568 if( iter == start->second.end() )
continue;
569 v.emplace_back( start->first, iter->second.build_and_query_ts( focus, sampler, additional... ) );
572 return cpl_algo.relaxation(std::make_pair(t,it), focus, t_sampler.filter(std::make_pair(t,it), v));
577 template<
typename TYPE,
class TIME_SAMPLER,
typename ... ADDITIONAL>
578 std::vector<point_type>
580 const TIME_SAMPLER &t_sampler,
bool barrier_enabled =
true, ADDITIONAL && ... additional ) {
582 if( fetch_t_hist_ != t && barrier_enabled )
583 barrier(t_sampler.get_upper_bound(t));
587 using vec = std::vector<std::pair<point_type,TYPE> >;
588 std::vector <point_type> return_points;
590 std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-
threshold(t),
591 std::numeric_limits<iterator_type>::lowest());
593 std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+
threshold(t),
594 std::numeric_limits<iterator_type>::lowest());
595 auto end = log.upper_bound(curr_time_upper);
597 if( log.size() == 1 ) end = log.end();
599 for(
auto start = log.lower_bound(curr_time_lower); start != end; ++start ){
600 const auto& iter = start->second.find(attr);
601 if( iter == start->second.end() )
continue;
602 const vec& ds = iter->second.template return_data<TYPE>();
603 return_points.reserve(ds.size());
604 for(
size_t i=0; i<ds.size(); i++ ) {
605 return_points.emplace_back(ds[i].first);
609 return return_points;
614 template<
typename TYPE,
class TIME_SAMPLER,
typename ... ADDITIONAL>
615 std::vector<point_type>
617 const TIME_SAMPLER &t_sampler,
bool barrier_enabled =
true, ADDITIONAL && ... additional ) {
619 if( fetch_t_hist_ != t && fetch_i_hist_ != it && barrier_enabled)
620 barrier(t_sampler.get_upper_bound(t),t_sampler.get_upper_bound(it));
625 using vec = std::vector<std::pair<point_type,TYPE> >;
626 std::vector <point_type> return_points;
628 std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-
threshold(t),
629 t_sampler.get_lower_bound(it)-
threshold(it));
631 std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+
threshold(t),
632 t_sampler.get_upper_bound(it)+
threshold(it));
634 auto end = log.upper_bound(curr_time_upper);
636 if( log.size() == 1 ) end = log.end();
638 for(
auto start = log.lower_bound(curr_time_lower); start != end; ++start ){
639 const auto& iter = start->second.find(attr);
640 if( iter == start->second.end() )
continue;
641 const vec& ds = iter->second.template return_data<TYPE>();
642 return_points.reserve(ds.size());
643 for(
size_t i=0; i<ds.size(); i++ ) {
644 return_points.emplace_back(ds[i].first);
648 return return_points;
653 template<
typename TYPE,
class TIME_SAMPLER,
typename ... ADDITIONAL>
656 const TIME_SAMPLER &t_sampler,
bool barrier_enabled =
true, ADDITIONAL && ... additional ) {
658 if( fetch_t_hist_ != t && barrier_enabled )
659 barrier(t_sampler.get_upper_bound(t));
663 using vec = std::vector<std::pair<point_type,TYPE> >;
664 std::vector<TYPE> return_values;
666 std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-
threshold(t),
667 std::numeric_limits<iterator_type>::lowest());
668 std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+
threshold(t),
669 std::numeric_limits<iterator_type>::lowest());
671 auto end = log.upper_bound(curr_time_upper);
673 if( log.size() == 1 ) end = log.end();
675 for(
auto start = log.lower_bound(curr_time_lower); start != end; ++start ){
676 const auto& iter = start->second.find(attr);
677 if( iter == start->second.end() )
continue;
678 const vec& ds = iter->second.template return_data<TYPE>();
679 return_values.reserve(ds.size());
680 for(
size_t i=0; i<ds.size(); i++ ) {
681 return_values.emplace_back(ds[i].second);
685 return return_values;
690 template<
typename TYPE,
class TIME_SAMPLER,
typename ... ADDITIONAL>
693 const TIME_SAMPLER &t_sampler,
bool barrier_enabled =
true, ADDITIONAL && ... additional ) {
695 if( fetch_t_hist_ != t && fetch_i_hist_ != it && barrier_enabled)
696 barrier(t_sampler.get_upper_bound(t),t_sampler.get_upper_bound(it));
701 using vec = std::vector<std::pair<point_type,TYPE> >;
702 std::vector<TYPE> return_values;
704 std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-
threshold(t),
705 t_sampler.get_lower_bound(it)-
threshold(it));
706 std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+
threshold(t),
707 t_sampler.get_upper_bound(it)+
threshold(it));
709 auto end = log.upper_bound(curr_time_upper);
711 if( log.size() == 1 ) end = log.end();
713 for(
auto start = log.lower_bound(curr_time_lower); start != end; ++start ){
714 const auto& iter = start->second.find(attr);
715 if( iter == start->second.end() )
continue;
716 const vec& ds = iter->second.template return_data<TYPE>();
717 return_values.reserve(ds.size());
718 for(
size_t i=0; i<ds.size(); i++ ) {
719 return_values.emplace_back(ds[i].second);
723 return return_values;
731 std::pair<time_type, iterator_type>
time(t, it);
734 if ( !smart_send_set_ ) {
736 std::fill(peer_is_sending.begin(), peer_is_sending.end(),
true);
738 smart_send_set_ =
true;
743 if( push_buffer_pts.size() > 0 ) {
744 comm->send(
message::make(
"points",comm->local_rank(),std::move(push_buffer_pts)),peer_is_sending );
745 initialized_pts_ =
true;
746 push_buffer_pts.clear();
750 fixedPointCount_ = 0;
752 if( push_buffer_raw.size() > 0 ) {
753 comm->send(
message::make(
"rawdata",comm->local_rank(),
time,std::move(push_buffer_raw)),peer_is_sending );
754 push_buffer_raw.clear();
758 if( push_buffer.size() > 0 ) {
764 comm->send(
message::make(
"timestamp",comm->local_rank(),
time),peer_is_sending );
766 return std::count( peer_is_sending.begin(),peer_is_sending.end(),
true );
773 if( (((span_start < t) ||
almost_equal(span_start, t)) &&
774 ((t < span_timeout) ||
almost_equal(t, span_timeout))) ) {
775 for(
size_t i=0; i < peers.size(); i++ ) {
777 if( peers[i].is_recv_disabled() ) {
778 peer_is_sending[i] =
false;
783 peer_is_sending[i] = peers[i].is_recving( t, current_span );
787 for(
size_t i=0; i < peers.size(); i++ ) {
788 if( peers[i].is_recv_disabled() ) peer_is_sending[i] =
false;
796 std::pair<time_type,iterator_type>
time(t,it);
803 using logitem_ref_t =
typename decltype(log)::const_reference;
804 return std::any_of(log.begin(), log.end(), [=](logitem_ref_t time_frame) {
805 return time_frame.second.find(attr) != time_frame.second.end(); })
806 && std::all_of(peers.begin(), peers.end(), [=](
const peer_state& p) {
807 return (p.is_send_disabled()) || (!p.is_sending(t, recv_span)) ||
808 ((((p.current_t() > t) || almost_equal(p.current_t(), t)) || (p.next_t() > t))); });
814 using logitem_ref_t =
typename decltype(log)::const_reference;
815 return std::any_of(log.begin(), log.end(), [=](logitem_ref_t time_frame) {
816 return time_frame.second.find(attr) != time_frame.second.end(); })
817 && std::all_of(peers.begin(), peers.end(), [=](
const peer_state& p) {
818 return (p.is_send_disabled()) || (!p.is_sending(t, recv_span)) ||
819 ((((p.current_t() > t) || almost_equal(p.current_t(), t)) || (p.next_t() > t)) &&
820 (((p.current_it() > it) || almost_equal(p.current_it(), it)) || (p.current_it() > it))); });
827 std::lock_guard<std::mutex> lock(mutex);
829 auto start = std::chrono::system_clock::now();
832 size_t peers_unblocked = 0;
833 for(
size_t p = 0; p < peers.size(); p++ ) {
834 if( peers[p].is_send_disabled() ) { peers_unblocked++;
continue; }
835 if( !peers[p].is_sending(t, recv_span) ) { peers_unblocked++;
continue; }
836 if( (peers[p].current_t() > t ||
almost_equal(peers[p].current_t(), t)) || peers[p].next_t() > t ) {
842 if( peers_unblocked == peers.size() )
849 if( (std::chrono::system_clock::now() - start) > std::chrono::seconds(5) ) {
850 std::cout <<
"MUI Warning [uniface.h]: Communication barrier spent over 5 seconds" << std::endl;
859 std::lock_guard<std::mutex> lock(mutex);
861 auto start = std::chrono::system_clock::now();
864 size_t peers_unblocked = 0;
865 for(
size_t p = 0; p < peers.size(); p++ ) {
866 if( peers[p].is_send_disabled() ) { peers_unblocked++;
continue; }
867 if( !peers[p].is_sending(t, recv_span) ) { peers_unblocked++;
continue; }
868 if( ((peers[p].current_t() > t ||
almost_equal(peers[p].current_t(), t)) || peers[p].next_t() > t) &&
869 ((peers[p].current_it() > it ||
almost_equal(peers[p].current_it(), it)) || peers[p].next_it() > it) ) {
875 if( peers_unblocked == peers.size() )
882 if( (std::chrono::system_clock::now() - start) > std::chrono::seconds(5) ) {
883 std::cout <<
"MUI Warning [uniface.h]: Communication barrier spent over 5 seconds" << std::endl;
892 std::lock_guard<std::mutex> lock(mutex);
894 auto start = std::chrono::system_clock::now();
897 if( std::all_of(peers.begin(), peers.end(), [=](
const peer_state& p) {
898 return (p.ss_send_status()); }) )
break;
902 for(
size_t i=0; i<peers.size(); i++) {
903 peers[i].set_ss_send_status(
false);
907 if( (std::chrono::system_clock::now() - start) > std::chrono::seconds(5) ) {
908 std::cout <<
"MUI Warning [uniface.h]: Smart Send communication barrier spent over 5 seconds" << std::endl;
917 std::lock_guard<std::mutex> lock(mutex);
919 auto start = std::chrono::system_clock::now();
922 if( std::all_of(peers.begin(), peers.end(), [=](
const peer_state& p) {
923 return (p.ss_recv_status()); }) )
break;
927 for(
size_t i=0; i<peers.size(); i++) {
928 peers[i].set_ss_recv_status(
false);
931 if( (std::chrono::system_clock::now() - start) > std::chrono::seconds(5) ) {
933 std::cout <<
"MUI Warning [uniface.h]: Smart Send communication barrier spent over 5 seconds" << std::endl;
941 span_timeout = timeout;
942 current_span.swap(s);
943 comm->send(
message::make(
"sendingSpan", comm->local_rank(), start, timeout, std::move(current_span)));
945 smart_send_set_ =
false;
951 comm->send(
message::make(
"sendingDisable", comm->local_rank()));
959 recv_timeout = timeout;
961 comm->send(
message::make(
"receivingSpan", comm->local_rank(), start, timeout, std::move(recv_span)));
963 smart_send_set_ =
false;
969 comm->send(
message::make(
"receivingDisable", comm->local_rank()));
976 std::pair<time_type,iterator_type> upper_limit(last+
threshold(last),
977 std::numeric_limits<iterator_type>::lowest());
979 log.erase(log.begin(), log.upper_bound(upper_limit));
982 std::pair<time_type,iterator_type> curr_time(std::numeric_limits<time_type>::lowest(),
983 std::numeric_limits<iterator_type>::lowest());
985 if( !log.empty() ) curr_time = log.rbegin()->first;
987 for(
size_t i=0; i < peers.size(); i++ ) {
988 peers[i].set_current_t(curr_time.first);
989 peers[i].set_current_sub(curr_time.second);
993 fetch_t_hist_ = std::numeric_limits<time_type>::lowest();
998 void forget( std::pair<time_type,iterator_type> last,
bool reset_log =
false ) {
999 std::pair<time_type,iterator_type> upper_limit(last.first+
threshold(last.first),
1002 log.erase(log.begin(), log.upper_bound(upper_limit));
1005 std::pair<time_type,iterator_type> curr_time(std::numeric_limits<time_type>::lowest(),
1006 std::numeric_limits<iterator_type>::lowest());
1008 if( !log.empty() ) curr_time = log.rbegin()->first;
1010 for(
size_t i=0; i < peers.size(); i++ ) {
1011 peers[i].set_current_t(curr_time.first);
1012 peers[i].set_current_sub(curr_time.second);
1016 fetch_t_hist_ = std::numeric_limits<time_type>::lowest();
1017 fetch_i_hist_ = std::numeric_limits<iterator_type>::lowest();
1023 std::pair<time_type,iterator_type> lower_limit(first-
threshold(first),
1024 std::numeric_limits<iterator_type>::lowest());
1025 std::pair<time_type,iterator_type> upper_limit(last+
threshold(last),
1026 std::numeric_limits<iterator_type>::lowest());
1028 log.erase(log.lower_bound(lower_limit), log.upper_bound(upper_limit));
1031 std::pair<time_type,iterator_type> curr_time(std::numeric_limits<time_type>::lowest(),
1032 std::numeric_limits<iterator_type>::lowest());
1034 if( !log.empty() ) curr_time = log.rbegin()->first;
1036 for(
size_t i=0; i < peers.size(); i++ ) {
1037 peers[i].set_current_t(curr_time.first);
1038 peers[i].set_current_sub(curr_time.second);
1042 fetch_t_hist_ = std::numeric_limits<time_type>::lowest();
1047 void forget( std::pair<time_type,iterator_type> first, std::pair<time_type,iterator_type> last,
bool reset_log =
false ) {
1048 std::pair<time_type,iterator_type> lower_limit(first.first-
threshold(first.first),
1050 std::pair<time_type,iterator_type> upper_limit(last.first+
threshold(last.first),
1053 log.erase(log.lower_bound(lower_limit), log.upper_bound(upper_limit));
1056 std::pair<time_type,iterator_type> curr_time(std::numeric_limits<time_type>::lowest(),
1057 std::numeric_limits<iterator_type>::lowest());
1059 if( !log.empty() ) curr_time = log.rbegin()->first;
1061 for(
size_t i=0; i<peers.size(); i++ ) {
1062 peers[i].set_current_t(curr_time.first);
1063 peers[i].set_current_sub(curr_time.second);
1067 fetch_t_hist_ = std::numeric_limits<time_type>::lowest();
1068 fetch_i_hist_ = std::numeric_limits<iterator_type>::lowest();
1076 fetch_t_hist_ = std::numeric_limits<time_type>::lowest();
1077 fetch_i_hist_ = std::numeric_limits<iterator_type>::lowest();
1083 return comm->uri_host();
1089 return comm->uri_path();
1095 return comm->uri_protocol();
1103 if(
m.has_id() ) readers[
m.id()](
m);
1108 void on_recv_confirm( int32_t sender, std::pair<time_type,iterator_type> timestamp ) {
1109 peers[sender].set_current_t(timestamp.first);
1110 peers[sender].set_current_sub(timestamp.second);
1115 void on_recv_forecast( int32_t sender, std::pair<time_type,iterator_type> timestamp ) {
1116 peers[sender].set_next_t(timestamp.first);
1117 peers[sender].set_next_sub(timestamp.second);
1122 void on_recv_data( std::pair<time_type,iterator_type> timestamp, frame_type frame ) {
1123 auto itr = log.find(timestamp);
1125 if( itr == log.end() )
1126 std::tie(itr,std::ignore) = log.insert(std::make_pair(timestamp,bin_frame_type()));
1128 auto& cur = itr->second;
1130 for(
auto& p: frame ){
1131 auto pstr = cur.find(p.first);
1132 if( pstr == cur.end() ) cur.insert(std::make_pair(std::move(p.first),spatial_t(std::move(p.second))));
1133 else pstr->second.insert(p.second);
1136 log.erase(log.begin(), log.upper_bound({timestamp.first-memory_length, timestamp.second}));
1141 void on_recv_rawdata( int32_t sender, std::pair<time_type,iterator_type> timestamp, frame_raw_type frame ) {
1142 on_recv_data( timestamp, associate( sender, frame ) );
1148 peers[sender].set_recving(start,timeout,std::move(s));
1149 peers[sender].set_ss_recv_status(
true);
1155 peers[sender].set_sending(start,timeout,std::move(s));
1156 peers[sender].set_ss_send_status(
true);
1161 void on_recv_disable( int32_t sender ) {
1162 peers[sender].set_recv_disable();
1163 peers[sender].set_ss_recv_status(
true);
1164 peer_is_sending[sender] =
false;
1169 void on_send_disable( int32_t sender ) {
1170 peers[sender].set_send_disable();
1171 peers[sender].set_ss_send_status(
true);
1176 void on_recv_points( int32_t sender, std::vector<point_type> points ) {
1177 peers[sender].set_pts(points);
1182 void on_recv_assignedVals( std::string attr, storage_single_t data ) {
1183 typename std::unordered_map<std::string, storage_single_t >::iterator it = assigned_values.find(attr);
1184 if (it != assigned_values.end())
1187 assigned_values.insert( std::pair<std::string, storage_single_t>( attr, data ) );
1192 inline frame_type associate( int32_t sender, frame_raw_type& frame ) {
1194 const auto& pts = peers[sender].pts();
1196 for(
auto& p: frame ) {
1197 const auto& data = storage_cast<const std::vector<std::pair<size_t,REAL> >&>(p.second);
1199 buf.insert(std::make_pair(p.first, storage_t(std::vector<std::pair<point_type,REAL> >())));
1200 std::vector<std::pair<point_type,REAL> >& data_store = storage_cast<std::vector<std::pair<point_type,REAL> >&>(
buf[p.first]);
1202 data_store.resize(data.size());
1204 for(
size_t i=0; i<data.size(); i++ ) {
1205 data_store[i].first = pts[data[i].first];
1206 data_store[i].second = data[i].second;
Structures and methods to create an underlying binning structure for data received through an interfa...
Definition: geometry.h:92
void forget(time_type first, time_type last, bool reset_log=false)
Removes log between [@first, @last].
Definition: uniface.h:1022
void forget(std::pair< time_type, iterator_type > first, std::pair< time_type, iterator_type > last, bool reset_log=false)
Removes log between [[@first.first,@first.second], [@last.first,@last.second]].
Definition: uniface.h:1047
void forget(time_type last, bool reset_log=false)
Removes log between (-inf, @last].
Definition: uniface.h:975
SAMPLER::OTYPE fetch(const std::string &attr, const point_type &focus, const time_type t, SAMPLER &sampler, const TIME_SAMPLER &t_sampler, const COUPLING_ALGO &cpl_algo, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch from the interface with coupling algorithms, blocking with barrier at time=t.
Definition: uniface.h:514
static const bool FIXEDPOINTS
Definition: uniface.h:84
void forecast(time_type t, iterator_type it=std::numeric_limits< iterator_type >::lowest())
Sends a forecast of an upcoming time to remote nodes.
Definition: uniface.h:795
void announce_send_span(time_type start, time_type timeout, span_t s, bool synchronised=false)
Announces to all remote nodes using non-blocking peer-to-peer approach "I'll send this span".
Definition: uniface.h:939
void announce_send_disable(bool synchronised=false)
Announces to all remote nodes "I'm disabled for send".
Definition: uniface.h:950
void barrier_ss_send()
Blocking barrier for Smart Send send values. Initiates receive from remote nodes.
Definition: uniface.h:890
std::string uri_protocol()
Returns the URI protocol for the created uniface.
Definition: uniface.h:1094
SAMPLER::OTYPE fetch(const std::string &attr, const point_type &focus, const time_type t, const iterator_type it, SAMPLER &sampler, const TIME_SAMPLER &t_sampler, const COUPLING_ALGO &cpl_algo, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch from the interface with coupling algorithms, blocking with barrier at time=t,...
Definition: uniface.h:546
uniface(std::string const &URI)
Definition: uniface.h:254
bool is_ready(const std::string &attr, time_type t) const
Tests whether data is available at time=t.
Definition: uniface.h:802
typename CONFIG::time_type time_type
Definition: uniface.h:89
uniface(communicator *comm_)
Definition: uniface.h:255
SAMPLER::OTYPE fetch(const std::string &attr, const point_type &focus, const time_type t, SAMPLER &sampler, const TIME_SAMPLER &t_sampler, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch from the interface, blocking with barrier at time=t.
Definition: uniface.h:449
TYPE fetch(const std::string &attr)
Fetch a single parameter from the interface Overloaded fetch to fetch a single parameter of name attr...
Definition: uniface.h:439
uniface(const uniface &)=delete
typename CONFIG::REAL REAL
Definition: uniface.h:87
uniface & operator=(const uniface &)=delete
typename CONFIG::data_types data_types
Definition: uniface.h:91
int commit(time_type t, iterator_type it=std::numeric_limits< iterator_type >::lowest())
Serializes pushed data and sends it to remote nodes Serializes pushed data and sends it to remote nod...
Definition: uniface.h:730
geometry::any_shape< CONFIG > span_t
Definition: uniface.h:92
SAMPLER::OTYPE fetch(const std::string &attr, const point_type &focus, const time_type t, const iterator_type it, SAMPLER &sampler, const TIME_SAMPLER &t_sampler, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch from the interface, blocking with barrier at time=t,it.
Definition: uniface.h:481
void barrier(time_type t, iterator_type it)
Blocking barrier at time=t,it. Initiates receive from remote nodes.
Definition: uniface.h:857
void push(const std::string &attr, const point_type &loc, const TYPE &value)
Push data with tag "attr" to buffer Push data with tag "attr" to bcuffer. If using CONFIG::FIXEDPOINT...
Definition: uniface.h:300
void barrier(time_type t)
Blocking barrier at time=t. Initiates receive from remote nodes.
Definition: uniface.h:825
typename CONFIG::point_type point_type
Definition: uniface.h:88
std::vector< TYPE > fetch_values(const std::string &attr, const time_type t, const iterator_type it, const TIME_SAMPLER &t_sampler, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch values currently stored in the interface, blocking with barrier at time=t,it.
Definition: uniface.h:692
std::string uri_path()
Returns the URI path (name) for the created uniface.
Definition: uniface.h:1088
void barrier_ss_recv()
Blocking barrier for Smart Send receive values. Initiates receive from remote nodes.
Definition: uniface.h:915
static const int D
Definition: uniface.h:83
static const bool QUIET
Definition: uniface.h:85
std::vector< TYPE > fetch_values(const std::string &attr, const time_type t, const TIME_SAMPLER &t_sampler, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch values currently stored in the interface, blocking with barrier at time=t.
Definition: uniface.h:655
std::vector< point_type > fetch_points(const std::string &attr, const time_type t, const TIME_SAMPLER &t_sampler, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch points currently stored in the interface, blocking with barrier at time=t.
Definition: uniface.h:579
typename CONFIG::iterator_type iterator_type
Definition: uniface.h:90
std::string uri_host()
Returns the URI host (domain) for the created uniface.
Definition: uniface.h:1082
void set_memory(time_type length)
Removes log between (-inf, current-@length] automatically.
Definition: uniface.h:1073
void push(const std::string &attr, const TYPE &value)
Announce the value value with the parameter attr Useful if, for example, you wish to pass a parameter...
Definition: uniface.h:291
uniface(const char URI[])
Definition: uniface.h:253
void announce_recv_span(time_type start, time_type timeout, span_t s, bool synchronised=false)
Announces to all remote nodes using non-blocking peer-to-peer approach "I'm receiving this span".
Definition: uniface.h:957
std::vector< point_type > fetch_points(const std::string &attr, const time_type t, const iterator_type it, const TIME_SAMPLER &t_sampler, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch points currently stored in the interface, blocking with barrier at time=t,it.
Definition: uniface.h:616
void announce_recv_disable(bool synchronised=false)
Announces to all remote nodes "I'm disabled for receive".
Definition: uniface.h:968
bool is_ready(const std::string &attr, time_type t, iterator_type it) const
Tests whether data is available at time=t,it.
Definition: uniface.h:813
void update_smart_send(time_type t)
Updates Smart Send locality data Creates a new comm rank mapping for Smart Send functionality.
Definition: uniface.h:772
void forget(std::pair< time_type, iterator_type > last, bool reset_log=false)
Removes log between ([-inf,-inf], [@last.first,@last.second]].
Definition: uniface.h:998
File containing class definition of communication interface. This is the base class for all other com...
Structures and methods to create a new communicator based on chosen protocols.
char buf[BUFSIZE]
Definition: comm_tcp.h:281
File containing data structures defining all data types used by an interface.
Implementation of a compound dynamic data structure used throughout MUI.
Structure for communicator used in comm_factory.h.
Structure to contain and manipulate data from internal data to MPI message.
dim< 0, 0, 1, 0, 0, 0, 0, 0 > time
Definition: dim.h:207
u u m
Definition: dim.h:281
dim< 0, 1, 0, 0, 0, 0, 0, 0 > length
Definition: dim.h:206
bool almost_equal(T x, T y)
Definition: util.h:128
T threshold(T x)
Definition: util.h:146
bool collide(const span< CONFIG > &lhs, const span< CONFIG > &rhs)
Definition: span.h:120
SCALAR max(vexpr< E, SCALAR, D > const &u)
Definition: point.h:350
Creates a structure to parse a message as variables and pass them to a function as arguments.
Defines the spatial_storage data type.
Defines base stream class container_stream and associated functors.
Defines the stream in/out for std::string data type.
Defines the stream in/out for the unordered std::unordered_map. data type.
Defines the stream in/out for std::vector data type.
Definition: comm_factory.h:61
static message make(const id_type &id, types &&... data)
Definition: message.h:72
std::string id_type
Definition: message.h:63
Definition: reader_variable.h:64
Definition: dynstorage.h:146
Provides a number of utility functions used through the rest of the library.