Multiscale Universal Interface  2.0
A Concurrent Framework for Coupling Heterogeneous Solvers
uniface.h
Go to the documentation of this file.
1 /*****************************************************************************
2 * Multiscale Universal Interface Code Coupling Library *
3 * *
4 * Copyright (C) 2019 Y. H. Tang, S. Kudo, X. Bian, Z. Li, G. E. Karniadakis, *
5 * S. M. Longshaw, A. Skillen *
6 * *
7 * This software is jointly licensed under the Apache License, Version 2.0 *
8 * and the GNU General Public License version 3, you may use it according *
9 * to either. *
10 * *
11 * ** Apache License, version 2.0 ** *
12 * *
13 * Licensed under the Apache License, Version 2.0 (the "License"); *
14 * you may not use this file except in compliance with the License. *
15 * You may obtain a copy of the License at *
16 * *
17 * http://www.apache.org/licenses/LICENSE-2.0 *
18 * *
19 * Unless required by applicable law or agreed to in writing, software *
20 * distributed under the License is distributed on an "AS IS" BASIS, *
21 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
22 * See the License for the specific language governing permissions and *
23 * limitations under the License. *
24 * *
25 * ** GNU General Public License, version 3 ** *
26 * *
27 * This program is free software: you can redistribute it and/or modify *
28 * it under the terms of the GNU General Public License as published by *
29 * the Free Software Foundation, either version 3 of the License, or *
30 * (at your option) any later version. *
31 * *
32 * This program is distributed in the hope that it will be useful, *
33 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
34 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
35 * GNU General Public License for more details. *
36 * *
37 * You should have received a copy of the GNU General Public License *
38 * along with this program. If not, see <http://www.gnu.org/licenses/>. *
39 *****************************************************************************/
40 
52 #ifndef UNIFACE_H_
53 #define UNIFACE_H_
54 
55 #include "general/util.h"
56 #include "communication/comm.h"
58 #include "config.h"
59 #include "storage/dynstorage.h"
64 #include "storage/stream_vector.h"
66 #include "storage/stream_string.h"
67 #include "storage/bin.h"
68 #include "storage/stream.h"
69 
70 #ifdef PYTHON_BINDINGS
71 #include <pybind11/pybind11.h>
72 #include <pybind11/stl.h>
73 #include <pybind11/numpy.h>
74 namespace py = pybind11;
75 #endif
76 
77 namespace mui {
78 
79 template<typename CONFIG = default_config>
80 class uniface {
81 public:
82  // public typedefs (see config.h for descriptions)
83  static const int D = CONFIG::D;
84  static const bool FIXEDPOINTS = CONFIG::FIXEDPOINTS;
85  static const bool QUIET = CONFIG::QUIET;
86 
87  using REAL = typename CONFIG::REAL;
88  using point_type = typename CONFIG::point_type;
89  using time_type = typename CONFIG::time_type;
90  using iterator_type = typename CONFIG::iterator_type;
91  using data_types = typename CONFIG::data_types;
93 private:
94  // meta functions to split tuple and add vector<pair<point_type,_1> >
95  template<typename T> struct add_vp_ { using type = std::vector<std::pair<point_type,T> >; };
96  template<typename T> struct def_storage_;
97  template<typename... TYPES> struct def_storage_<type_list<TYPES...> >{
99  };
100 
101  template<typename T> struct add_vi_ { using type = std::vector<std::pair<size_t, T> >; };
102  template<typename T> struct def_storage_raw_;
103  template<typename... TYPES> struct def_storage_raw_<type_list<TYPES...> >{
104  using type = storage<typename add_vi_<TYPES>::type...>;
105  };
106 
107  template<typename T> struct def_storage_single_;
108  template<typename... TYPES> struct def_storage_single_<type_list<TYPES...> >{
109  using type = storage<TYPES...>;
110  };
111 
112  // internal typedefinitions for full frame
113  using storage_t = typename def_storage_<data_types>::type;
114  using spatial_t = spatial_storage<bin_t<CONFIG>,storage_t,CONFIG>;
115  using frame_type = std::unordered_map<std::string, storage_t>;
116  using bin_frame_type = std::unordered_map<std::string, spatial_t>;
117  // internal typdefinitions for data values only (static points)
118  using storage_raw_t = typename def_storage_raw_<data_types>::type;
119  using frame_raw_type = std::unordered_map<std::string, storage_raw_t>;
120  // internal typedefinitions for single value
121  using storage_single_t = typename def_storage_single_<data_types>::type;
122 
123  struct peer_state {
124  peer_state() : disable_send(false), disable_recv(false), ss_stat_send(false), ss_stat_recv(false) {}
125 
126  using spans_type = std::map<std::pair<time_type,time_type>,span_t>;
127 
128  bool is_recving(time_type t, const span_t& s) const {
129  return scan_spans_(t,s,recving_spans);
130  }
131 
132  void set_recving( time_type start, time_type end, span_t s ) {
133  recving_spans.emplace(std::make_pair(start,end),std::move(s));
134  }
135 
136  bool is_sending(time_type t, const span_t& s) const {
137  return scan_spans_(t,s,sending_spans);
138  }
139 
140  void set_sending(time_type start, time_type end, span_t s) {
141  sending_spans.emplace(std::make_pair(start,end), std::move(s));
142  }
143 
144  void set_pts(std::vector<point_type>& pts) {
145  pts_ = pts;
146  }
147 
148  const std::vector<point_type>& pts() const {
149  return pts_;
150  }
151 
152  void set_send_disable() {
153  disable_send = true;
154  }
155 
156  void set_recv_disable() {
157  disable_recv = true;
158  }
159 
160  bool is_send_disabled() const {
161  return disable_send;
162  }
163 
164  bool is_recv_disabled() const {
165  return disable_recv;
166  }
167 
168  void set_ss_send_status(bool status) {
169  ss_stat_send = status;
170  }
171 
172  bool ss_send_status() const {
173  return ss_stat_send;
174  }
175 
176  void set_ss_recv_status(bool status) {
177  ss_stat_recv = status;
178  }
179 
180  bool ss_recv_status() const {
181  return ss_stat_recv;
182  }
183 
184  time_type current_t() const { return latest_timestamp; }
185  iterator_type current_it() const { return latest_subiter; }
186  time_type next_t() const { return next_timestamp; }
187  iterator_type next_it() const { return next_subiter; }
188  void set_current_t( time_type t ) { latest_timestamp = t; }
189  void set_current_sub( iterator_type i ) { latest_subiter = i; }
190  void set_next_t( time_type t ) { next_timestamp = t; }
191  void set_next_sub( iterator_type i ) { next_subiter = i; }
192  private:
193  bool scan_spans_(time_type t, const span_t& s, const spans_type& spans ) const {
194  bool prefetched = false;
195  auto end = spans.lower_bound(std::make_pair(t,t));
196  if( spans.size() == 1 ) end = spans.end();
197 
198  for( auto itr = spans.begin(); itr != end; ++itr ) {
199  if( t < itr->first.second || almost_equal(t, itr->first.second) ) {
200  prefetched = true;
201  if( collide(s,itr->second) ) return true;
202  }
203  }
204 
205  // if prefetched at t, but no overlap region, then return false;
206  // otherwise return true;
207  return !prefetched;
208  }
209 
210  time_type latest_timestamp = std::numeric_limits<time_type>::lowest();
211  iterator_type latest_subiter = std::numeric_limits<iterator_type>::lowest();
212  time_type next_timestamp = std::numeric_limits<time_type>::lowest();
213  iterator_type next_subiter = std::numeric_limits<iterator_type>::lowest();
214  spans_type recving_spans;
215  spans_type sending_spans;
216  std::vector<point_type> pts_;
217  std::unordered_map<std::string, storage_single_t> assigned_vals_;
218  bool disable_send;
219  bool disable_recv;
220  bool ss_stat_send;
221  bool ss_stat_recv;
222  };
223 
224 private: // data members
225  std::unique_ptr<communicator> comm;
226  dispatcher<message::id_type, std::function<void(message)> > readers;
227 
228  std::map<std::pair<time_type, iterator_type>, bin_frame_type> log;
229 
230  frame_type push_buffer;
231  frame_raw_type push_buffer_raw;
232  std::vector<point_type> push_buffer_pts;
233 
234  std::unordered_map<std::string, storage_single_t > assigned_values;
235 
236  std::vector<peer_state> peers;
237  std::vector<bool> peer_is_sending;
238  bool smart_send_set_ = true;
239  time_type span_start = std::numeric_limits<time_type>::lowest();
240  time_type span_timeout = std::numeric_limits<time_type>::lowest();
241  span_t current_span;
242  time_type recv_start = std::numeric_limits<time_type>::lowest();
243  time_type recv_timeout = std::numeric_limits<time_type>::lowest();
244  span_t recv_span;
246  std::mutex mutex;
247  bool initialized_pts_;
248  size_t fixedPointCount_;
249  time_type fetch_t_hist_ = std::numeric_limits<time_type>::lowest();
250  iterator_type fetch_i_hist_ = std::numeric_limits<iterator_type>::lowest();
251 
252 public:
253  uniface( const char URI[] ) : uniface( comm_factory::create_comm(URI, QUIET) ) {}
254  uniface( std::string const &URI ) : uniface( comm_factory::create_comm(URI.c_str(), QUIET) ) {}
255  uniface( communicator* comm_ ) : comm(comm_), initialized_pts_(false), fixedPointCount_(0) {
256  using namespace std::placeholders;
257 
258  peers.resize(comm->remote_size());
259  peer_is_sending.resize(comm->remote_size(), true);
260 
261  readers.link("timestamp", reader_variables<int32_t, std::pair<time_type,iterator_type> >(
262  std::bind(&uniface::on_recv_confirm, this, _1, _2)));
263  readers.link("forecast", reader_variables<int32_t, std::pair<time_type,iterator_type>>(
264  std::bind(&uniface::on_recv_forecast, this, _1, _2)));
265  readers.link("data", reader_variables<std::pair<time_type,iterator_type>, frame_type>(
266  std::bind(&uniface::on_recv_data, this, _1, _2)));
267  readers.link("rawdata", reader_variables<int32_t, std::pair<time_type,iterator_type>, frame_raw_type>(
268  std::bind(&uniface::on_recv_rawdata, this, _1, _2, _3)));
269  readers.link("points", reader_variables<int32_t, std::vector<point_type>>(
270  std::bind(&uniface::on_recv_points, this, _1, _2)));
271  readers.link("assignedVals", reader_variables<std::string, storage_single_t>(
272  std::bind(&uniface::on_recv_assignedVals, this, _1, _2)));
274  std::bind(&uniface::on_recv_span, this, _1, _2, _3, _4)));
276  std::bind(&uniface::on_send_span, this, _1, _2, _3, _4)));
277  readers.link("receivingDisable", reader_variables<int32_t>(
278  std::bind(&uniface::on_send_disable, this, _1)));
279  readers.link("sendingDisable", reader_variables<int32_t>(
280  std::bind(&uniface::on_recv_disable, this, _1)));
281  }
282 
283  uniface( const uniface& ) = delete;
284  uniface& operator=( const uniface& ) = delete;
285 
290  template<typename TYPE>
291  void push( const std::string& attr, const TYPE& value ) {
292  comm->send(message::make("assignedVals", attr, storage_single_t(TYPE(value))));
293  }
294 
299  template<typename TYPE>
300  void push( const std::string& attr, const point_type& loc, const TYPE& value ) {
301  if( FIXEDPOINTS ) {
302  // If this push is before first commit then build local points list
303  if( !initialized_pts_ ) push_buffer_pts.emplace_back( loc );
304 
305  storage_raw_t& n = push_buffer_raw[attr];
306  if( !n ) n = storage_raw_t(std::vector<std::pair<size_t,TYPE> >());
307  storage_cast<std::vector<std::pair<size_t,TYPE> >&>(n).emplace_back( fixedPointCount_, value );
308 
309  // Increment counter for flat fixed point list
310  fixedPointCount_++;
311  }
312  else {
313  storage_t& n = push_buffer[attr];
314  if( !n ) n = storage_t(std::vector<std::pair<point_type,TYPE> >());
315  storage_cast<std::vector<std::pair<point_type,TYPE> >&>(n).emplace_back( loc, value );
316  }
317  }
318 
319 #ifdef PYTHON_BINDINGS
320  template<typename TYPE>
321  void push_many(const std::string& attr, const class py::array_t<REAL>& points,
322  const class py::array_t<TYPE>& values) {
323  // Arrays must have ndim = d; can be non-writeable
324  point_type p = 0;
325  auto points_arr = points.template unchecked<2>();
326  auto values_arr = values.template unchecked<1>();
327  assert(points_arr.shape(0) == values_arr.shape(0));
328  for (ssize_t i = 0; i < points_arr.shape(0); i++) {
329  for (ssize_t j = 0; j < points_arr.shape(1); j++)
330  p[j] = points_arr(i,j);
331  push<TYPE>(attr, p, values_arr(i));
332  }
333  }
334 
335  template<class SAMPLER, class TIME_SAMPLER>
336  py::array_t<typename SAMPLER::OTYPE,py::array::c_style>
337  fetch_many(const std::string& attr,const py::array_t<REAL,py::array::c_style> points, const time_type t,
338  const SAMPLER &sampler, const TIME_SAMPLER &t_sampler, bool barrier_enabled = true) {
339  // Arrays must have ndim = d; can be non-writeable
340  point_type p = 0;
341  auto points_arr = points.template unchecked<2>();
342  py::array_t<typename SAMPLER::OTYPE,py::array::c_style> values(points_arr.shape(0));
343  auto values_arr = values.template mutable_unchecked<1>();
344  for (ssize_t i = 0; i < points_arr.shape(0); i++) {
345  for (ssize_t j = 0; j < points_arr.shape(1); j++)
346  p[j] = points_arr(i,j);
347  values_arr(i) = fetch(attr, p, t, sampler, t_sampler, barrier_enabled);
348  }
349  return values;
350  }
351 
352  template<class SAMPLER, class TIME_SAMPLER>
353  py::array_t<typename SAMPLER::OTYPE,py::array::c_style>
354  fetch_many(const std::string& attr,const py::array_t<REAL,py::array::c_style> points, const time_type t,
355  const iterator_type it, const SAMPLER &sampler, const TIME_SAMPLER &t_sampler, bool barrier_enabled = true) {
356  // Arrays must have ndim = d; can be non-writeable
357  point_type p = 0;
358  auto points_arr = points.template unchecked<2>();
359  py::array_t<typename SAMPLER::OTYPE,py::array::c_style> values(points_arr.shape(0));
360  auto values_arr = values.template mutable_unchecked<1>();
361  for (ssize_t i = 0; i < points_arr.shape(0); i++) {
362  for (ssize_t j = 0; j < points_arr.shape(1); j++)
363  p[j] = points_arr(i,j);
364  values_arr(i) = fetch(attr, p, t, it, sampler, t_sampler, barrier_enabled);
365  }
366  return values;
367  }
368 
369  template<class SAMPLER, class TIME_SAMPLER, class ALGORITHM>
370  py::array_t<typename SAMPLER::OTYPE,py::array::c_style>
371  fetch_many(const std::string& attr,const py::array_t<REAL,py::array::c_style> points, const time_type t,
372  const SAMPLER &sampler, const TIME_SAMPLER &t_sampler, const ALGORITHM &algorithm, bool barrier_enabled = true) {
373  // Arrays must have ndim = d; can be non-writeable
374  point_type p = 0;
375  auto points_arr = points.template unchecked<2>();
376  py::array_t<typename SAMPLER::OTYPE,py::array::c_style> values(points_arr.shape(0));
377  auto values_arr = values.template mutable_unchecked<1>();
378  for (ssize_t i = 0; i < points_arr.shape(0); i++) {
379  for (ssize_t j = 0; j < points_arr.shape(1); j++)
380  p[j] = points_arr(i,j);
381  values_arr(i) = fetch(attr, p, t, sampler, t_sampler, algorithm, barrier_enabled);
382  }
383  return values;
384  }
385 
386  template<class SAMPLER, class TIME_SAMPLER, class ALGORITHM>
387  py::array_t<typename SAMPLER::OTYPE,py::array::c_style>
388  fetch_many(const std::string& attr,const py::array_t<REAL,py::array::c_style> points, const time_type t,
389  const iterator_type it, const SAMPLER &sampler, const TIME_SAMPLER &t_sampler, const ALGORITHM &algorithm, bool barrier_enabled = true) {
390  // Arrays must have ndim = d; can be non-writeable
391  point_type p = 0;
392  auto points_arr = points.template unchecked<2>();
393  py::array_t<typename SAMPLER::OTYPE,py::array::c_style> values(points_arr.shape(0));
394  auto values_arr = values.template mutable_unchecked<1>();
395  for (ssize_t i = 0; i < points_arr.shape(0); i++) {
396  for (ssize_t j = 0; j < points_arr.shape(1); j++)
397  p[j] = points_arr(i,j);
398  values_arr(i) = fetch(attr, p, t, it, sampler, t_sampler, algorithm, barrier_enabled);
399  }
400  return values;
401  }
402 
403 
404  template<typename TYPE, class TIME_SAMPLER>
405  py::array_t<REAL, py::array::c_style>
406  fetch_points_np(const std::string& attr, const time_type t, const TIME_SAMPLER &t_sampler, bool barrier_enabled = true, TYPE test_value = static_cast<TYPE>(0)) {
407  std::vector<point_type> points = fetch_points<TYPE>(attr, t, t_sampler, barrier_enabled);
408  size_t n = points.size();
409  test_value += 1;
410  py::array_t<REAL, py::array::c_style> points_np({n, static_cast<size_t>(D)});
411  auto points_np_arr = points_np.template mutable_unchecked<2>();
412  for (std::size_t i = 0; i < n; i++)
413  for (std::size_t j = 0; j < D; j++)
414  points_np_arr(i,j) = (points[i].data())[j];
415  return points_np;
416  }
417 
418  template<typename TYPE, class TIME_SAMPLER>
419  py::array_t<REAL, py::array::c_style>
420  fetch_points_np(const std::string& attr, const time_type t, const iterator_type it, const TIME_SAMPLER &t_sampler, bool barrier_enabled = true, TYPE test_value = static_cast<TYPE>(0)) {
421  std::vector<point_type> points = fetch_points<TYPE>(attr, t, it, t_sampler, barrier_enabled);
422  size_t n = points.size();
423  test_value += 1;
424  py::array_t<REAL, py::array::c_style> points_np({n, static_cast<size_t>(D)});
425  auto points_np_arr = points_np.template mutable_unchecked<2>();
426  for (std::size_t i = 0; i < n; i++)
427  for (std::size_t j = 0; j < D; j++)
428  points_np_arr(i,j) = (points[i].data())[j];
429  return points_np;
430  }
431 #endif
432 
438  template<typename TYPE>
439  TYPE fetch( const std::string& attr ) {
440  storage_single_t& n = assigned_values[attr];
441  if( !n ) return TYPE();
442  return storage_cast<TYPE&>(n);
443  }
444 
447  template<class SAMPLER, class TIME_SAMPLER, typename ... ADDITIONAL>
448  typename SAMPLER::OTYPE
449  fetch( const std::string& attr,const point_type& focus, const time_type t,
450  SAMPLER& sampler, const TIME_SAMPLER &t_sampler, bool barrier_enabled = true,
451  ADDITIONAL && ... additional ) {
452  // Only enter barrier on first fetch for time=t
453  if( fetch_t_hist_ != t && barrier_enabled )
454  barrier(t_sampler.get_upper_bound(t));
455 
456  fetch_t_hist_ = t;
457 
458  std::vector<std::pair<std::pair<time_type,iterator_type>,typename SAMPLER::OTYPE> > v;
459  std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-threshold(t),
460  std::numeric_limits<iterator_type>::lowest());
461 
462  std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+threshold(t),
463  std::numeric_limits<iterator_type>::lowest());
464  auto end = log.upper_bound(curr_time_upper);
465 
466  if( log.size() == 1 ) end = log.end();
467 
468  for( auto start = log.lower_bound(curr_time_lower); start != end; ++start ) {
469  const auto& iter = start->second.find(attr);
470  if( iter == start->second.end() ) continue;
471  v.emplace_back( start->first, iter->second.build_and_query_ts( focus, sampler, additional... ) );
472  }
473 
474  return t_sampler.filter(t, v);
475  }
476 
479  template<class SAMPLER, class TIME_SAMPLER, typename ... ADDITIONAL>
480  typename SAMPLER::OTYPE
481  fetch( const std::string& attr,const point_type& focus, const time_type t, const iterator_type it,
482  SAMPLER& sampler, const TIME_SAMPLER &t_sampler, bool barrier_enabled = true,
483  ADDITIONAL && ... additional ) {
484  // Only enter barrier on first fetch for time=t,iteration=it
485  if((fetch_t_hist_ != t || fetch_i_hist_ != it) && barrier_enabled)
486  barrier(t_sampler.get_upper_bound(t),t_sampler.get_upper_bound(it));
487 
488  fetch_t_hist_ = t;
489  fetch_i_hist_ = it;
490 
491  std::vector<std::pair<std::pair<time_type,iterator_type>,typename SAMPLER::OTYPE> > v;
492  std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-threshold(t),
493  t_sampler.get_lower_bound(it)-threshold(it));
494 
495  std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+threshold(t),
496  t_sampler.get_upper_bound(it)+threshold(it));
497  auto end = log.upper_bound(curr_time_upper);
498 
499  if( log.size() == 1 ) end = log.end();
500 
501  for( auto start = log.lower_bound(curr_time_lower); start != end; ++start ) {
502  const auto& iter = start->second.find(attr);
503  if( iter == start->second.end() ) continue;
504  v.emplace_back( start->first, iter->second.build_and_query_ts( focus, sampler, additional... ) );
505  }
506 
507  return t_sampler.filter(std::make_pair(t,it), v);
508  }
509 
512  template<class SAMPLER, class TIME_SAMPLER, class COUPLING_ALGO, typename ... ADDITIONAL>
513  typename SAMPLER::OTYPE
514  fetch( const std::string& attr,const point_type& focus, const time_type t,
515  SAMPLER& sampler, const TIME_SAMPLER &t_sampler, const COUPLING_ALGO &cpl_algo,
516  bool barrier_enabled = true, ADDITIONAL && ... additional ) {
517  // Only enter barrier on first fetch for time=t
518  if( fetch_t_hist_ != t && barrier_enabled )
519  barrier(t_sampler.get_upper_bound(t));
520 
521  fetch_t_hist_ = t;
522 
523  std::vector<std::pair<std::pair<time_type,iterator_type>,typename SAMPLER::OTYPE> > v;
524  std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-threshold(t),
525  std::numeric_limits<iterator_type>::lowest());
526 
527  std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+threshold(t),
528  std::numeric_limits<iterator_type>::lowest());
529  auto end = log.upper_bound(curr_time_upper);
530 
531  if( log.size() == 1 ) end = log.end();
532 
533  for( auto start = log.lower_bound(curr_time_lower); start != end; ++start ) {
534  const auto& iter = start->second.find(attr);
535  if( iter == start->second.end() ) continue;
536  v.emplace_back( start->first, iter->second.build_and_query_ts( focus, sampler, additional... ) );
537  }
538 
539  return cpl_algo.relaxation(std::make_pair(std::numeric_limits<time_type>::lowest(), static_cast<iterator_type>(t)), focus, t_sampler.filter(t, v));
540  }
541 
544  template<class SAMPLER, class TIME_SAMPLER, class COUPLING_ALGO, typename ... ADDITIONAL>
545  typename SAMPLER::OTYPE
546  fetch( const std::string& attr,const point_type& focus, const time_type t, const iterator_type it,
547  SAMPLER& sampler, const TIME_SAMPLER &t_sampler, const COUPLING_ALGO &cpl_algo,
548  bool barrier_enabled = true, ADDITIONAL && ... additional ) {
549  // Only enter barrier on first fetch for time=t,iteration=it
550  if((fetch_t_hist_ != t || fetch_i_hist_ != it) && barrier_enabled)
551  barrier(t_sampler.get_upper_bound(t),t_sampler.get_upper_bound(it));
552 
553  fetch_t_hist_ = t;
554  fetch_i_hist_ = it;
555 
556  std::vector<std::pair<std::pair<time_type,iterator_type>,typename SAMPLER::OTYPE> > v;
557  std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-threshold(t),
558  t_sampler.get_lower_bound(it)-threshold(it));
559 
560  std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+threshold(t),
561  t_sampler.get_upper_bound(it)+threshold(it));
562  auto end = log.upper_bound(curr_time_upper);
563 
564  if( log.size() == 1 ) end = log.end();
565 
566  for( auto start = log.lower_bound(curr_time_lower); start != end; ++start ) {
567  const auto& iter = start->second.find(attr);
568  if( iter == start->second.end() ) continue;
569  v.emplace_back( start->first, iter->second.build_and_query_ts( focus, sampler, additional... ) );
570  }
571 
572  return cpl_algo.relaxation(std::make_pair(t,it), focus, t_sampler.filter(std::make_pair(t,it), v));
573  }
574 
577  template<typename TYPE, class TIME_SAMPLER, typename ... ADDITIONAL>
578  std::vector<point_type>
579  fetch_points( const std::string& attr, const time_type t,
580  const TIME_SAMPLER &t_sampler, bool barrier_enabled = true, ADDITIONAL && ... additional ) {
581  // Only enter barrier on first fetch for time=t
582  if( fetch_t_hist_ != t && barrier_enabled )
583  barrier(t_sampler.get_upper_bound(t));
584 
585  fetch_t_hist_ = t;
586 
587  using vec = std::vector<std::pair<point_type,TYPE> >;
588  std::vector <point_type> return_points;
589 
590  std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-threshold(t),
591  std::numeric_limits<iterator_type>::lowest());
592 
593  std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+threshold(t),
594  std::numeric_limits<iterator_type>::lowest());
595  auto end = log.upper_bound(curr_time_upper);
596 
597  if( log.size() == 1 ) end = log.end();
598 
599  for( auto start = log.lower_bound(curr_time_lower); start != end; ++start ){
600  const auto& iter = start->second.find(attr);
601  if( iter == start->second.end() ) continue;
602  const vec& ds = iter->second.template return_data<TYPE>();
603  return_points.reserve(ds.size());
604  for( size_t i=0; i<ds.size(); i++ ) {
605  return_points.emplace_back(ds[i].first);
606  }
607  }
608 
609  return return_points;
610  }
611 
614  template<typename TYPE, class TIME_SAMPLER, typename ... ADDITIONAL>
615  std::vector<point_type>
616  fetch_points( const std::string& attr, const time_type t, const iterator_type it,
617  const TIME_SAMPLER &t_sampler, bool barrier_enabled = true, ADDITIONAL && ... additional ) {
618  // Only enter barrier on first fetch for time=t,iteration=it
619  if( fetch_t_hist_ != t && fetch_i_hist_ != it && barrier_enabled)
620  barrier(t_sampler.get_upper_bound(t),t_sampler.get_upper_bound(it));
621 
622  fetch_t_hist_ = t;
623  fetch_i_hist_ = it;
624 
625  using vec = std::vector<std::pair<point_type,TYPE> >;
626  std::vector <point_type> return_points;
627 
628  std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-threshold(t),
629  t_sampler.get_lower_bound(it)-threshold(it));
630 
631  std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+threshold(t),
632  t_sampler.get_upper_bound(it)+threshold(it));
633 
634  auto end = log.upper_bound(curr_time_upper);
635 
636  if( log.size() == 1 ) end = log.end();
637 
638  for( auto start = log.lower_bound(curr_time_lower); start != end; ++start ){
639  const auto& iter = start->second.find(attr);
640  if( iter == start->second.end() ) continue;
641  const vec& ds = iter->second.template return_data<TYPE>();
642  return_points.reserve(ds.size());
643  for( size_t i=0; i<ds.size(); i++ ) {
644  return_points.emplace_back(ds[i].first);
645  }
646  }
647 
648  return return_points;
649  }
650 
653  template<typename TYPE, class TIME_SAMPLER, typename ... ADDITIONAL>
654  std::vector<TYPE>
655  fetch_values( const std::string& attr, const time_type t,
656  const TIME_SAMPLER &t_sampler, bool barrier_enabled = true, ADDITIONAL && ... additional ) {
657  // Only enter barrier on first fetch for time=t,iteration=it
658  if( fetch_t_hist_ != t && barrier_enabled )
659  barrier(t_sampler.get_upper_bound(t));
660 
661  fetch_t_hist_ = t;
662 
663  using vec = std::vector<std::pair<point_type,TYPE> >;
664  std::vector<TYPE> return_values;
665 
666  std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-threshold(t),
667  std::numeric_limits<iterator_type>::lowest());
668  std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+threshold(t),
669  std::numeric_limits<iterator_type>::lowest());
670 
671  auto end = log.upper_bound(curr_time_upper);
672 
673  if( log.size() == 1 ) end = log.end();
674 
675  for( auto start = log.lower_bound(curr_time_lower); start != end; ++start ){
676  const auto& iter = start->second.find(attr);
677  if( iter == start->second.end() ) continue;
678  const vec& ds = iter->second.template return_data<TYPE>();
679  return_values.reserve(ds.size());
680  for( size_t i=0; i<ds.size(); i++ ) {
681  return_values.emplace_back(ds[i].second);
682  }
683  }
684 
685  return return_values;
686  }
687 
690  template<typename TYPE, class TIME_SAMPLER, typename ... ADDITIONAL>
691  std::vector<TYPE>
692  fetch_values( const std::string& attr, const time_type t, const iterator_type it,
693  const TIME_SAMPLER &t_sampler, bool barrier_enabled = true, ADDITIONAL && ... additional ) {
694  // Only enter barrier on first fetch for time=t,iteration=it
695  if( fetch_t_hist_ != t && fetch_i_hist_ != it && barrier_enabled)
696  barrier(t_sampler.get_upper_bound(t),t_sampler.get_upper_bound(it));
697 
698  fetch_t_hist_ = t;
699  fetch_i_hist_ = it;
700 
701  using vec = std::vector<std::pair<point_type,TYPE> >;
702  std::vector<TYPE> return_values;
703 
704  std::pair<time_type,iterator_type> curr_time_lower(t_sampler.get_lower_bound(t)-threshold(t),
705  t_sampler.get_lower_bound(it)-threshold(it));
706  std::pair<time_type,iterator_type> curr_time_upper(t_sampler.get_upper_bound(t)+threshold(t),
707  t_sampler.get_upper_bound(it)+threshold(it));
708 
709  auto end = log.upper_bound(curr_time_upper);
710 
711  if( log.size() == 1 ) end = log.end();
712 
713  for( auto start = log.lower_bound(curr_time_lower); start != end; ++start ){
714  const auto& iter = start->second.find(attr);
715  if( iter == start->second.end() ) continue;
716  const vec& ds = iter->second.template return_data<TYPE>();
717  return_values.reserve(ds.size());
718  for( size_t i=0; i<ds.size(); i++ ) {
719  return_values.emplace_back(ds[i].second);
720  }
721  }
722 
723  return return_values;
724  }
725 
730  int commit( time_type t, iterator_type it = std::numeric_limits<iterator_type>::lowest() ) {
731  std::pair<time_type, iterator_type> time(t, it);
732 
733  // Check Smart Send if announcement made
734  if ( !smart_send_set_ ) {
735  // Reset all peers to default of enabled
736  std::fill(peer_is_sending.begin(), peer_is_sending.end(), true);
738  smart_send_set_ = true;
739  }
740 
741  if( FIXEDPOINTS ) {
742  // This only happens during the first commit
743  if( push_buffer_pts.size() > 0 ) {
744  comm->send( message::make("points",comm->local_rank(),std::move(push_buffer_pts)),peer_is_sending );
745  initialized_pts_ = true;
746  push_buffer_pts.clear();
747  }
748 
749  // Reset counter for flat point structure
750  fixedPointCount_ = 0;
751 
752  if( push_buffer_raw.size() > 0 ) {
753  comm->send( message::make("rawdata",comm->local_rank(),time,std::move(push_buffer_raw)),peer_is_sending );
754  push_buffer_raw.clear();
755  }
756  }
757  else {
758  if( push_buffer.size() > 0 ) {
759  comm->send( message::make("data",time,std::move(push_buffer)),peer_is_sending );
760  push_buffer.clear();
761  }
762  }
763 
764  comm->send( message::make("timestamp",comm->local_rank(),time),peer_is_sending );
765 
766  return std::count( peer_is_sending.begin(),peer_is_sending.end(),true );
767  }
768 
773  if( (((span_start < t) || almost_equal(span_start, t)) &&
774  ((t < span_timeout) || almost_equal(t, span_timeout))) ) {
775  for( size_t i=0; i < peers.size(); i++ ) {
776  // Check if peer is explicitly disabled
777  if( peers[i].is_recv_disabled() ) {
778  peer_is_sending[i] = false;
779  continue;
780  }
781 
782  // Perform geometric check against defined regions
783  peer_is_sending[i] = peers[i].is_recving( t, current_span );
784  }
785  }
786  else { // Ensure explicitly disabled peers are taken into account if outside Smart Send time bounds
787  for( size_t i=0; i < peers.size(); i++ ) {
788  if( peers[i].is_recv_disabled() ) peer_is_sending[i] = false;
789  }
790  }
791  }
792 
795  void forecast( time_type t, iterator_type it = std::numeric_limits<iterator_type>::lowest()) {
796  std::pair<time_type,iterator_type> time(t,it);
797  comm->send(message::make("forecast", comm->local_rank(), time));
798  }
799 
802  bool is_ready( const std::string& attr, time_type t ) const {
803  using logitem_ref_t = typename decltype(log)::const_reference;
804  return std::any_of(log.begin(), log.end(), [=](logitem_ref_t time_frame) {
805  return time_frame.second.find(attr) != time_frame.second.end(); }) // return false for attributes that don't exist.
806  && std::all_of(peers.begin(), peers.end(), [=](const peer_state& p) {
807  return (p.is_send_disabled()) || (!p.is_sending(t, recv_span)) ||
808  ((((p.current_t() > t) || almost_equal(p.current_t(), t)) || (p.next_t() > t))); });
809  }
810 
813  bool is_ready( const std::string& attr, time_type t, iterator_type it ) const {
814  using logitem_ref_t = typename decltype(log)::const_reference;
815  return std::any_of(log.begin(), log.end(), [=](logitem_ref_t time_frame) {
816  return time_frame.second.find(attr) != time_frame.second.end(); }) // return false for attributes that don't exist.
817  && std::all_of(peers.begin(), peers.end(), [=](const peer_state& p) {
818  return (p.is_send_disabled()) || (!p.is_sending(t, recv_span)) ||
819  ((((p.current_t() > t) || almost_equal(p.current_t(), t)) || (p.next_t() > t)) &&
820  (((p.current_it() > it) || almost_equal(p.current_it(), it)) || (p.current_it() > it))); });
821  }
822 
825  void barrier( time_type t ) {
826  // barrier must be thread-safe because it is called in fetch()
827  std::lock_guard<std::mutex> lock(mutex);
828 
829  auto start = std::chrono::system_clock::now();
830 
831  for(;;) {
832  size_t peers_unblocked = 0;
833  for( size_t p = 0; p < peers.size(); p++ ) {
834  if( peers[p].is_send_disabled() ) { peers_unblocked++; continue; } // Rank disabled, immediate break
835  if( !peers[p].is_sending(t, recv_span) ) { peers_unblocked++; continue; } // Rank disabled due to Smart Send geometry check
836  if( (peers[p].current_t() > t || almost_equal(peers[p].current_t(), t)) || peers[p].next_t() > t ) { // Final time check
837  peers_unblocked++;
838  continue;
839  }
840  }
841  // All peers unblocked, break loop
842  if( peers_unblocked == peers.size() )
843  break;
844  else // Acquire messages
845  acquire();
846  }
847 
848  if( !QUIET ) {
849  if( (std::chrono::system_clock::now() - start) > std::chrono::seconds(5) ) {
850  std::cout << "MUI Warning [uniface.h]: Communication barrier spent over 5 seconds" << std::endl;
851  }
852  }
853  }
854 
858  // barrier must be thread-safe because it is called in fetch()
859  std::lock_guard<std::mutex> lock(mutex);
860 
861  auto start = std::chrono::system_clock::now();
862 
863  for(;;) {
864  size_t peers_unblocked = 0;
865  for( size_t p = 0; p < peers.size(); p++ ) {
866  if( peers[p].is_send_disabled() ) { peers_unblocked++; continue; } // Rank disabled, immediate break
867  if( !peers[p].is_sending(t, recv_span) ) { peers_unblocked++; continue; } // Rank disabled due to Smart Send geometry check
868  if( ((peers[p].current_t() > t || almost_equal(peers[p].current_t(), t)) || peers[p].next_t() > t) && // Final time check
869  ((peers[p].current_it() > it || almost_equal(peers[p].current_it(), it)) || peers[p].next_it() > it) ) {
870  peers_unblocked++;
871  continue;
872  }
873  }
874  // All peers unblocked, break loop
875  if( peers_unblocked == peers.size() )
876  break;
877  else // Acquire messages
878  acquire();
879  }
880 
881  if( !QUIET ) {
882  if( (std::chrono::system_clock::now() - start) > std::chrono::seconds(5) ) {
883  std::cout << "MUI Warning [uniface.h]: Communication barrier spent over 5 seconds" << std::endl;
884  }
885  }
886  }
887 
890  void barrier_ss_send( ) {
891  // barrier must be thread-safe because it is called in fetch()
892  std::lock_guard<std::mutex> lock(mutex);
893 
894  auto start = std::chrono::system_clock::now();
895 
896  for(;;) { // barrier must be thread-safe because it is called in fetch()
897  if( std::all_of(peers.begin(), peers.end(), [=](const peer_state& p) {
898  return (p.ss_send_status()); }) ) break;
899  acquire(); // To avoid infinite-loop when synchronous communication
900  }
901 
902  for(size_t i=0; i<peers.size(); i++) {
903  peers[i].set_ss_send_status(false);
904  }
905 
906  if( !QUIET ) {
907  if( (std::chrono::system_clock::now() - start) > std::chrono::seconds(5) ) {
908  std::cout << "MUI Warning [uniface.h]: Smart Send communication barrier spent over 5 seconds" << std::endl;
909  }
910  }
911  }
912 
915  void barrier_ss_recv( ) {
916  // barrier must be thread-safe because it is called in fetch()
917  std::lock_guard<std::mutex> lock(mutex);
918 
919  auto start = std::chrono::system_clock::now();
920 
921  for(;;) { // barrier must be thread-safe because it is called in fetch()
922  if( std::all_of(peers.begin(), peers.end(), [=](const peer_state& p) {
923  return (p.ss_recv_status()); }) ) break;
924  acquire(); // To avoid infinite-loop when synchronous communication
925  }
926 
927  for(size_t i=0; i<peers.size(); i++) {
928  peers[i].set_ss_recv_status(false);
929  }
930 
931  if( (std::chrono::system_clock::now() - start) > std::chrono::seconds(5) ) {
932  if( !QUIET )
933  std::cout << "MUI Warning [uniface.h]: Smart Send communication barrier spent over 5 seconds" << std::endl;
934  }
935  }
936 
939  void announce_send_span( time_type start, time_type timeout, span_t s, bool synchronised = false) {
940  span_start = start;
941  span_timeout = timeout;
942  current_span.swap(s);
943  comm->send(message::make("sendingSpan", comm->local_rank(), start, timeout, std::move(current_span)));
944  if( synchronised ) barrier_ss_send();
945  smart_send_set_ = false;
946  }
947 
950  void announce_send_disable( bool synchronised = false ) {
951  comm->send(message::make("sendingDisable", comm->local_rank()));
952  if( synchronised ) barrier_ss_send();
953  }
954 
957  void announce_recv_span( time_type start, time_type timeout, span_t s, bool synchronised = false ) {
958  recv_start = start;
959  recv_timeout = timeout;
960  recv_span.swap(s);
961  comm->send(message::make("receivingSpan", comm->local_rank(), start, timeout, std::move(recv_span)));
962  if( synchronised ) barrier_ss_recv();
963  smart_send_set_ = false;
964  }
965 
968  void announce_recv_disable( bool synchronised = false ) {
969  comm->send(message::make("receivingDisable", comm->local_rank()));
970  if( synchronised ) barrier_ss_recv();
971  }
972 
975  void forget( time_type last, bool reset_log = false ) {
976  std::pair<time_type,iterator_type> upper_limit(last+threshold(last),
977  std::numeric_limits<iterator_type>::lowest());
978 
979  log.erase(log.begin(), log.upper_bound(upper_limit));
980 
981  if( reset_log ) {
982  std::pair<time_type,iterator_type> curr_time(std::numeric_limits<time_type>::lowest(),
983  std::numeric_limits<iterator_type>::lowest());
984 
985  if( !log.empty() ) curr_time = log.rbegin()->first;
986 
987  for( size_t i=0; i < peers.size(); i++ ) {
988  peers[i].set_current_t(curr_time.first);
989  peers[i].set_current_sub(curr_time.second);
990  }
991  }
992 
993  fetch_t_hist_ = std::numeric_limits<time_type>::lowest();
994  }
995 
998  void forget( std::pair<time_type,iterator_type> last, bool reset_log = false ) {
999  std::pair<time_type,iterator_type> upper_limit(last.first+threshold(last.first),
1000  last.second+threshold(last.second));
1001 
1002  log.erase(log.begin(), log.upper_bound(upper_limit));
1003 
1004  if( reset_log ) {
1005  std::pair<time_type,iterator_type> curr_time(std::numeric_limits<time_type>::lowest(),
1006  std::numeric_limits<iterator_type>::lowest());
1007 
1008  if( !log.empty() ) curr_time = log.rbegin()->first;
1009 
1010  for( size_t i=0; i < peers.size(); i++ ) {
1011  peers[i].set_current_t(curr_time.first);
1012  peers[i].set_current_sub(curr_time.second);
1013  }
1014  }
1015 
1016  fetch_t_hist_ = std::numeric_limits<time_type>::lowest();
1017  fetch_i_hist_ = std::numeric_limits<iterator_type>::lowest();
1018  }
1019 
1022  void forget( time_type first, time_type last, bool reset_log = false ) {
1023  std::pair<time_type,iterator_type> lower_limit(first-threshold(first),
1024  std::numeric_limits<iterator_type>::lowest());
1025  std::pair<time_type,iterator_type> upper_limit(last+threshold(last),
1026  std::numeric_limits<iterator_type>::lowest());
1027 
1028  log.erase(log.lower_bound(lower_limit), log.upper_bound(upper_limit));
1029 
1030  if( reset_log ) {
1031  std::pair<time_type,iterator_type> curr_time(std::numeric_limits<time_type>::lowest(),
1032  std::numeric_limits<iterator_type>::lowest());
1033 
1034  if( !log.empty() ) curr_time = log.rbegin()->first;
1035 
1036  for( size_t i=0; i < peers.size(); i++ ) {
1037  peers[i].set_current_t(curr_time.first);
1038  peers[i].set_current_sub(curr_time.second);
1039  }
1040  }
1041 
1042  fetch_t_hist_ = std::numeric_limits<time_type>::lowest();
1043  }
1044 
1047  void forget( std::pair<time_type,iterator_type> first, std::pair<time_type,iterator_type> last, bool reset_log = false ) {
1048  std::pair<time_type,iterator_type> lower_limit(first.first-threshold(first.first),
1049  first.second-threshold(first.second));
1050  std::pair<time_type,iterator_type> upper_limit(last.first+threshold(last.first),
1051  last.second+threshold(last.second));
1052 
1053  log.erase(log.lower_bound(lower_limit), log.upper_bound(upper_limit));
1054 
1055  if( reset_log ) {
1056  std::pair<time_type,iterator_type> curr_time(std::numeric_limits<time_type>::lowest(),
1057  std::numeric_limits<iterator_type>::lowest());
1058 
1059  if( !log.empty() ) curr_time = log.rbegin()->first;
1060 
1061  for( size_t i=0; i<peers.size(); i++ ) {
1062  peers[i].set_current_t(curr_time.first);
1063  peers[i].set_current_sub(curr_time.second);
1064  }
1065  }
1066 
1067  fetch_t_hist_ = std::numeric_limits<time_type>::lowest();
1068  fetch_i_hist_ = std::numeric_limits<iterator_type>::lowest();
1069  }
1070 
1074  memory_length = length;
1075 
1076  fetch_t_hist_ = std::numeric_limits<time_type>::lowest();
1077  fetch_i_hist_ = std::numeric_limits<iterator_type>::lowest();
1078  }
1079 
1082  std::string uri_host() {
1083  return comm->uri_host();
1084  }
1085 
1088  std::string uri_path() {
1089  return comm->uri_path();
1090  }
1091 
1094  std::string uri_protocol() {
1095  return comm->uri_protocol();
1096  }
1097 
1098 private:
1101  void acquire() {
1102  message m = comm->recv();
1103  if( m.has_id() ) readers[m.id()](m);
1104  }
1105 
1108  void on_recv_confirm( int32_t sender, std::pair<time_type,iterator_type> timestamp ) {
1109  peers[sender].set_current_t(timestamp.first);
1110  peers[sender].set_current_sub(timestamp.second);
1111  }
1112 
1115  void on_recv_forecast( int32_t sender, std::pair<time_type,iterator_type> timestamp ) {
1116  peers[sender].set_next_t(timestamp.first);
1117  peers[sender].set_next_sub(timestamp.second);
1118  }
1119 
1122  void on_recv_data( std::pair<time_type,iterator_type> timestamp, frame_type frame ) {
1123  auto itr = log.find(timestamp);
1124 
1125  if( itr == log.end() )
1126  std::tie(itr,std::ignore) = log.insert(std::make_pair(timestamp,bin_frame_type()));
1127 
1128  auto& cur = itr->second;
1129 
1130  for( auto& p: frame ){
1131  auto pstr = cur.find(p.first);
1132  if( pstr == cur.end() ) cur.insert(std::make_pair(std::move(p.first),spatial_t(std::move(p.second))));
1133  else pstr->second.insert(p.second);
1134  }
1135 
1136  log.erase(log.begin(), log.upper_bound({timestamp.first-memory_length, timestamp.second}));
1137  }
1138 
1141  void on_recv_rawdata( int32_t sender, std::pair<time_type,iterator_type> timestamp, frame_raw_type frame ) {
1142  on_recv_data( timestamp, associate( sender, frame ) );
1143  }
1144 
1147  void on_recv_span( int32_t sender, time_type start, time_type timeout, span_t s ) {
1148  peers[sender].set_recving(start,timeout,std::move(s));
1149  peers[sender].set_ss_recv_status(true);
1150  }
1151 
1154  void on_send_span( int32_t sender, time_type start, time_type timeout, span_t s ) {
1155  peers[sender].set_sending(start,timeout,std::move(s));
1156  peers[sender].set_ss_send_status(true);
1157  }
1158 
1161  void on_recv_disable( int32_t sender ) {
1162  peers[sender].set_recv_disable();
1163  peers[sender].set_ss_recv_status(true);
1164  peer_is_sending[sender] = false;
1165  }
1166 
1169  void on_send_disable( int32_t sender ) {
1170  peers[sender].set_send_disable();
1171  peers[sender].set_ss_send_status(true);
1172  }
1173 
1176  void on_recv_points( int32_t sender, std::vector<point_type> points ) {
1177  peers[sender].set_pts(points);
1178  }
1179 
1182  void on_recv_assignedVals( std::string attr, storage_single_t data ) {
1183  typename std::unordered_map<std::string, storage_single_t >::iterator it = assigned_values.find(attr);
1184  if (it != assigned_values.end())
1185  it->second = data;
1186  else
1187  assigned_values.insert( std::pair<std::string, storage_single_t>( attr, data ) );
1188  }
1189 
1192  inline frame_type associate( int32_t sender, frame_raw_type& frame ) {
1193  frame_type buf;
1194  const auto& pts = peers[sender].pts();
1195 
1196  for( auto& p: frame ) {
1197  const auto& data = storage_cast<const std::vector<std::pair<size_t,REAL> >&>(p.second);
1198 
1199  buf.insert(std::make_pair(p.first, storage_t(std::vector<std::pair<point_type,REAL> >())));
1200  std::vector<std::pair<point_type,REAL> >& data_store = storage_cast<std::vector<std::pair<point_type,REAL> >&>(buf[p.first]);
1201 
1202  data_store.resize(data.size());
1203 
1204  for( size_t i=0; i<data.size(); i++ ) {
1205  data_store[i].first = pts[data[i].first];
1206  data_store[i].second = data[i].second;
1207  }
1208  }
1209 
1210  return buf;
1211  }
1212 };
1213 
1214 }
1215 
1216 #endif // _UNIFACE_H
Structures and methods to create an underlying binning structure for data received through an interfa...
Definition: comm.h:55
Definition: geometry.h:92
Definition: uniface.h:80
void forget(time_type first, time_type last, bool reset_log=false)
Removes log between [@first, @last].
Definition: uniface.h:1022
void forget(std::pair< time_type, iterator_type > first, std::pair< time_type, iterator_type > last, bool reset_log=false)
Removes log between [[@first.first,@first.second], [@last.first,@last.second]].
Definition: uniface.h:1047
void forget(time_type last, bool reset_log=false)
Removes log between (-inf, @last].
Definition: uniface.h:975
SAMPLER::OTYPE fetch(const std::string &attr, const point_type &focus, const time_type t, SAMPLER &sampler, const TIME_SAMPLER &t_sampler, const COUPLING_ALGO &cpl_algo, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch from the interface with coupling algorithms, blocking with barrier at time=t.
Definition: uniface.h:514
static const bool FIXEDPOINTS
Definition: uniface.h:84
void forecast(time_type t, iterator_type it=std::numeric_limits< iterator_type >::lowest())
Sends a forecast of an upcoming time to remote nodes.
Definition: uniface.h:795
void announce_send_span(time_type start, time_type timeout, span_t s, bool synchronised=false)
Announces to all remote nodes using non-blocking peer-to-peer approach "I'll send this span".
Definition: uniface.h:939
void announce_send_disable(bool synchronised=false)
Announces to all remote nodes "I'm disabled for send".
Definition: uniface.h:950
void barrier_ss_send()
Blocking barrier for Smart Send send values. Initiates receive from remote nodes.
Definition: uniface.h:890
std::string uri_protocol()
Returns the URI protocol for the created uniface.
Definition: uniface.h:1094
SAMPLER::OTYPE fetch(const std::string &attr, const point_type &focus, const time_type t, const iterator_type it, SAMPLER &sampler, const TIME_SAMPLER &t_sampler, const COUPLING_ALGO &cpl_algo, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch from the interface with coupling algorithms, blocking with barrier at time=t,...
Definition: uniface.h:546
uniface(std::string const &URI)
Definition: uniface.h:254
bool is_ready(const std::string &attr, time_type t) const
Tests whether data is available at time=t.
Definition: uniface.h:802
typename CONFIG::time_type time_type
Definition: uniface.h:89
uniface(communicator *comm_)
Definition: uniface.h:255
SAMPLER::OTYPE fetch(const std::string &attr, const point_type &focus, const time_type t, SAMPLER &sampler, const TIME_SAMPLER &t_sampler, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch from the interface, blocking with barrier at time=t.
Definition: uniface.h:449
TYPE fetch(const std::string &attr)
Fetch a single parameter from the interface Overloaded fetch to fetch a single parameter of name attr...
Definition: uniface.h:439
uniface(const uniface &)=delete
typename CONFIG::REAL REAL
Definition: uniface.h:87
uniface & operator=(const uniface &)=delete
typename CONFIG::data_types data_types
Definition: uniface.h:91
int commit(time_type t, iterator_type it=std::numeric_limits< iterator_type >::lowest())
Serializes pushed data and sends it to remote nodes Serializes pushed data and sends it to remote nod...
Definition: uniface.h:730
geometry::any_shape< CONFIG > span_t
Definition: uniface.h:92
SAMPLER::OTYPE fetch(const std::string &attr, const point_type &focus, const time_type t, const iterator_type it, SAMPLER &sampler, const TIME_SAMPLER &t_sampler, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch from the interface, blocking with barrier at time=t,it.
Definition: uniface.h:481
void barrier(time_type t, iterator_type it)
Blocking barrier at time=t,it. Initiates receive from remote nodes.
Definition: uniface.h:857
void push(const std::string &attr, const point_type &loc, const TYPE &value)
Push data with tag "attr" to buffer Push data with tag "attr" to bcuffer. If using CONFIG::FIXEDPOINT...
Definition: uniface.h:300
void barrier(time_type t)
Blocking barrier at time=t. Initiates receive from remote nodes.
Definition: uniface.h:825
typename CONFIG::point_type point_type
Definition: uniface.h:88
std::vector< TYPE > fetch_values(const std::string &attr, const time_type t, const iterator_type it, const TIME_SAMPLER &t_sampler, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch values currently stored in the interface, blocking with barrier at time=t,it.
Definition: uniface.h:692
std::string uri_path()
Returns the URI path (name) for the created uniface.
Definition: uniface.h:1088
void barrier_ss_recv()
Blocking barrier for Smart Send receive values. Initiates receive from remote nodes.
Definition: uniface.h:915
static const int D
Definition: uniface.h:83
static const bool QUIET
Definition: uniface.h:85
std::vector< TYPE > fetch_values(const std::string &attr, const time_type t, const TIME_SAMPLER &t_sampler, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch values currently stored in the interface, blocking with barrier at time=t.
Definition: uniface.h:655
std::vector< point_type > fetch_points(const std::string &attr, const time_type t, const TIME_SAMPLER &t_sampler, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch points currently stored in the interface, blocking with barrier at time=t.
Definition: uniface.h:579
typename CONFIG::iterator_type iterator_type
Definition: uniface.h:90
std::string uri_host()
Returns the URI host (domain) for the created uniface.
Definition: uniface.h:1082
void set_memory(time_type length)
Removes log between (-inf, current-@length] automatically.
Definition: uniface.h:1073
void push(const std::string &attr, const TYPE &value)
Announce the value value with the parameter attr Useful if, for example, you wish to pass a parameter...
Definition: uniface.h:291
uniface(const char URI[])
Definition: uniface.h:253
void announce_recv_span(time_type start, time_type timeout, span_t s, bool synchronised=false)
Announces to all remote nodes using non-blocking peer-to-peer approach "I'm receiving this span".
Definition: uniface.h:957
std::vector< point_type > fetch_points(const std::string &attr, const time_type t, const iterator_type it, const TIME_SAMPLER &t_sampler, bool barrier_enabled=true, ADDITIONAL &&... additional)
Fetch points currently stored in the interface, blocking with barrier at time=t,it.
Definition: uniface.h:616
void announce_recv_disable(bool synchronised=false)
Announces to all remote nodes "I'm disabled for receive".
Definition: uniface.h:968
bool is_ready(const std::string &attr, time_type t, iterator_type it) const
Tests whether data is available at time=t,it.
Definition: uniface.h:813
void update_smart_send(time_type t)
Updates Smart Send locality data Creates a new comm rank mapping for Smart Send functionality.
Definition: uniface.h:772
void forget(std::pair< time_type, iterator_type > last, bool reset_log=false)
Removes log between ([-inf,-inf], [@last.first,@last.second]].
Definition: uniface.h:998
File containing class definition of communication interface. This is the base class for all other com...
Structures and methods to create a new communicator based on chosen protocols.
char buf[BUFSIZE]
Definition: comm_tcp.h:281
File containing data structures defining all data types used by an interface.
Implementation of a compound dynamic data structure used throughout MUI.
Structure for communicator used in comm_factory.h.
Structure to contain and manipulate data from internal data to MPI message.
dim< 0, 0, 1, 0, 0, 0, 0, 0 > time
Definition: dim.h:207
u u m
Definition: dim.h:281
dim< 0, 1, 0, 0, 0, 0, 0, 0 > length
Definition: dim.h:206
Definition: comm.h:54
bool almost_equal(T x, T y)
Definition: util.h:128
T threshold(T x)
Definition: util.h:146
bool collide(const span< CONFIG > &lhs, const span< CONFIG > &rhs)
Definition: span.h:120
SCALAR max(vexpr< E, SCALAR, D > const &u)
Definition: point.h:350
Creates a structure to parse a message as variables and pass them to a function as arguments.
Defines the spatial_storage data type.
Defines base stream class container_stream and associated functors.
Defines the stream in/out for std::string data type.
Defines the stream in/out for the unordered std::unordered_map. data type.
Defines the stream in/out for std::vector data type.
Definition: comm_factory.h:61
Definition: message.h:61
static message make(const id_type &id, types &&... data)
Definition: message.h:72
std::string id_type
Definition: message.h:63
Definition: reader_variable.h:64
Definition: dynstorage.h:146
Definition: config.h:57
Provides a number of utility functions used through the rest of the library.