Multiscale Universal Interface  2.0
A Concurrent Framework for Coupling Heterogeneous Solvers
comm_tcp.h
Go to the documentation of this file.
1 /*****************************************************************************
2 * Multiscale Universal Interface Code Coupling Library *
3 * *
4 * Copyright (C) 2019 Y. H. Tang, S. Kudo, X. Bian, Z. Li, G. E. Karniadakis *
5 * *
6 * This software is jointly licensed under the Apache License, Version 2.0 *
7 * and the GNU General Public License version 3, you may use it according *
8 * to either. *
9 * *
10 * ** Apache License, version 2.0 ** *
11 * *
12 * Licensed under the Apache License, Version 2.0 (the "License"); *
13 * you may not use this file except in compliance with the License. *
14 * You may obtain a copy of the License at *
15 * *
16 * http://www.apache.org/licenses/LICENSE-2.0 *
17 * *
18 * Unless required by applicable law or agreed to in writing, software *
19 * distributed under the License is distributed on an "AS IS" BASIS, *
20 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
21 * See the License for the specific language governing permissions and *
22 * limitations under the License. *
23 * *
24 * ** GNU General Public License, version 3 ** *
25 * *
26 * This program is free software: you can redistribute it and/or modify *
27 * it under the terms of the GNU General Public License as published by *
28 * the Free Software Foundation, either version 3 of the License, or *
29 * (at your option) any later version. *
30 * *
31 * This program is distributed in the hope that it will be useful, *
32 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
33 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
34 * GNU General Public License for more details. *
35 * *
36 * You should have received a copy of the GNU General Public License *
37 * along with this program. If not, see <http://www.gnu.org/licenses/>. *
38 *****************************************************************************/
39 
47 #ifndef COMM_TCP_H
48 #define COMM_TCP_H
49 
50 
51 #include <cstdio>
52 #include <cstdlib>
53 #include <cstring>
54 #include <sys/stat.h>
55 #include <sys/types.h>
56 #include <sys/socket.h>
57 #include <netdb.h>
58 #include <netinet/in.h>
59 #include <arpa/inet.h>
60 #include <unistd.h>
61 #include <sys/epoll.h>
62 #include <sys/eventfd.h>
63 #include <fcntl.h>
64 #include <thread>
65 #include <mutex>
66 #include <condition_variable>
67 #include <vector>
68 #include <deque>
69 #include <list>
70 #include <exception>
71 #include <atomic>
72 
73 #include "message.h"
74 #include "comm.h"
75 #include "lib_uri.h"
76 #include "comm_factory.h"
77 
78 namespace mui {
79 
/*
 * smalluint is an unsigned integer with a variable-length byte encoding:
 * the smaller the value, the fewer bytes it needs. For example, a value
 * less than or equal to 127 needs only one byte.
 * Format: the encoded length is determined by the number of leading
 * one-bits of the first byte.
 */
struct smalluint {
    uint64_t num;

    // Returns the total encoded length in bytes (1 to 9) implied by the
    // first encoded byte: one byte plus one per leading one-bit.
    static int detect_size( unsigned char byte ){
        int length = 1;
        for( unsigned mask = 0x80u; mask != 0u && (byte & mask) != 0u; mask >>= 1 )
            ++length;
        return length;
    }
};
102 
103 inline istream& operator>> ( istream& stream, smalluint& sml )
104 {
105  unsigned char byte;
106  stream >> byte;
107  uint64_t num = 0u;
108  int size;
109  if( byte <= 0x7fu ) { size = 1; num = byte&0xffu; }
110  else if( byte <= 0xbfu ) { size = 2; num = byte&0x3fu; }
111  else if( byte <= 0xdfu ) { size = 3; num = byte&0x1fu; }
112  else if( byte <= 0xefu ) { size = 4; num = byte&0x0fu; }
113  else if( byte <= 0xf7u ) { size = 5; num = byte&0x07u; }
114  else if( byte <= 0xfbu ) { size = 6; num = byte&0x03u; }
115  else if( byte <= 0xfdu ) { size = 7; num = byte&0x01u; }
116  else if( byte <= 0xfeu ) size = 8;
117  else size = 9;
118 
119  for( int i=0; i<size-1; ++i ){
120  unsigned char cur;
121  stream >> cur;
122  num = (num<<8u)|cur;
123  }
124  sml.num = num;
125  return stream;
126 }
127 inline ostream& operator<< ( ostream& stream, const smalluint& sml )
128 {
129  uint64_t num = sml.num;
130  unsigned char bytes[9];
131  int size;
132  if( num <= 0x000000000000007full ) size = 1;
133  else if( num <= 0x0000000000003fffull ) size = 2;
134  else if( num <= 0x00000000001fffffull ) size = 3;
135  else if( num <= 0x000000000fffffffull ) size = 4;
136  else if( num <= 0x00000007ffffffffull ) size = 5;
137  else if( num <= 0x000003ffffffffffull ) size = 6;
138  else if( num <= 0x0001ffffffffffffull ) size = 7;
139  else if( num <= 0x00ffffffffffffffull ) size = 8;
140  else size = 9;
141 
142  for( int j=size-1; j>=0; --j ){
143  bytes[j] = (num&0xffull);
144  num >>= 8u;
145  }
146  bytes[0] |= (0xfff00ull>>(size-1));
147  stream.write((char*)bytes,size);
148 
149  return stream;
150 }
151 
namespace {
// Size in bytes used for buffers in this file.
static const std::size_t MUI_TCP_BUF_SIZE = 4000;

// Passes `value` through unless it is -1, in which case a std::system_error
// built from errno (sampled on entry) is thrown.
// Use with care, wrapping the system call directly: "SYSCHECK(close(fd));"
int SYSCHECK(int value){
    const int saved_errno = errno;
    if(value != -1) return value;
    throw std::system_error(saved_errno, std::system_category());
}
}
160 
// Exception type carrying a fixed "lock time out" message.
struct mutex_timeout: std::runtime_error {
    mutex_timeout()
        : std::runtime_error("tcp error: lock time out. May be dead locked.")
    {}
};
164 
/* unique_fd_
 * Move-only owner of a file descriptor. The held descriptor is closed when
 * the owner is destroyed, reset to a different descriptor, or move-assigned
 * over.
 * The value 0 marks "owns nothing", so descriptor 0 is never closed by
 * this class.
 */
class unique_fd_ {
public:
    // Takes ownership of fd (0 means "nothing"). Intentionally implicit so
    // that an int returned by a system call can initialise an owner.
    unique_fd_(int fd=0): fd_(fd) {}
    unique_fd_(const unique_fd_&) = delete;
    // Steals the descriptor; rhs is left owning nothing.
    unique_fd_(unique_fd_&& rhs) noexcept: fd_(rhs.fd_) {
        rhs.fd_ = 0;
    }
    unique_fd_& operator=(const unique_fd_&) = delete;
    // Closes the currently held descriptor and steals the one held by rhs.
    unique_fd_& operator=(unique_fd_&& rhs) noexcept {
        // Self-assignment must be a no-op: without this guard the held
        // descriptor would be closed and then forgotten.
        if( this != &rhs ) {
            checked_close_();
            fd_ = rhs.fd_;
            rhs.fd_ = 0;
        }
        return *this;
    }
    ~unique_fd_(){ checked_close_(); }
    // Exposes the raw descriptor for use with system calls; ownership stays here.
    operator int () const { return fd_; }
    // Closes the held descriptor and takes ownership of fd.
    void reset(int fd) {
        // Re-setting the descriptor already held must not close it,
        // otherwise this object would keep a dangling descriptor number.
        if( fd == fd_ ) return;
        checked_close_();
        fd_ = fd;
    }
    void swap(unique_fd_& rhs) { std::swap(fd_, rhs.fd_); }
private:
    // Closes the descriptor unless this object owns nothing (fd_ == 0).
    void checked_close_(){ if( fd_ ) close(fd_); }
    int fd_;
};
193 
194 /* poll_scheduler
195  * A wrapper for epoll. It calls callback functions if the fd binded to callback becomes ready.
196  * port to select? kqueue?
197  */
199 public:
200  static const uint64_t IN = 1;
201  static const uint64_t OUT = 4;
202  static const uint64_t CALL = 32;
203  static const uint64_t ET = 1u<<31; // edge triggered
204 
205  typedef std::function<void(uint64_t)> callback_t;
206 
208  buf_.reserve(max+1);
209  callbacks_.reserve(max+1);
210 
211  epfd_ = SYSCHECK(epoll_create(max));
212 
213  int pipes[2];
214  SYSCHECK(pipe(pipes));
215  pipe_read_.reset(pipes[0]);
216  pipe_write_.reset(pipes[1]);
217  add(pipe_read_, IN, std::bind(&poll_scheduler::pop_event_, this));
218  }
219 
221  for( auto& a: callbacks_ )
222  SYSCHECK(epoll_ctl(epfd_, EPOLL_CTL_DEL, a.first, NULL));
223  }
224 
225  poll_scheduler(const poll_scheduler&) = delete;
227 
228  void run() {
229  buf_.resize(callbacks_.size());
230  int nfd = epoll_wait(epfd_, buf_.data(), buf_.size(), 100);
231  int err = errno;
232  if( nfd == 0 ) return;;
233  if( nfd == -1 && err == EAGAIN ) return;
234  else if( nfd == -1 ) throw std::system_error(err,std::system_category());
235  for( int i=0; i<nfd; ++i )
236  callbacks_[buf_[i].data.fd](translate_(buf_[i].events));
237  }
238 
239  void add(int fd, uint64_t default_events, callback_t callback) {
240  callbacks_.emplace(fd, std::move(callback));
241  epoll_event ev;
242  ev.data.fd = fd;
243  ev.events = translate_from_(default_events) | EPOLLET;
244  try {
245  SYSCHECK(epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev));
246  } catch(...) {
247  callbacks_.erase(callbacks_.find(fd));
248  throw;
249  }
250  }
251  void schedule(int fd) {
252  SYSCHECK(write(pipe_write_, &fd, sizeof(int)));
253  }
254 private:
255  void pop_event_() {
256  int fd;
257  SYSCHECK(read(pipe_read_, &fd, sizeof(int)));
258  if( fd != -1 )
259  callbacks_[fd](CALL);
260  }
261  uint64_t translate_(uint64_t ev) {
262  return (ev&EPOLLIN?IN:0u) | (ev&EPOLLOUT?OUT:0u) | (ev&EPOLLET?ET:0u);
263  }
264  uint64_t translate_from_(uint64_t ps_ev) {
265  return (ps_ev&IN?EPOLLIN:0u) | (ps_ev&OUT?EPOLLOUT:0u) |(ps_ev&ET?EPOLLET:0u);
266  }
267 
268  std::unordered_map<int, callback_t> callbacks_;
269  std::vector<epoll_event> buf_;
270  unique_fd_ epfd_;
271  unique_fd_ pipe_read_, pipe_write_;
272 };
273 
274 namespace {
275 /* read_buffer
276  * This manages buffers which will be used by read(3).
277  * Internal use only.
278  */
279 struct read_buffer : istream {
280  static const int BUFSIZE = MUI_TCP_BUF_SIZE;
281  struct node_t { char buf[BUFSIZE]; };
282 
283  std::pair<int,char*> get_buffer() {
284  if(nodes_.empty() || tail_ == BUFSIZE) {
285  nodes_.emplace_back();
286  tail_ = 0u;
287  }
288  return std::make_pair(BUFSIZE-tail_, nodes_.back().buf+tail_);
289  }
290  void move_tail(int size) { // consume bytes. size must be <= get_buffer().first
291  psize_ += size;
292  size_ += size;
293  tail_ += size;
294  }
295 
296  void read(char* dest, std::size_t size) { // read bytes and seek
297  size_ -= size;
298  while(size) {
299  std::size_t sz = std::min<std::size_t>(size, (pcur_!=nodes_.size()-1?BUFSIZE-ccur_:tail_-ccur_));
300  dest = std::copy_n(nodes_[pcur_].buf+ccur_, sz, dest);
301  size -= sz;
302  ccur_ += sz;
303  if( size ) {
304  ++pcur_;
305  ccur_ = 0u;
306  }
307  }
308  }
309  void revert() { // undo seeks
310  pcur_ = 0u;
311  ccur_ = head_;
312  size_ = psize_;
313  }
314  void detach() { // erase the used bytes
315  nodes_.erase(nodes_.begin(), nodes_.begin()+pcur_);
316  head_ = ccur_;
317  psize_ = size_;
318  }
319  std::size_t size() const { return size_; }
320 
321  std::deque<node_t> nodes_;
322  std::size_t psize_ = 0u;
323  std::size_t size_ = 0u;
324  std::size_t pcur_ = 0u;
325  unsigned ccur_ = 0u;
326  unsigned head_ = 0u;
327  unsigned tail_ = 0u;
328 };
329 }
330 
331 class read_que {
332 public:
333  read_que( unique_fd_&& fd, std::function<void(message)> callback )
334  : fd_(std::move(fd)), callback_(std::move(callback)) {}
335  read_que( read_que&& ) = default;
336  read_que& operator=( read_que&& ) = default;
337 
338  void try_recv(int){
339  while(true){
340  std::pair<int,char*> p = buf_.get_buffer();
341  ssize_t r = read(fd_, p.second, p.first);
342  int err = errno;
343  if( r > 0 ){
344  buf_.move_tail(r);
345  char c;
346  buf_ >> c;
347  buf_.revert();
348  std::uint64_t smlsize = smalluint::detect_size(c);
349  if( buf_.size() < smlsize) continue;
350 
351  smalluint vecsize;
352  buf_ >> vecsize;
353  buf_.revert();
354  if( buf_.size() < smlsize + vecsize.num ) continue;
355 
356  message msg;
357  buf_ >> msg;
358  callback_(std::move(msg));
359  buf_.detach();
360  continue;
361  } else if( r == -1 && (err == EAGAIN || err == EWOULDBLOCK) ){
362  break;
363  }else if( r == -1 ) throw std::system_error(err,std::system_category());
364  else throw std::runtime_error("tcp connection is broken.\n");
365  }
366  }
367  int get_fd() const { return fd_; }
368 private:
369  unique_fd_ fd_;
370  std::function<void(message)> callback_;
371  read_buffer buf_;
372 };
373 
374 class write_que {
375 public:
376  explicit write_que(unique_fd_&& fd): fd_(std::move(fd)) {}
377  write_que(write_que&& rhs): fd_(std::move(rhs.fd_)), send_(std::move(send_)) {}
379  std::swap(fd_, rhs.fd_);
380  send_.swap(rhs.send_);
381  return *this;
382  }
383 
384  void try_send(int){
385  while(true) {
386  mutex_.lock();
387  auto iter = send_.begin();
388  auto end = send_.end();
389  mutex_.unlock();
390  if(iter == end) break;
391  auto& p = *iter;
392 
393  std::size_t sz = p.second.size() - p.first;
394  char* buf = p.second.data() + p.first;
395  ssize_t r = write(fd_, buf, sz);
396  int err = errno;
397  if( r == sz ) {
398  std::lock_guard<std::mutex> lock(mutex_);
399  send_.pop_front();
400  } else if( r > 0 ) {
401  p.first += r;
402  } else if( r == -1 && (err == EAGAIN || err == EWOULDBLOCK) ){
403  break;
404  } else if( r == -1 ) throw std::system_error(err,std::system_category());
405  else throw std::runtime_error("tcp connection is broken.\n");
406  }
407  }
408  void push( std::vector<char> data ){
409  if( data.size() > std::numeric_limits<ssize_t>::max() )
410  throw std::range_error("cannot send data larger than ssize_t");
411  if( !data.empty() ) {
412  std::lock_guard<std::mutex> lock(mutex_);
413  send_.emplace_back(0u, std::move(data));
414  }
415  }
416  int get_fd() const { return fd_; }
417 private:
418  unique_fd_ fd_;
419  std::mutex mutex_;
420  std::list<std::pair<std::size_t,std::vector<char> > > send_;
421 };
422 
423 class comm_fd: public communicator {
424 public:
425  comm_fd(int local_rank, int local_size, std::vector<unique_fd_>&& wfds, std::vector<unique_fd_>&& rfds)
426  : local_rank_(local_rank), local_size_(local_size), remote_size_(wfds.size()), poll_(rfds.size()+wfds.size()) {
427  assert(wfds.size() == rfds.size());
428  die_.store(false);
429 
430  rques_.reserve(rfds.size());
431  wques_.reserve(wfds.size());
432 
433  for(auto& r: rfds) rques_.emplace_back(std::move(r),std::bind(&comm_fd::push_msg_,this,std::placeholders::_1));
434  for(auto& q: rques_)
435  poll_.add(q.get_fd(), poll_scheduler::IN | poll_scheduler::ET, std::bind(&read_que::try_recv,&q,std::placeholders::_1));
436  for(auto& w: wfds) wques_.emplace_back(std::move(w));
437  for(auto& q: wques_)
438  poll_.add(q.get_fd(), poll_scheduler::OUT | poll_scheduler::ET, std::bind(&write_que::try_send,&q,std::placeholders::_1));
439 
440  std::thread th = std::thread(std::bind(&comm_fd::run_, this));
441  thread_.swap(th);
442  }
443 
445  die_.store(true);
446  poll_.schedule(-1);
447  thread_.join();
448  }
449 
450  int local_rank() const { return local_rank_; }
451  int local_size() const { return local_size_; }
452  int remote_size() const { return remote_size_; }
453 protected:
454  void send_impl_(message msg, const std::vector<bool>& dest) {
455  std::vector<char> v(streamed_size(msg));
456  auto stream = make_ostream(v.begin());
457  stream << msg;
458  for( int i=0; i<remote_size(); ++i ) {
459  if(dest[i]){
460  wques_[i].push(v);
461  poll_.schedule(wques_[i].get_fd());
462  }
463  }
464  }
466  std::unique_lock<std::mutex> lock(recv_mutex_);
467  recv_cv_.wait(lock, [=](){ return !mesgs_.empty(); });
468  message msg = std::move(mesgs_.front());
469  mesgs_.pop_front();
470  return msg;
471  }
472 private:
473  void push_msg_(message msg) {
474  std::unique_lock<std::mutex> lock(recv_mutex_);
475  mesgs_.emplace_back(std::move(msg));
476  recv_cv_.notify_one(); // only ONE thrad can cann comm_fd.recv_impl_();
477  }
478 
479  void run_() {
480  while(true){
481  if( die_ ) break;
482  poll_.run();
483  }
484  }
485 
486  int local_rank_, local_size_, remote_size_;
487 
488  std::thread thread_;
489  std::atomic_bool die_;
490 
491  poll_scheduler poll_;
492  std::vector<read_que> rques_;
493  std::vector<write_que> wques_;
494 
495  std::mutex recv_mutex_;
496  std::condition_variable recv_cv_;
497  std::list<message> mesgs_;
498 };
499 
500 inline communicator* create_comm_tcp( const char* str ){
501  uri u(str);
502  bool is_server = u.host().empty();
503 
504  struct free_addrinfo_ { void operator()( addrinfo* ptr ) const { freeaddrinfo(ptr); } };
505  unique_fd_ sock;
506  std::unique_ptr<addrinfo,free_addrinfo_> info;
507  addrinfo hint, *tmp=NULL;
508 
509  std::memset((char*)&hint, 0, sizeof(hint));
510  hint.ai_family = AF_INET; // use ip v4
511  hint.ai_socktype = SOCK_STREAM; // TCP/IP
512  hint.ai_flags = AI_NUMERICSERV|(is_server?AI_PASSIVE:0);
513 
514  if(int err = getaddrinfo(is_server?0:u.host().c_str(), u.path().c_str(), &hint, &tmp))
515  throw std::runtime_error(gai_strerror(err));
516  info.reset(tmp);
517 
518  for(; tmp!=NULL; tmp = tmp->ai_next){
519  int s = socket(tmp->ai_family, tmp->ai_socktype, tmp->ai_protocol);
520  if(s == -1 ) continue;
521  sock.reset(s);
522 
523  if( is_server ){
524  if( bind(sock, tmp->ai_addr, tmp->ai_addrlen) != -1 ) break;
525  } else {
526  if( connect(sock, tmp->ai_addr, tmp->ai_addrlen) != -1 ) break;
527  }
528  }
529  if(tmp == NULL) throw std::runtime_error("tcp connection error.");
530 
531  unique_fd_ wfd, rfd;
532 
533  if( is_server ){
534  SYSCHECK(listen(sock,1));
535  wfd.reset(SYSCHECK(accept(sock,0,0)));
536  SYSCHECK(fcntl(wfd,F_SETFL,O_NONBLOCK));
537  rfd.reset(SYSCHECK(dup(wfd)));
538  } else {
539  SYSCHECK(fcntl(sock,F_SETFL,O_NONBLOCK));
540  wfd.swap(sock);
541  rfd.reset(SYSCHECK(dup(wfd)));
542  }
543  std::vector<mui::unique_fd_> w; w.emplace_back(std::move(wfd));
544  std::vector<mui::unique_fd_> r; r.emplace_back(std::move(rfd));
545  return static_cast<communicator*>(new comm_fd(0,1,std::move(w),std::move(r)));
546 }
547 
548 const static bool comm_tcp_registered_ = comm_factory::instance().link( "tcp", create_comm_tcp );
549 
550 inline communicator* create_comm_shm( const char* str ){
551  uri u(str);
552  std::string patha = std::string("/dev/shm/") + u.host() + "_A";
553  std::string pathb = std::string("/dev/shm/") + u.host() + "_B";
554  bool is_wr = false;
555 
556  if(mkfifo(patha.c_str(), S_IRUSR | S_IWUSR)) {
557  int err = errno;
558  if(err != EEXIST) throw std::system_error(err, std::system_category());
559  is_wr = true;
560  }
561  if(!is_wr)
562  if(int ret = mkfifo(pathb.c_str(), S_IRUSR | S_IWUSR))
563  throw std::system_error(errno, std::system_category());
564 
565  unique_fd_ afd = SYSCHECK(open(patha.c_str(), is_wr?O_RDONLY:O_WRONLY));
566  SYSCHECK(fcntl(afd,F_SETFL,O_NONBLOCK));
567  unique_fd_ bfd = SYSCHECK(open(pathb.c_str(), is_wr?O_WRONLY:O_RDONLY));
568  SYSCHECK(fcntl(bfd,F_SETFL,O_NONBLOCK));
569 
570  std::vector<mui::unique_fd_> w; w.emplace_back(std::move(is_wr?bfd:afd));
571  std::vector<mui::unique_fd_> r; r.emplace_back(std::move(is_wr?afd:bfd));
572  return static_cast<communicator*>(new comm_fd(0,1,std::move(w),std::move(r)));
573 }
574 
575 const static bool comm_shm_registered_ = comm_factory::instance().link( "shm", create_comm_shm);
576 
577 }
578 
#if 0
// Disabled manual smoke test: sends one message over a communicator,
// receives one back and prints it.

