千家信息网

RadosClient OSDC怎么使用

发表于:2024-11-29 作者:千家信息网编辑
千家信息网最后更新 2024年11月29日,本篇内容介绍了"RadosClient OSDC怎么使用"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成
千家信息网最后更新 2024年11月29日RadosClient OSDC怎么使用

本篇内容介绍了"RadosClient OSDC怎么使用"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

RadosClient.h

/**
 * Client-side handle for a RADOS cluster.
 *
 * Owns the monitor client, the messenger and the Objecter, and exposes the
 * pool, command and ioctx entry points declared below.
 *
 * NOTE(review): the template argument lists below were lost when the article
 * was extracted and have been restored from upstream Ceph -- confirm against
 * the Ceph release you are targeting.
 */
class librados::RadosClient : public Dispatcher  // derives from Dispatcher (message dispatch class)
{
public:
  using Dispatcher::cct;
  md_config_t *conf;                     // configuration

private:
  enum {
    DISCONNECTED,
    CONNECTING,
    CONNECTED,
  } state;                               // network connection state

  MonClient monclient;                   // monitor client ("monc")
  Messenger *messenger;                  // network messaging interface

  uint64_t instance_id;

  // Message-dispatch hooks (overrides of the Dispatcher interface)
  bool _dispatch(Message *m);
  bool ms_dispatch(Message *m);

  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
  void ms_handle_connect(Connection *con);
  bool ms_handle_reset(Connection *con);
  void ms_handle_remote_reset(Connection *con);

  Objecter *objecter;            // OSDC component used to send the packaged OP messages

  Mutex lock;
  Cond cond;
  SafeTimer timer;               // timer
  int refcnt;

  version_t log_last_version;
  rados_log_callback_t log_cb;
  void *log_cb_arg;
  string log_watch;

  int wait_for_osdmap();

public:
  Finisher finisher;             // runs completion callbacks

  explicit RadosClient(CephContext *cct_);
  ~RadosClient();
  int ping_monitor(string mon_id, string *result);
  int connect();                 // connect to the cluster
  void shutdown();

  int watch_flush();
  int async_watch_flush(AioCompletionImpl *c);

  uint64_t get_instance_id();

  int wait_for_latest_osdmap();

  // Create an ioctx from a pool name or a pool id
  int create_ioctx(const char *name, IoCtxImpl **io);
  int create_ioctx(int64_t, IoCtxImpl **io);

  int get_fsid(std::string *s);

  // Pool-related operations
  int64_t lookup_pool(const char *name);
  bool pool_requires_alignment(int64_t pool_id);
  int pool_requires_alignment2(int64_t pool_id, bool *requires);
  uint64_t pool_required_alignment(int64_t pool_id);
  int pool_required_alignment2(int64_t pool_id, uint64_t *alignment);
  int pool_get_auid(uint64_t pool_id, unsigned long long *auid);
  int pool_get_name(uint64_t pool_id, std::string *auid);

  int pool_list(std::list<std::pair<int64_t, string> >& ls);
  int get_pool_stats(std::list<string>& ls, map<string, ::pool_stat_t>& result);
  int get_fs_stats(ceph_statfs& result);

  /*
  -1 was set as the default value and monitor will pickup the right crush rule with below order:
    a) osd pool default crush replicated ruleset
    b) the first ruleset in crush ruleset
    c) error out if no value find
  */
  // Synchronous and asynchronous pool creation
  int pool_create(string& name, unsigned long long auid=0, int16_t crush_rule=-1);
  int pool_create_async(string& name, PoolAsyncCompletionImpl *c, unsigned long long auid=0,
                        int16_t crush_rule=-1);
  int pool_get_base_tier(int64_t pool_id, int64_t* base_tier);

