千家信息网

如何实现ceph SimpleMessenger模块消息的接收

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,小编给大家分享一下如何实现ceph SimpleMessenger模块消息的接收,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!OSD服务端消息的接收起始于OSD::init()中的m
千家信息网最后更新 2025年01月24日如何实现ceph SimpleMessenger模块消息的接收

小编给大家分享一下如何实现ceph SimpleMessenger模块消息的接收,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

OSD服务端消息的接收起始于OSD::init()中的messenger::add_dispatcher_head(Dispatcher *d)函数

|-   358   void add_dispatcher_head(Dispatcher *d) {||   359     bool first = dispatchers.empty();||   360     dispatchers.push_front(d);||   361     if (d->ms_can_fast_dispatch_any())||   362       fast_dispatchers.push_front(d);||   363     if (first)||   364       ready();     //如果dispatcher list空,启动SimpleMessenger::ready,不为空证明SimpleMessenger已经启动了||   365   }

在SimpleMessenger::ready()中,启动DispatchQueue等待mqueue,如果绑定了端口就启动 accepter接收线程

      76 void SimpleMessenger::ready()-     77 {|     78   ldout(cct,10) << "ready " << get_myaddr() << dendl;|     79   dispatch_queue.start();   //启动DispatchQueue,等待mqueue|     80 |     81   lock.Lock();|     82   if (did_bind)|     83     accepter.start();|     84   lock.Unlock();|     85 }

Accepter是Thread的继承类,Accepter::start()最终调用Accepter::entry(),在entry中 accept并把接收到的sd加入到Pipe类中

void *Accepter::entry(){  ...  struct pollfd pfd;  pfd.fd = listen_sd;  pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;  while (!done) {    int r = poll(&pfd, 1, -1);    if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))      break;    // accept    entity_addr_t addr;    socklen_t slen = sizeof(addr.ss_addr());    int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);    if (sd >= 0) {      errors = 0;      ldout(msgr->cct,10) << "accepted incoming on sd " << sd << dendl;            msgr->add_accept_pipe(sd);     //注册一个pipe,启动读线程,从该sd中读取数据    } else {      ldout(msgr->cct,0) << "accepter no incoming connection?  sd = " << sd              << " errno " << errno << " " << cpp_strerror(errno) << dendl;      if (++errors > 4)        break;    }  }   ...  return 0;

在SimpleMessenger::add_accept_pipe(int sd)中,申请一个Pipe类并把sd加入到Pipe中,开始Pipe::start_reader()

     340 Pipe *SimpleMessenger::add_accept_pipe(int sd)-    341 {   |    342   lock.Lock();|    343   Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);|    344   p->sd = sd;|    345   p->pipe_lock.Lock();|    346   p->start_reader();|    347   p->pipe_lock.Unlock();|    348   pipes.insert(p);|    349   accepting_pipes.insert(p);|    350   lock.Unlock();|    351   return p;|    352 }

Pipe类内部有一个Reader和Writer线程类,Pipe::start_reader()启动Pipe::Reader::entry(),最终启动Pipe::reader函数

      134 void Pipe::start_reader()-     135 {|     136   assert(pipe_lock.is_locked());|     137   assert(!reader_running);|-    138   if (reader_needs_join) {||    139     reader_thread.join();||    140     reader_needs_join = false;||    141   }|     142   reader_running = true;|     143   reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);|     144 }
|-    48     class Reader : public Thread {||    49       Pipe *pipe;||    50     public:||    51       explicit Reader(Pipe *p) : pipe(p) {}||    52       void *entry() { pipe->reader(); return 0; }||    53     } reader_thread;

在Pipe::reader函数中根据tag接收不同类型的消息,如果是CEPH_MSGR_TAG_MSG类型消息调用read_message接收消息,并把消息加入到mqueue中

void Pipe::reader(){  pipe_lock.Lock();  if (state == STATE_ACCEPTING) {    accept();      //第一次进入此函数处理    assert(pipe_lock.is_locked());  }  // loop.  while (state != STATE_CLOSED &&         state != STATE_CONNECTING) {    assert(pipe_lock.is_locked());   ......   ......    else if (tag == CEPH_MSGR_TAG_MSG) {      ldout(msgr->cct,20) << "reader got MSG" << dendl;      Message *m = 0;      int r = read_message(&m, auth_handler.get());      pipe_lock.Lock();            if (!m) {        if (r < 0)          fault(true);        continue;      }     ......     ......     ......      // note last received message.      in_seq = m->get_seq();      cond.Signal();  // wake up writer, to ack this            ldout(msgr->cct,10) << "reader got message "               << m->get_seq() << " " << m << " " << *m               << dendl;      in_q->fast_preprocess(m);       //mds 、mon不会进入此函数,预处理      if (delay_thread) {        utime_t release;        if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {          release = m->get_recv_stamp();          release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;          lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;        }        delay_thread->queue(release, m);      } else {        if (in_q->can_fast_dispatch(m)) {          reader_dispatching = true;          pipe_lock.Unlock();          in_q->fast_dispatch(m);          pipe_lock.Lock();          reader_dispatching = false;          if (state == STATE_CLOSED ||              notify_on_dispatch_done) { // there might be somebody waiting            notify_on_dispatch_done = false;            cond.Signal();          }        } else {           //mds进入此else          in_q->enqueue(m, m->get_priority(), conn_id);      //把接收到的messenger加入到mqueue中        }      }    }    ......    ......  }   // reap?  reader_running = false;  reader_needs_join = true;  unlock_maybe_reap();  ldout(msgr->cct,10) << "reader done" << dendl;}

在Pipe::DispatchQueue::enqueue函数中加入到mqueue中

void DispatchQueue::enqueue(Message *m, int priority, uint64_t id){  Mutex::Locker l(lock);  ldout(cct,20) << "queue " << m << " prio " << priority << dendl;  add_arrival(m);  if (priority >= CEPH_MSG_PRIO_LOW) {    mqueue.enqueue_strict(        id, priority, QueueItem(m));  } else {    mqueue.enqueue(        id, priority, m->get_cost(), QueueItem(m));  }  cond.Signal();    //唤醒dispatch_queue.start() 启动的dispatchThread,进入entry进行处理}

看完了这篇文章,相信你对"如何实现ceph SimpleMessenger模块消息的接收"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!

0