// Extracts a string from the message payload and writes it to stderr.
void print( mui::message msg )
{
    std::string text;
    auto in = mui::make_istream(msg.data());
    in >> text;
    std::cerr << "GET MESSAGE!!\t" << text << std::endl;
}

int main(int argc, char **argv)
{
    //std::unique_ptr<mui::communicator> c(mui::create_comm_tcp(argc<2?"tcp:///37129":"tcp://localhost/37129"));
    std::unique_ptr<mui::communicator> comm(mui::create_comm_shm("shm://my_pipe/"));

    comm->send(mui::message::make("test",std::string("Hello, ")));
    mui::message reply = comm->recv();
    print(reply);
    sleep(3);
    return 0;
}
#endif
600 
601 #endif
Definition: comm_tcp.h:423
void send_impl_(message msg, const std::vector< bool > &dest)
Definition: comm_tcp.h:454
message recv_impl_()
Definition: comm_tcp.h:465
int remote_size() const
Definition: comm_tcp.h:452
comm_fd(int local_rank, int local_size, std::vector< unique_fd_ > &&wfds, std::vector< unique_fd_ > &&rfds)
Definition: comm_tcp.h:425
int local_size() const
Definition: comm_tcp.h:451
int local_rank() const
Definition: comm_tcp.h:450
~comm_fd()
Definition: comm_tcp.h:444
Definition: comm.h:55
Definition: stream.h:61
Definition: stream.h:67
virtual void write(const char *ptr, std::size_t size)=0
Definition: comm_tcp.h:198
poll_scheduler(const poll_scheduler &)=delete
~poll_scheduler()
Definition: comm_tcp.h:220
static const uint64_t IN
Definition: comm_tcp.h:200
void run()
Definition: comm_tcp.h:228
poll_scheduler & operator=(const poll_scheduler &)=delete
poll_scheduler(int max)
Definition: comm_tcp.h:207
static const uint64_t ET
Definition: comm_tcp.h:203
static const uint64_t CALL
Definition: comm_tcp.h:202
void schedule(int fd)
Definition: comm_tcp.h:251
static const uint64_t OUT
Definition: comm_tcp.h:201
std::function< void(uint64_t)> callback_t
Definition: comm_tcp.h:205
void add(int fd, uint64_t default_events, callback_t callback)
Definition: comm_tcp.h:239
Definition: comm_tcp.h:331
void try_recv(int)
Definition: comm_tcp.h:338
int get_fd() const
Definition: comm_tcp.h:367
read_que(read_que &&)=default
read_que(unique_fd_ &&fd, std::function< void(message)> callback)
Definition: comm_tcp.h:333
read_que & operator=(read_que &&)=default
static dispatcher< std::string, std::function< communicator *(const char[], const bool)> > & instance()
Definition: lib_singleton.h:58
Definition: comm_tcp.h:168
unique_fd_(const unique_fd_ &)=delete
unique_fd_ & operator=(const unique_fd_ &)=delete
void reset(int fd)
Definition: comm_tcp.h:184
~unique_fd_()
Definition: comm_tcp.h:182
unique_fd_ & operator=(unique_fd_ &&rhs)
Definition: comm_tcp.h:176
unique_fd_(int fd=0)
Definition: comm_tcp.h:170
void swap(unique_fd_ &rhs)
Definition: comm_tcp.h:188
unique_fd_(unique_fd_ &&rhs)
Definition: comm_tcp.h:172
Definition: lib_uri.h:55
const std::string & host() const
Definition: lib_uri.h:64
const std::string & path() const
Definition: lib_uri.h:65
Definition: comm_tcp.h:374
void try_send(int)
Definition: comm_tcp.h:384
write_que(write_que &&rhs)
Definition: comm_tcp.h:377
write_que & operator=(write_que &&rhs)
Definition: comm_tcp.h:378
write_que(unique_fd_ &&fd)
Definition: comm_tcp.h:376
int get_fd() const
Definition: comm_tcp.h:416
void push(std::vector< char > data)
Definition: comm_tcp.h:408
File containing class definition of communication interface. This is the base class for all other com...
Structures and methods to create a new communicator based on chosen protocols.
std::size_t pcur_
Definition: comm_tcp.h:324
std::deque< node_t > nodes_
Definition: comm_tcp.h:321
unsigned head_
Definition: comm_tcp.h:326
unsigned tail_
Definition: comm_tcp.h:327
char buf[BUFSIZE]
Definition: comm_tcp.h:281
std::size_t size_
Definition: comm_tcp.h:323
unsigned ccur_
Definition: comm_tcp.h:325
std::size_t psize_
Definition: comm_tcp.h:322
Base class to contain and manipulate a unique URI (Uniform Resource Identifier).
int main()
Definition: matrix_arithmetics.cpp:206
Structure to contain and manipulate data from internal data to MPI message.
Definition: comm.h:54
communicator * create_comm_tcp(const char *str)
Definition: comm_tcp.h:500
istream & operator>>(istream &stream, smalluint &sml)
Definition: comm_tcp.h:103
ostream & operator<<(ostream &stream, const smalluint &sml)
Definition: comm_tcp.h:127
oitr_stream< OutputIterator > make_ostream(OutputIterator cur)
Definition: stream.h:142
communicator * create_comm_shm(const char *str)
Definition: comm_tcp.h:550
std::size_t streamed_size()
Definition: stream.h:161
SCALAR max(vexpr< E, SCALAR, D > const &u)
Definition: point.h:350
void swap(storage< Args... > &lhs, storage< Args... > &rhs)
Definition: dynstorage.h:234
iitr_stream< ConstInputIterator > make_istream(ConstInputIterator begin)
Definition: stream.h:118
Definition: message.h:61
const char * data() const
Definition: message.h:94
static message make(const id_type &id, types &&... data)
Definition: message.h:72
Definition: comm_tcp.h:161
mutex_timeout()
Definition: comm_tcp.h:162
Definition: comm_tcp.h:88
static int detect_size(unsigned char byte)
Definition: comm_tcp.h:90
uint64_t num
Definition: comm_tcp.h:89