  // Synchronous and asynchronous pool deletion
  int pool_delete(const char *name);
  int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c);

  int blacklist_add(const string& client_address, uint32_t expire_seconds);

  // Monitor commands: forwarded to the monitors via monclient.start_mon_command
  int mon_command(const vector<string>& cmd, const bufferlist &inbl,
                  bufferlist *outbl, string *outs);
  int mon_command(int rank,
                  const vector<string>& cmd, const bufferlist &inbl,
                  bufferlist *outbl, string *outs);
  int mon_command(string name,
                  const vector<string>& cmd, const bufferlist &inbl,
                  bufferlist *outbl, string *outs);

  // OSD commands: forwarded to the OSD via objecter->osd_command
  int osd_command(int osd, vector<string>& cmd, const bufferlist& inbl,
                  bufferlist *poutbl, string *prs);

  // PG commands: forwarded to the PG via objecter->pg_command
  int pg_command(pg_t pgid, vector<string>& cmd, const bufferlist& inbl,
                 bufferlist *poutbl, string *prs);

  void handle_log(MLog *m);
  int monitor_log(const string& level, rados_log_callback_t cb, void *arg);

  void get();
  bool put();
  void blacklist_self(bool set);
};
connect() 连接
int librados::RadosClient::connect(){  common_init_finish(cct);  int err;  // already connected?  if (state == CONNECTING)    return -EINPROGRESS;  if (state == CONNECTED)    return -EISCONN;  state = CONNECTING;  // get monmap  err = monclient.build_initial_monmap();   //通过配置文件获取初始化的Monitor  if (err < 0)    goto out;  err = -ENOMEM;  messenger = Messenger::create_client_messenger(cct, "radosclient");        //创建通信模块  if (!messenger)    goto out;  // require OSDREPLYMUX feature.  this means we will fail to talk to  // old servers.  this is necessary because otherwise we won't know  // how to decompose the reply data into its consituent pieces.  messenger->set_default_policy(Messenger::Policy::lossy_client(0, CEPH_FEATURE_OSDREPLYMUX));  ldout(cct, 1) << "starting msgr at " << messenger->get_myaddr() << dendl;  ldout(cct, 1) << "starting objecter" << dendl;  //创建objecter  objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,                          &finisher,                          cct->_conf->rados_mon_op_timeout,                          cct->_conf->rados_osd_op_timeout);  if (!objecter)    goto out;  objecter->set_balanced_budget();  // mc添加 messenger  monclient.set_messenger(messenger);   // objecter 初始化  objecter->init();    // messenger添加 dispather  messenger->add_dispatcher_tail(objecter);  messenger->add_dispatcher_tail(this);   // messenger启动  messenger->start();  ldout(cct, 1) << "setting wanted keys" << dendl;  monclient.set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);  ldout(cct, 1) << "calling monclient init" << dendl;  // mc 初始化  err = monclient.init();  if (err) {    ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;    shutdown();    goto out;  }  err = monclient.authenticate(conf->client_mount_timeout);  if (err) {    ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;    shutdown();    goto out;  }  
messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));  objecter->set_client_incarnation(0);    // objecter 启动  objecter->start();  lock.Lock();  // 定时器初始化  timer.init();  monclient.renew_subs();  //执行回调的完成类start  finisher.start();  // 更新 状态为已连接  state = CONNECTED;  instance_id = monclient.get_global_id();  ...}
create_ioctx 根据pool创建ioctx
int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io){  // 获取 poolid  int64_t poolid = lookup_pool(name);  ...  // 创建 IoCtxImpl  *io = new librados::IoCtxImpl(this, objecter, poolid, CEPH_NOSNAP);  return 0;}
Mon OSD pg 命令操作
int librados::RadosClient::mon_command(const vector& cmd,                                       const bufferlist &inbl,                                       bufferlist *outbl, string *outs){  // mc start_mon_command 发送到monitor  monclient.start_mon_command(cmd, inbl, outbl, outs,                               new C_SafeCond(&mylock, &cond, &done, &rval));}int librados::RadosClient::osd_command(int osd, vector& cmd,                                       const bufferlist& inbl,                                       bufferlist *poutbl, string *prs){  // 发送到osd  int r = objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs,                         new C_SafeCond(&mylock, &cond, &done, &ret));}int librados::RadosClient::pg_command(pg_t pgid, vector& cmd,                                      const bufferlist& inbl,                                      bufferlist *poutbl, string *prs){  // 发送到pg  int r = objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs,                        new C_SafeCond(&mylock, &cond, &done, &ret));}

Ioctximpl

IoCtxImpl 是 librados::IoCtx 的实现类,处理一次请求的流程如下:

  1. 把请求封装成ObjectOperation 类(osdc 中的)

  2. 把相关的pool信息添加到里面,封装成Objecter::Op对象

  3. 调用相应的函数 objecter->op_submit 发送给相应的OSD

  4. 操作完成后,调用相应的回调函数。

如rados_write

/**
 * C API entry point: write `len` bytes from `buf` to object `o` at offset
 * `off` through the given I/O context.
 *
 * @param io  I/O context handle (an IoCtxImpl behind the opaque handle)
 * @param o   object name
 * @param buf data to write
 * @param len number of bytes to write
 * @param off offset within the object
 * @return the value returned by IoCtxImpl::write
 */
extern "C" int rados_write(rados_ioctx_t io, const char *o, const char *buf, size_t len, uint64_t off)
{
  librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
  object_t oid(o);
  bufferlist bl;
  bl.append(buf, len);
  int retval = ctx->write(oid, bl, len, off);
  // The excerpt fell off the end of a non-void function; hand the result back.
  return retval;
}

调用IoCtxImpl::write

int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl,                               size_t len, uint64_t off){  ::ObjectOperation op;  prepare_assert_ops(&op);              // assert ops  bufferlist mybl;  mybl.substr_of(bl, 0, len);  op.write(off, mybl);                  // 封装到op.write Objecter.h ObjectOperation write  return operate(oid, &op, NULL);       // IoCtxImpl::operate}
int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,                                 ceph::real_time *pmtime, int flags){  int op = o->ops[0].op.op;    Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc,                                                          *o, snapc, ut, flags,                                                          NULL, oncommit, &ver);  objecter->op_submit(objecter_op);}

