55 #include <sys/types.h>
56 #include <sys/socket.h>
58 #include <netinet/in.h>
59 #include <arpa/inet.h>
61 #include <sys/epoll.h>
62 #include <sys/eventfd.h>
66 #include <condition_variable>
// Visible part of the if/else ladder that maps one byte to a value 1..8.
// Each test is an inclusive upper bound, so the effective ranges are
//   0x00-0x7f -> 1, 0x80-0xbf -> 2, 0xc0-0xdf -> 3, 0xe0-0xef -> 4,
//   0xf0-0xf7 -> 5, 0xf8-0xfb -> 6, 0xfc-0xfd -> 7, 0xfe      -> 8.
// NOTE(review): no branch shown here covers byte == 0xff; the function
// header and anything after the last branch are on lines missing from this
// listing, so the 0xff result cannot be confirmed from here.
91 if(
byte <= 0x7fu )
return 1;
92 else if(
byte <= 0xbfu )
return 2;
93 else if(
byte <= 0xdfu )
return 3;
94 else if(
byte <= 0xefu )
return 4;
95 else if(
byte <= 0xf7u )
return 5;
96 else if(
byte <= 0xfbu )
return 6;
97 else if(
byte <= 0xfdu )
return 7;
98 else if(
byte <= 0xfeu )
return 8;
// Decode ladder: from the first byte it selects `size` (1..8) and seeds
// `num` with that byte's low bits. The masks keep 8, 6, 5, 4, 3, 2 and 1
// low bits for sizes 1..7; for size 1 the test byte <= 0x7f already
// guarantees the top bit is clear, so 0xff keeps the whole value.
// The size-8 branch (byte == 0xfe) assigns only `size`.
// NOTE(review): whether `num` is initialised before this ladder, and what
// happens for byte == 0xff, is on lines not shown here — TODO confirm.
109 if(
byte <= 0x7fu ) { size = 1; num =
byte&0xffu; }
110 else if(
byte <= 0xbfu ) { size = 2; num =
byte&0x3fu; }
111 else if(
byte <= 0xdfu ) { size = 3; num =
byte&0x1fu; }
112 else if(
byte <= 0xefu ) { size = 4; num =
byte&0x0fu; }
113 else if(
byte <= 0xf7u ) { size = 5; num =
byte&0x07u; }
114 else if(
byte <= 0xfbu ) { size = 6; num =
byte&0x03u; }
115 else if(
byte <= 0xfdu ) { size = 7; num =
byte&0x01u; }
116 else if(
byte <= 0xfeu ) size = 8;
119 for(
int i=0; i<size-1; ++i ){
// Encoder fragment: works on a copy of sml.num and a 9-byte scratch buffer.
129 uint64_t num = sml.
num;
130 unsigned char bytes[9];
// Picks the smallest size whose payload can hold num: the bounds are the
// largest 7-, 14-, 21-, 28-, 35-, 42-, 49- and 56-bit values (7 bits per
// encoded byte for sizes 1..8).
// NOTE(review): no visible branch handles num > 0x00ffffffffffffff even
// though bytes[] has room for 9 bytes; that case, if handled, is on lines
// missing from this listing.
132 if( num <= 0x000000000000007full ) size = 1;
133 else if( num <= 0x0000000000003fffull ) size = 2;
134 else if( num <= 0x00000000001fffffull ) size = 3;
135 else if( num <= 0x000000000fffffffull ) size = 4;
136 else if( num <= 0x00000007ffffffffull ) size = 5;
137 else if( num <= 0x000003ffffffffffull ) size = 6;
138 else if( num <= 0x0001ffffffffffffull ) size = 7;
139 else if( num <= 0x00ffffffffffffffull ) size = 8;
// Fills bytes[] from the last index down to 0 with the low byte of num.
// The statement that advances num between iterations is not shown here.
142 for(
int j=size-1; j>=0; --j ){
143 bytes[j] = (num&0xffull);
// ORs the length prefix into the first byte. After truncation to
// unsigned char, 0xfff00 >> (size-1) is 0x00, 0x80, 0xc0, 0xe0, 0xf0,
// 0xf8, 0xfc, 0xfe for size 1..8, i.e. (size-1) leading one bits.
146 bytes[0] |= (0xfff00ull>>(size-1));
// Emits exactly `size` bytes to the output stream.
147 stream.
write((
char*)bytes,size);
// Buffer size constant; read_buffer::BUFSIZE is initialised from it and sizes
// the char array inside each read_buffer::node_t.
153 static const std::size_t MUI_TCP_BUF_SIZE = 4000;
// Checks the result of a call that signals failure by returning -1.
// On -1 it throws std::system_error in std::system_category() built from
// `err`. Callers use the returned int (e.g. SYSCHECK(epoll_create(max))).
// NOTE(review): the declaration of `err` (presumably a copy of errno —
// TODO confirm) and the non-error return path are on lines not shown here.
154 int SYSCHECK(
int value){
156 if(value == -1)
throw std::system_error(err, std::system_category());
// Default constructor: initialises the std::runtime_error base with a fixed
// message; takes no arguments and has an empty body.
162 mutex_timeout(): std::runtime_error(
"tcp error: lock time out. May be dead locked.") {}
// Implicit conversion to the stored descriptor value, so the wrapper can be
// passed directly where an int fd is expected (e.g. fcntl(wfd, ...)).
// Does not transfer or release ownership; it only returns fd_.
183 operator int ()
const {
return fd_; }
// Calls close(fd_) unless fd_ is 0; the result of close is ignored and fd_
// is left unchanged by this function.
// NOTE(review): 0 is treated as "no descriptor" (the constructor's default
// argument is fd=0), so a wrapper holding descriptor 0 is never closed here.
190 void checked_close_(){
if( fd_ ) close(fd_); }
// Scheduler-level event flags handed to callbacks.
// IN, OUT and ET are converted to and from EPOLLIN, EPOLLOUT and EPOLLET by
// translate_() / translate_from_(). CALL has no epoll counterpart in those
// helpers; it is the value passed to a callback from pop_event_().
200 static const uint64_t
IN = 1;
201 static const uint64_t
OUT = 4;
202 static const uint64_t
CALL = 32;
203 static const uint64_t
ET = 1u<<31;
209 callbacks_.reserve(
max+1);
211 epfd_ = SYSCHECK(epoll_create(
max));
214 SYSCHECK(pipe(pipes));
215 pipe_read_.
reset(pipes[0]);
216 pipe_write_.
reset(pipes[1]);
217 add(pipe_read_,
IN, std::bind(&poll_scheduler::pop_event_,
this));
221 for(
auto& a: callbacks_ )
222 SYSCHECK(epoll_ctl(epfd_, EPOLL_CTL_DEL, a.first, NULL));
// One polling pass: sizes the event buffer to one slot per registered
// callback, then waits on the epoll descriptor for at most 100 ms.
229 buf_.resize(callbacks_.size());
230 int nfd = epoll_wait(epfd_, buf_.data(), buf_.size(), 100);
// nfd == 0: the wait timed out with nothing ready.
232 if( nfd == 0 )
return;;
// NOTE(review): `err` is declared on a line not shown here. epoll_wait(2)
// documents EINTR for an interrupted wait and does not list EAGAIN —
// confirm which errno value this early return is meant to tolerate.
233 if( nfd == -1 && err == EAGAIN )
return;
234 else if( nfd == -1 )
throw std::system_error(err,std::system_category());
// Dispatches each ready event to the callback registered for its fd, passing
// the epoll event mask converted by translate_().
// NOTE(review): unordered_map::operator[] default-inserts an empty
// std::function when the fd has no entry, and invoking that would throw
// std::bad_function_call; verify entries cannot be removed while events for
// them are still pending.
235 for(
int i=0; i<nfd; ++i )
236 callbacks_[buf_[i].data.fd](translate_(buf_[i].events));
240 callbacks_.emplace(fd, std::move(callback));
243 ev.events = translate_from_(default_events) | EPOLLET;
245 SYSCHECK(epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev));
247 callbacks_.erase(callbacks_.find(fd));
252 SYSCHECK(write(pipe_write_, &fd,
sizeof(
int)));
257 SYSCHECK(read(pipe_read_, &fd,
sizeof(
int)));
259 callbacks_[fd](
CALL);
// Converts an epoll event mask into scheduler flags: EPOLLIN -> IN,
// EPOLLOUT -> OUT, EPOLLET -> ET. Any other bit in `ev` is dropped, and CALL
// is never produced here.
// `&` binds tighter than `?:`, so each term reads (ev & FLAG) ? X : 0u.
// (The closing brace of this function is on a line missing from the listing.)
261 uint64_t translate_(uint64_t ev) {
262 return (ev&EPOLLIN?
IN:0u) | (ev&EPOLLOUT?
OUT:0u) | (ev&EPOLLET?
ET:0u);
// Inverse of translate_(): converts scheduler flags into an epoll event
// mask: IN -> EPOLLIN, OUT -> EPOLLOUT, ET -> EPOLLET. CALL and any other
// bit in `ps_ev` are ignored.
// `&` binds tighter than `?:`, so each term reads (ps_ev & FLAG) ? X : 0u.
// (The closing brace of this function is on a line missing from the listing.)
264 uint64_t translate_from_(uint64_t ps_ev) {
265 return (ps_ev&
IN?EPOLLIN:0u) | (ps_ev&
OUT?EPOLLOUT:0u) |(ps_ev&
ET?EPOLLET:0u);
268 std::unordered_map<int, callback_t> callbacks_;
269 std::vector<epoll_event> buf_;
271 unique_fd_ pipe_read_, pipe_write_;
279 struct read_buffer : istream {
280 static const int BUFSIZE = MUI_TCP_BUF_SIZE;
// One fixed-size chunk of BUFSIZE raw bytes; the only member is the array.
// read_buffer stores these chunks in its nodes_ container.
281 struct node_t {
char buf[BUFSIZE]; };
283 std::pair<int,char*> get_buffer() {
290 void move_tail(
int size) {
296 void read(
char* dest, std::size_t size) {
// Returns the current value of size_ without modifying the buffer.
// Presumably the number of buffered bytes not yet consumed: read_que
// compares it with the expected message length before decoding — TODO
// confirm against the members that update size_.
319 std::size_t size()
const {
return size_; }
334 : fd_(std::move(fd)), callback_(std::move(callback)) {}
340 std::pair<int,char*> p = buf_.get_buffer();
341 ssize_t r = read(fd_, p.second, p.first);
349 if( buf_.size() < smlsize)
continue;
354 if( buf_.size() < smlsize + vecsize.
num )
continue;
358 callback_(std::move(msg));
361 }
else if( r == -1 && (err == EAGAIN || err == EWOULDBLOCK) ){
363 }
else if( r == -1 )
throw std::system_error(err,std::system_category());
364 else throw std::runtime_error(
"tcp connection is broken.\n");
370 std::function<void(
message)> callback_;
380 send_.swap(rhs.send_);
387 auto iter = send_.begin();
388 auto end = send_.end();
390 if(iter == end)
break;
393 std::size_t sz = p.second.size() - p.first;
394 char*
buf = p.second.data() + p.first;
395 ssize_t r = write(fd_,
buf, sz);
398 std::lock_guard<std::mutex> lock(mutex_);
402 }
else if( r == -1 && (err == EAGAIN || err == EWOULDBLOCK) ){
404 }
else if( r == -1 )
throw std::system_error(err,std::system_category());
405 else throw std::runtime_error(
"tcp connection is broken.\n");
408 void push( std::vector<char> data ){
410 throw std::range_error(
"cannot send data larger than ssize_t");
411 if( !data.empty() ) {
412 std::lock_guard<std::mutex> lock(mutex_);
413 send_.emplace_back(0u, std::move(data));
420 std::list<std::pair<std::size_t,std::vector<char> > > send_;
426 : local_rank_(
local_rank), local_size_(
local_size), remote_size_(wfds.size()), poll_(rfds.size()+wfds.size()) {
427 assert(wfds.size() == rfds.size());
430 rques_.reserve(rfds.size());
431 wques_.reserve(wfds.size());
433 for(
auto& r: rfds) rques_.emplace_back(std::move(r),std::bind(&comm_fd::push_msg_,
this,std::placeholders::_1));
436 for(
auto& w: wfds) wques_.emplace_back(std::move(w));
440 std::thread th = std::thread(std::bind(&comm_fd::run_,
this));
466 std::unique_lock<std::mutex> lock(recv_mutex_);
467 recv_cv_.wait(lock, [=](){
return !mesgs_.empty(); });
468 message msg = std::move(mesgs_.front());
474 std::unique_lock<std::mutex> lock(recv_mutex_);
475 mesgs_.emplace_back(std::move(msg));
476 recv_cv_.notify_one();
486 int local_rank_, local_size_, remote_size_;
489 std::atomic_bool die_;
491 poll_scheduler poll_;
492 std::vector<read_que> rques_;
493 std::vector<write_que> wques_;
495 std::mutex recv_mutex_;
496 std::condition_variable recv_cv_;
497 std::list<message> mesgs_;
502 bool is_server = u.
host().empty();
// Stateless deleter functor: releases an addrinfo list with freeaddrinfo().
// Used as the deleter type of the std::unique_ptr<addrinfo, free_addrinfo_>
// declared right after it.
504 struct free_addrinfo_ {
void operator()( addrinfo* ptr )
const { freeaddrinfo(ptr); } };
506 std::unique_ptr<addrinfo,free_addrinfo_> info;
507 addrinfo hint, *tmp=NULL;
509 std::memset((
char*)&hint, 0,
sizeof(hint));
510 hint.ai_family = AF_INET;
511 hint.ai_socktype = SOCK_STREAM;
512 hint.ai_flags = AI_NUMERICSERV|(is_server?AI_PASSIVE:0);
514 if(
int err = getaddrinfo(is_server?0:u.
host().c_str(), u.
path().c_str(), &hint, &tmp))
515 throw std::runtime_error(gai_strerror(err));
518 for(; tmp!=NULL; tmp = tmp->ai_next){
519 int s = socket(tmp->ai_family, tmp->ai_socktype, tmp->ai_protocol);
520 if(s == -1 )
continue;
524 if( bind(sock, tmp->ai_addr, tmp->ai_addrlen) != -1 )
break;
526 if( connect(sock, tmp->ai_addr, tmp->ai_addrlen) != -1 )
break;
529 if(tmp == NULL)
throw std::runtime_error(
"tcp connection error.");
534 SYSCHECK(listen(sock,1));
535 wfd.
reset(SYSCHECK(accept(sock,0,0)));
536 SYSCHECK(fcntl(wfd,F_SETFL,O_NONBLOCK));
537 rfd.
reset(SYSCHECK(dup(wfd)));
539 SYSCHECK(fcntl(sock,F_SETFL,O_NONBLOCK));
541 rfd.
reset(SYSCHECK(dup(wfd)));
543 std::vector<mui::unique_fd_> w; w.emplace_back(std::move(wfd));
544 std::vector<mui::unique_fd_> r; r.emplace_back(std::move(rfd));
552 std::string patha = std::string(
"/dev/shm/") + u.
host() +
"_A";
553 std::string pathb = std::string(
"/dev/shm/") + u.
host() +
"_B";
// Creates the first FIFO with owner read/write permission. A non-zero result
// is fatal unless the recorded error is EEXIST (the FIFO already exists).
// `err` is declared on a line not shown here (presumably errno — TODO confirm).
556 if(mkfifo(patha.c_str(), S_IRUSR | S_IWUSR)) {
558 if(err != EEXIST)
throw std::system_error(err, std::system_category());
// Creates the second FIFO; here any failure throws, using errno directly.
// `ret` serves only as the if-condition and is not read afterwards.
// NOTE(review): unlike the first FIFO, EEXIST is not tolerated by the visible
// code. Lines between these two statements are missing from the listing, so a
// guard may exist — verify the asymmetry is intentional.
562 if(
int ret = mkfifo(pathb.c_str(), S_IRUSR | S_IWUSR))
563 throw std::system_error(errno, std::system_category());
565 unique_fd_ afd = SYSCHECK(open(patha.c_str(), is_wr?O_RDONLY:O_WRONLY));
566 SYSCHECK(fcntl(afd,F_SETFL,O_NONBLOCK));
567 unique_fd_ bfd = SYSCHECK(open(pathb.c_str(), is_wr?O_WRONLY:O_RDONLY));
568 SYSCHECK(fcntl(bfd,F_SETFL,O_NONBLOCK));
570 std::vector<mui::unique_fd_> w; w.emplace_back(std::move(is_wr?bfd:afd));
571 std::vector<mui::unique_fd_> r; r.emplace_back(std::move(is_wr?afd:bfd));
585 std::cerr<<
"GET MESSAGE!!\t" << str << std::endl;
588 int main(
int argc,
char **argv)
Definition: comm_tcp.h:423
void send_impl_(message msg, const std::vector< bool > &dest)
Definition: comm_tcp.h:454
message recv_impl_()
Definition: comm_tcp.h:465
int remote_size() const
Definition: comm_tcp.h:452
comm_fd(int local_rank, int local_size, std::vector< unique_fd_ > &&wfds, std::vector< unique_fd_ > &&rfds)
Definition: comm_tcp.h:425
int local_size() const
Definition: comm_tcp.h:451
int local_rank() const
Definition: comm_tcp.h:450
~comm_fd()
Definition: comm_tcp.h:444
virtual void write(const char *ptr, std::size_t size)=0
Definition: comm_tcp.h:198
poll_scheduler(const poll_scheduler &)=delete
~poll_scheduler()
Definition: comm_tcp.h:220
static const uint64_t IN
Definition: comm_tcp.h:200
void run()
Definition: comm_tcp.h:228
poll_scheduler & operator=(const poll_scheduler &)=delete
poll_scheduler(int max)
Definition: comm_tcp.h:207
static const uint64_t ET
Definition: comm_tcp.h:203
static const uint64_t CALL
Definition: comm_tcp.h:202
void schedule(int fd)
Definition: comm_tcp.h:251
static const uint64_t OUT
Definition: comm_tcp.h:201
std::function< void(uint64_t)> callback_t
Definition: comm_tcp.h:205
void add(int fd, uint64_t default_events, callback_t callback)
Definition: comm_tcp.h:239
Definition: comm_tcp.h:331
void try_recv(int)
Definition: comm_tcp.h:338
int get_fd() const
Definition: comm_tcp.h:367
read_que(read_que &&)=default
read_que(unique_fd_ &&fd, std::function< void(message)> callback)
Definition: comm_tcp.h:333
read_que & operator=(read_que &&)=default
static dispatcher< std::string, std::function< communicator *(const char[], const bool)> > & instance()
Definition: lib_singleton.h:58
Definition: comm_tcp.h:168
unique_fd_(const unique_fd_ &)=delete
unique_fd_ & operator=(const unique_fd_ &)=delete
void reset(int fd)
Definition: comm_tcp.h:184
~unique_fd_()
Definition: comm_tcp.h:182
unique_fd_ & operator=(unique_fd_ &&rhs)
Definition: comm_tcp.h:176
unique_fd_(int fd=0)
Definition: comm_tcp.h:170
void swap(unique_fd_ &rhs)
Definition: comm_tcp.h:188
unique_fd_(unique_fd_ &&rhs)
Definition: comm_tcp.h:172
const std::string & host() const
Definition: lib_uri.h:64
const std::string & path() const
Definition: lib_uri.h:65
Definition: comm_tcp.h:374
void try_send(int)
Definition: comm_tcp.h:384
write_que(write_que &&rhs)
Definition: comm_tcp.h:377
write_que & operator=(write_que &&rhs)
Definition: comm_tcp.h:378
write_que(unique_fd_ &&fd)
Definition: comm_tcp.h:376
int get_fd() const
Definition: comm_tcp.h:416
void push(std::vector< char > data)
Definition: comm_tcp.h:408
File containing class definition of communication interface. This is the base class for all other com...
Structures and methods to create a new communicator based on chosen protocols.
std::size_t pcur_
Definition: comm_tcp.h:324
std::deque< node_t > nodes_
Definition: comm_tcp.h:321
unsigned head_
Definition: comm_tcp.h:326
unsigned tail_
Definition: comm_tcp.h:327
char buf[BUFSIZE]
Definition: comm_tcp.h:281
std::size_t size_
Definition: comm_tcp.h:323
unsigned ccur_
Definition: comm_tcp.h:325
std::size_t psize_
Definition: comm_tcp.h:322
Base class to contain and manipulate a unique URI (Uniform Resource Identifier).
int main()
Definition: matrix_arithmetics.cpp:206
Structure to contain and manipulate data from internal data to MPI message.
communicator * create_comm_tcp(const char *str)
Definition: comm_tcp.h:500
istream & operator>>(istream &stream, smalluint &sml)
Definition: comm_tcp.h:103
ostream & operator<<(ostream &stream, const smalluint &sml)
Definition: comm_tcp.h:127
oitr_stream< OutputIterator > make_ostream(OutputIterator cur)
Definition: stream.h:142
communicator * create_comm_shm(const char *str)
Definition: comm_tcp.h:550
std::size_t streamed_size()
Definition: stream.h:161
SCALAR max(vexpr< E, SCALAR, D > const &u)
Definition: point.h:350
void swap(storage< Args... > &lhs, storage< Args... > &rhs)
Definition: dynstorage.h:234
iitr_stream< ConstInputIterator > make_istream(ConstInputIterator begin)
Definition: stream.h:118
const char * data() const
Definition: message.h:94
static message make(const id_type &id, types &&... data)
Definition: message.h:72
Definition: comm_tcp.h:161
mutex_timeout()
Definition: comm_tcp.h:162
Definition: comm_tcp.h:88
static int detect_size(unsigned char byte)
Definition: comm_tcp.h:90
uint64_t num
Definition: comm_tcp.h:89