如何实现ceph SimpleMessenger模块消息的接收
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,小编给大家分享一下如何实现ceph SimpleMessenger模块消息的接收,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!OSD服务端消息的接收起始于OSD::init()中的m
千家信息网最后更新 2025年01月24日如何实现ceph SimpleMessenger模块消息的接收
小编给大家分享一下如何实现ceph SimpleMessenger模块消息的接收,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
OSD服务端消息的接收起始于OSD::init()中的messenger::add_dispatcher_head(Dispatcher *d)函数
|- 358 void add_dispatcher_head(Dispatcher *d) {|| 359 bool first = dispatchers.empty();|| 360 dispatchers.push_front(d);|| 361 if (d->ms_can_fast_dispatch_any())|| 362 fast_dispatchers.push_front(d);|| 363 if (first)|| 364 ready(); //如果dispatcher list空,启动SimpleMessenger::ready,不为空证明SimpleMessenger已经启动了|| 365 }
在SimpleMessenger::ready()中,启动DispatchQueue等待mqueue,如果绑定了端口就启动 accepter接收线程
76 void SimpleMessenger::ready()- 77 {| 78 ldout(cct,10) << "ready " << get_myaddr() << dendl;| 79 dispatch_queue.start(); //启动DispatchQueue,等待mqueue| 80 | 81 lock.Lock();| 82 if (did_bind)| 83 accepter.start();| 84 lock.Unlock();| 85 }
Accepter是Thread的继承类,Accepter::start()最终调用Accepter::entry(),在entry中 accept并把接收到的sd加入到Pipe类中
void *Accepter::entry(){ ... struct pollfd pfd; pfd.fd = listen_sd; pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP; while (!done) { int r = poll(&pfd, 1, -1); if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP)) break; // accept entity_addr_t addr; socklen_t slen = sizeof(addr.ss_addr()); int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen); if (sd >= 0) { errors = 0; ldout(msgr->cct,10) << "accepted incoming on sd " << sd << dendl; msgr->add_accept_pipe(sd); //注册一个pipe,启动读线程,从该sd中读取数据 } else { ldout(msgr->cct,0) << "accepter no incoming connection? sd = " << sd << " errno " << errno << " " << cpp_strerror(errno) << dendl; if (++errors > 4) break; } } ... return 0;
在SimpleMessenger::add_accept_pipe(int sd)中,申请一个Pipe类并把sd加入到Pipe中,开始Pipe::start_reader()
340 Pipe *SimpleMessenger::add_accept_pipe(int sd)- 341 { | 342 lock.Lock();| 343 Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);| 344 p->sd = sd;| 345 p->pipe_lock.Lock();| 346 p->start_reader();| 347 p->pipe_lock.Unlock();| 348 pipes.insert(p);| 349 accepting_pipes.insert(p);| 350 lock.Unlock();| 351 return p;| 352 }
Pipe类内部有一个Reader和Writer线程类,Pipe::start_reader()启动Pipe::Reader::entry(),最终启动Pipe::reader函数
134 void Pipe::start_reader()- 135 {| 136 assert(pipe_lock.is_locked());| 137 assert(!reader_running);|- 138 if (reader_needs_join) {|| 139 reader_thread.join();|| 140 reader_needs_join = false;|| 141 }| 142 reader_running = true;| 143 reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);| 144 }
|- 48 class Reader : public Thread {|| 49 Pipe *pipe;|| 50 public:|| 51 explicit Reader(Pipe *p) : pipe(p) {}|| 52 void *entry() { pipe->reader(); return 0; }|| 53 } reader_thread;
在Pipe::reader函数中根据tag接收不同类型的消息,如果是CEPH_MSGR_TAG_MSG类型消息调用read_message接收消息,并把消息加入到mqueue中
void Pipe::reader(){ pipe_lock.Lock(); if (state == STATE_ACCEPTING) { accept(); //第一次进入此函数处理 assert(pipe_lock.is_locked()); } // loop. while (state != STATE_CLOSED && state != STATE_CONNECTING) { assert(pipe_lock.is_locked()); ...... ...... else if (tag == CEPH_MSGR_TAG_MSG) { ldout(msgr->cct,20) << "reader got MSG" << dendl; Message *m = 0; int r = read_message(&m, auth_handler.get()); pipe_lock.Lock(); if (!m) { if (r < 0) fault(true); continue; } ...... ...... ...... // note last received message. in_seq = m->get_seq(); cond.Signal(); // wake up writer, to ack this ldout(msgr->cct,10) << "reader got message " << m->get_seq() << " " << m << " " << *m << dendl; in_q->fast_preprocess(m); //mds 、mon不会进入此函数,预处理 if (delay_thread) { utime_t release; if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { release = m->get_recv_stamp(); release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; } delay_thread->queue(release, m); } else { if (in_q->can_fast_dispatch(m)) { reader_dispatching = true; pipe_lock.Unlock(); in_q->fast_dispatch(m); pipe_lock.Lock(); reader_dispatching = false; if (state == STATE_CLOSED || notify_on_dispatch_done) { // there might be somebody waiting notify_on_dispatch_done = false; cond.Signal(); } } else { //mds进入此else in_q->enqueue(m, m->get_priority(), conn_id); //把接收到的messenger加入到mqueue中 } } } ...... ...... } // reap? reader_running = false; reader_needs_join = true; unlock_maybe_reap(); ldout(msgr->cct,10) << "reader done" << dendl;}
在Pipe::DispatchQueue::enqueue函数中加入到mqueue中
void DispatchQueue::enqueue(Message *m, int priority, uint64_t id){ Mutex::Locker l(lock); ldout(cct,20) << "queue " << m << " prio " << priority << dendl; add_arrival(m); if (priority >= CEPH_MSG_PRIO_LOW) { mqueue.enqueue_strict( id, priority, QueueItem(m)); } else { mqueue.enqueue( id, priority, m->get_cost(), QueueItem(m)); } cond.Signal(); //唤醒dispatch_queue.start() 启动的dispatchThread,进入entry进行处理}
看完了这篇文章,相信你对"如何实现ceph SimpleMessenger模块消息的接收"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!
消息
函数
线程
模块
篇文章
类型
处理
不同
完了
数据
更多
知识
端口
第一次
行业
资讯
资讯频道
频道
中加
服务
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
逆战为什么会显示无法连接服务器
估算软件开发错误个数
mc服务器房子
信阳中等计算机网络技术教程
用什么编写数据库
网络安全 创新标语
达迈互联网科技有限公司
锐士数据库
平板电脑无法登录服务器怎回事
网络安全运维日常工作
华为云金属服务器
西山区软件开发
数据库管理员英文名
苏州第三方软件开发如何收费
了解最新的网络技术要定什么杂志
龙翼网络技术有限公司
js清空网页数据库数据库
数据库原理及应用最新版答案
福建电商软件开发费用
数据库效率函数
每房网络技术有限公司信息
数据库信息怎么显示屏幕
用数据库录入员工信息
做商城软件开发哪家便宜
数据库设计师案例
查看数据库 oltp
运维人员测试训练服务器
网络安全需要学什么技术
我的世界服务器通信加密
学士帽计算机网络技术网课