AioCompletionImpl

OSDC

ObjectOperation
struct ObjectOperation {  vector ops;    // ops集合  int flags;              int priority;  vector out_bl;   // 输出bufferlist  vector out_handler; // 回调函数  vector out_rval;        // 返回码集合  size_t size() {               // op个数    return ops.size();  }  /**   * This is a more limited form of C_Contexts, but that requires   * a ceph_context which we don't have here.   */   // 用户添加回调函数  class C_TwoContexts : public Context {    Context *first;    Context *second;  };  /**   * Add a callback to run when this operation completes,   * after any other callbacks for it.   */   // 添加回调函数  void add_handler(Context *extra) {    size_t last = out_handler.size() - 1;    Context *orig = out_handler[last];    if (orig) {      Context *wrapper = new C_TwoContexts(orig, extra);      out_handler[last] = wrapper;    } else {      out_handler[last] = extra;    }  }  // 添加操作  OSDOp& add_op(int op) {    int s = ops.size();    ops.resize(s+1);    ops[s].op.op = op;    out_bl.resize(s+1);    out_bl[s] = NULL;    out_handler.resize(s+1);    out_handler[s] = NULL;    out_rval.resize(s+1);    out_rval[s] = NULL;    return ops[s];  }  // 添加data  void add_data(int op, uint64_t off, uint64_t len, bufferlist& bl) {    OSDOp& osd_op = add_op(op);    osd_op.op.extent.offset = off;    osd_op.op.extent.length = len;    osd_op.indata.claim_append(bl);  }  void add_clone_range(int op, uint64_t off, uint64_t len,                       const object_t& srcoid, uint64_t srcoff,                       snapid_t srcsnapid) {}  void add_xattr(int op, const char *name, const bufferlist& data) {}  void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,                     uint8_t cmp_mode, const bufferlist& data) {}  // 添加call method  void add_call(int op, const char *cname, const char *method,                bufferlist &indata,                bufferlist *outbl, Context *ctx, int *prval) {    OSDOp& osd_op = add_op(op);    unsigned p = ops.size() - 1;    out_handler[p] = ctx;    out_bl[p] = outbl;    
out_rval[p] = prval;    osd_op.op.cls.class_len = strlen(cname);    osd_op.op.cls.method_len = strlen(method);    osd_op.op.cls.indata_len = indata.length();    osd_op.indata.append(cname, osd_op.op.cls.class_len);    osd_op.indata.append(method, osd_op.op.cls.method_len);    osd_op.indata.append(indata);  }    void add_pgls(int op, uint64_t count, collection_list_handle_t cookie,                epoch_t start_epoch) {}  void add_pgls_filter(int op, uint64_t count, const bufferlist& filter,                       collection_list_handle_t cookie, epoch_t start_epoch) {}  void add_alloc_hint(int op, uint64_t expected_object_size,                      uint64_t expected_write_size) {}  // ------  // pg 操作  void pg_ls(uint64_t count, bufferlist& filter,             collection_list_handle_t cookie, epoch_t start_epoch) {}  void pg_nls(uint64_t count, const bufferlist& filter,              collection_list_handle_t cookie, epoch_t start_epoch) {}  // 创建 操作  void create(bool excl) {    OSDOp& o = add_op(CEPH_OSD_OP_CREATE);    o.op.flags = (excl ? 
CEPH_OSD_OP_FLAG_EXCL : 0);  }  // 状态  struct C_ObjectOperation_stat : public Context {    bufferlist bl;    uint64_t *psize;    ceph::real_time *pmtime;    time_t *ptime;    struct timespec *pts;    int *prval;    // 完成大小,时间等    void finish(int r) {}    }  };  // 查看状态,获取C_ObjectOperation_stat  void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {}  void stat(uint64_t *psize, time_t *ptime, int *prval) {}  void stat(uint64_t *psize, struct timespec *pts, int *prval) {}    // object data  // 读操作  void read(uint64_t off, uint64_t len, bufferlist *pbl, int *prval,            Context* ctx) {    bufferlist bl;    add_data(CEPH_OSD_OP_READ, off, len, bl);    unsigned p = ops.size() - 1;    out_bl[p] = pbl;    out_rval[p] = prval;    out_handler[p] = ctx;  }  void sparse_read(uint64_t off, uint64_t len, std::map *m,                   bufferlist *data_bl, int *prval) {}  // 写操作  void write(uint64_t off, bufferlist& bl,             uint64_t truncate_size,             uint32_t truncate_seq) {    add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl);          // 添加data, 将WRITE存入ops,将数据放在op中    OSDOp& o = *ops.rbegin();    o.op.extent.truncate_size = truncate_size;    o.op.extent.truncate_seq = truncate_seq;  }  void write(uint64_t off, bufferlist& bl) {}  void write_full(bufferlist& bl) {}  void append(bufferlist& bl) {}  void zero(uint64_t off, uint64_t len) {}  void truncate(uint64_t off) {}  void remove() {}  void mapext(uint64_t off, uint64_t len) {}  void sparse_read(uint64_t off, uint64_t len) {}  void clone_range(const object_t& src_oid, uint64_t src_offset, uint64_t len,                   uint64_t dst_offset) {}  // object attrs  // 属性操作  void getxattr(const char *name, bufferlist *pbl, int *prval) {}  void getxattrs(std::map *pattrs, int *prval) {}  void setxattr(const char *name, const bufferlist& bl) {}  void setxattr(const char *name, const string& s) {}  void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode,                const bufferlist& bl) 
{}  void rmxattr(const char *name) {}  void setxattrs(map& attrs) {}  void resetxattrs(const char *prefix, map& attrs) {}  // trivialmap  void tmap_update(bufferlist& bl) {}  void tmap_put(bufferlist& bl) {}  void tmap_get(bufferlist *pbl, int *prval) {}  void tmap_get() {}  void tmap_to_omap(bool nullok=false) {}  // objectmap  void omap_get_keys(const string &start_after,                     uint64_t max_to_get,                     std::set *out_set,                     int *prval) {    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);    bufferlist bl;    ::encode(start_after, bl);    ::encode(max_to_get, bl);    op.op.extent.offset = 0;    op.op.extent.length = bl.length();    op.indata.claim_append(bl);    if (prval || out_set) {      unsigned p = ops.size() - 1;      C_ObjectOperation_decodekeys *h =        new C_ObjectOperation_decodekeys(out_set, prval);      out_handler[p] = h;      out_bl[p] = &h->bl;      out_rval[p] = prval;    }  }  void omap_get_vals(const string &start_after,                     const string &filter_prefix,                     uint64_t max_to_get,                     std::map *out_set,                     int *prval) {}  void omap_get_vals_by_keys(const std::set &to_get,                            std::map *out_set,                            int *prval) {}  void omap_cmp(const std::map > &assertions,                int *prval) {}                  void copy_get(object_copy_cursor_t *cursor,                uint64_t max,                uint64_t *out_size,                ceph::real_time *out_mtime,                std::map *out_attrs,                bufferlist *out_data,                bufferlist *out_omap_header,                bufferlist *out_omap_data,                vector *out_snaps,                snapid_t *out_snap_seq,                uint32_t *out_flags,                uint32_t *out_data_digest,                uint32_t *out_omap_digest,                vector > *out_reqids,                uint64_t *truncate_seq,                uint64_t 
*truncate_size,                int *prval) {}  void undirty() {}  struct C_ObjectOperation_isdirty : public Context {};  void is_dirty(bool *pisdirty, int *prval) {}  void omap_get_header(bufferlist *bl, int *prval) {}  void omap_set(const map &map) {}  void omap_set_header(bufferlist &bl) {}  void omap_clear() {}  void omap_rm_keys(const std::set &to_remove) {}  // object classes  void call(const char *cname, const char *method, bufferlist &indata) {}  void call(const char *cname, const char *method, bufferlist &indata,            bufferlist *outdata, Context *ctx, int *prval) {}  void rollback(uint64_t snapid) {}  void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,                 version_t src_version, unsigned flags,                 unsigned src_fadvise_flags) {}};
OSDOp(定义于 osd_types.h)
struct OSDOp {  ceph_osd_op op;   // 操作  sobject_t soid;   // oid  bufferlist indata, outdata;   // 输入输出data  int32_t rval;     // 返回码};
Objecter
class Objecter : public md_config_obs_t, public Dispatcher {public:  Messenger *messenger;         // 消息  MonClient *monc;              // mc  Finisher *finisher;private:  OSDMap    *osdmap;            // osdmappublic:  using Dispatcher::cct;  std::multimap crush_location;  atomic_t initialized;private:  atomic64_t last_tid;  atomic_t inflight_ops;  atomic_t client_inc;  uint64_t max_linger_id;  atomic_t num_unacked;  atomic_t num_uncommitted;  atomic_t global_op_flags; // flags which are applied to each IO op  bool keep_balanced_budget;  bool honor_osdmap_full;public:  void maybe_request_map();private:  void _maybe_request_map();  version_t last_seen_osdmap_version;  version_t last_seen_pgmap_version;  mutable boost::shared_mutex rwlock;  using lock_guard = std::unique_lock;  using unique_lock = std::unique_lock;  using shared_lock = boost::shared_lock;  using shunique_lock = ceph::shunique_lock;  ceph::timer timer;  PerfCounters *logger;  uint64_t tick_event;  void start_tick();  void tick();  void update_crush_location();public:  /*** track pending operations ***/  // readpublic:  struct OSDSession;  struct op_target_t {}         // 操作目标  struct Op : public RefCountedObject {};   //  操作};
op_target_t

操作目标,封装pg信息,osd信息

struct op_target_t {    int flags;    object_t base_oid;    object_locator_t base_oloc;    object_t target_oid;        // 目标oid    object_locator_t target_oloc;   // 位置    // 是否 base_pgid    bool precalc_pgid; ///< true if we are directed at base_pgid, not base_oid    // 直接的 pgid    pg_t base_pgid; ///< explciti pg target, if any    pg_t pgid; ///< last pg we mapped to    unsigned pg_num; ///< last pg_num we mapped to    unsigned pg_num_mask; ///< last pg_num_mask we mapped to    // 启动的osd    vector up; ///< set of up osds for last pg we mapped to    // acting osd    vector acting; ///< set of acting osds for last pg we mapped to    // primary    int up_primary; ///< primary for last pg we mapped to based on the up set    int acting_primary;  ///< primary for last pg we mapped to based on the                         ///  acting set        // pool 大小    int size; ///< the size of the pool when were were last mapped    // pool 最小size    int min_size; ///< the min size of the pool when were were last mapped    // 是否按位排序    bool sort_bitwise; ///< whether the hobject_t sort order is bitwise    // 是否副本    bool used_replica;    bool paused;    int osd;      ///< the final target osd, or -1  };

Op:操作,继承自 RefCountedObject

struct Op : public RefCountedObject {    OSDSession *session;    // session 连接    int incarnation;    op_target_t target;     // 操作目标    ConnectionRef con;  // for rx buffer only    uint64_t features;  // explicitly specified op features    vector ops;  // 操作集合    snapid_t snapid;    SnapContext snapc;    ceph::real_time mtime;    bufferlist *outbl;    vector out_bl;    vector out_handler;    vector out_rval;    int priority;    Context *onack, *oncommit;    uint64_t ontimeout;    Context *oncommit_sync; // used internally by watch/notify    ceph_tid_t tid;    eversion_t replay_version; // for op replay    int attempts;    version_t *objver;    epoch_t *reply_epoch;    ceph::mono_time stamp;    epoch_t map_dne_bound;    bool budgeted;    /// true if we should resend this message on failure    bool should_resend;    /// true if the throttle budget is get/put on a series of OPs,    /// instead of per OP basis, when this flag is set, the budget is    /// acquired before sending the very first OP of the series and    /// released upon receiving the last OP reply.    bool ctx_budgeted;    int *data_offset;    epoch_t last_force_resend;    osd_reqid_t reqid; // explicitly setting reqid  };
分片 Striper
扩展 ObjectExtent

记录分片信息

class ObjectExtent {   public:  object_t    oid;       // object id  uint64_t    objectno;     // 序号  uint64_t    offset;    // in object   object内偏移  uint64_t    length;    // in object   分片长度  uint64_t    truncate_size;      // in object  object_locator_t oloc;   // object locator (pool etc) pool位置  vector >  buffer_extents;  // off -> len.  extents in buffer being mapped (may be fragmented bc of striping!) };

"RadosClient OSDC怎么使用"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0