千家信息网

containerd源码接口调用的方法是什么

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,本篇内容主要讲解"containerd源码接口调用的方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"containerd源码接口调用的方法是什么"
千家信息网最后更新 2025年02月03日containerd源码接口调用的方法是什么

本篇内容主要讲解"containerd源码接口调用的方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"containerd源码接口调用的方法是什么"吧!

源码接口调用详情

从ctr调用containerd-api

#### checkpoint(用于快照,docker目前该功能不完善)

| ctr cmd | containerd-api |
| ----------- | -------- |
| list | /types.API/ListCheckpoint |
| create | /types.API/CreateCheckpoint |
| delete | /types.API/DeleteCheckpoint |

#### containers

| ctr cmd | containerd-api |
| ----------- | -------- |
| list、state | /types.API/State |
| pause、resume、update | /types.API/UpdateContainer |
| create | /types.API/CreateContainer |
| stats | /types.API/Stats |
| watch | /types.API/State, /types.API/Events |
| exec | /types.API/Events, /types.API/AddProcess, /types.API/UpdateProcess |
| kill | /types.API/Signal |
| start | /types.API/Events, /types.API/CreateContainer, /types.API/UpdateProcess |
| update | /types.API/UpdateContainer |

#### events

/types.API/Events

#### state

/types.API/State

#### version

/types.API/GetServerVersion --return result

从containerd-api至supervisor任务处理

注:API--server.go --> daemon - supervisor.go(handleTask func)

#### checkpoint

| containerd-api | supervisor |
| ----------- | -------- |
| /types.API/ListCheckpoint (supervisor.GetContainersTask) | getContainers |
| /types.API/CreateCheckpoint | createCheckpoint |
| /types.API/DeleteCheckpoint | deleteCheckpoint |

#### containers

| containerd-api | supervisor |
| ----------- | -------- |
| /types.API/State /types.API/Stats (supervisor.GetContainersTask) | getContainers |
| /types.API/UpdateContainer (supervisor.UpdateTask) | updateContainer |
| /types.API/CreateContainer (supervisor.StartTask) | start |
| /types.API/Events | Events |
| /types.API/AddProcess | addProcess |
| /types.API/UpdateProcess | updateProcess |
| /types.API/Signal | signal |

从supervisor至runtime(runC)

#### checkpoint

| supervisor | runtime |
| ----------- | -------- |
| getContainers | - |
| createCheckpoint | (runtime)CheckPoint -->exec.Command(c.runtime,arg....) |
| deleteCheckpoint | (runtime)DeleteCheckpoint |

#### containers

| supervisor | runtime |
| ----------- | -------- |
| getContainers | - |
| updateContainer | (runtime)Resume Pause UpdateResources-->exec.Command(c.runtime,arg....) |
| start | (runtime supervisor/worker.go) Start -->exec.Command(c.shim,c.id,c.bundle,c.runtime) |
| addProcess | (runtime) exec --> exec.Command(c.shim,c.id,c.bundle,c.runtime) |
| updateProcess | - |
| signal | - |

## 以createContainer为例走读代码

### daemon启动监听tasks及startTasks进程

#### 进入main.go main方法调用daemon方法

// Install the CLI application's action: it runs daemon with the CLI context
// and reports a returned error through logrus.Fatal.
app.Action = func(context *cli.Context) {
	if err := daemon(context); err != nil {
		logrus.Fatal(err)
	}
}

#### 进入main.go daemon方法

// Create ten supervisor workers and run each one's Start in its own
// goroutine. The WaitGroup counter is incremented once per worker before the
// goroutine is launched.
for i := 0; i < 10; i++ {
	wg.Add(1)
	w := supervisor.NewWorker(sv, wg)
	go w.Start()
}
// Start the supervisor itself; a start-up error is returned to the caller.
if err := sv.Start(); err != nil {
	return err
}

#### 初始化supervisor/worker.go NewWorker并启动监听startTask并处理

// NewWorker returns a worker that holds the supervisor s and the WaitGroup wg.
func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker {
	return &worker{
		s:  s,
		wg: wg,
	}
}

// Start consumes tasks from the supervisor's startTasks channel and starts the
// container carried by each task. It calls Done on the worker's WaitGroup when
// it returns.
func (w *worker) Start() {
	defer w.wg.Done()
	for t := range w.s.startTasks {
		started := time.Now()
		process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
		if err != nil {
			logrus.WithFields(logrus.Fields{
				"error": err,
				"id":    t.Container.ID(),
			}).Error("containerd: start container")
			// Report the failure on the task's error channel, then queue a
			// DeleteTask (with NoEvent set) for the same container ID before
			// moving on to the next start task.
			t.Err <- err
			evt := &DeleteTask{
				ID:      t.Container.ID(),
				NoEvent: true,
				Process: process,
			}
			w.s.SendTask(evt)
			continue
		}
		// NOTE: the excerpt ends here; the remainder of the loop body is not shown.

### 启动supervisor/supervisor.go task监听task并处理

func (s *Supervisor) Start() error {       logrus.WithFields(logrus.Fields{              "stateDir":    s.stateDir,              "runtime":     s.runtime,              "runtimeArgs": s.runtimeArgs,              "memory":      s.machine.Memory,              "cpus":        s.machine.Cpus,       }).Debug("containerd: supervisor running")       go func() {              for i := range s.tasks {                     s.handleTask(i)               }

### containers容器创建示例

ctr控制台命令入口 ctr/main.go containersCommand

execCommand,killCommand,listCommand,pauseCommand,resumeCommand,startCommand,stateCommand,statsCommand,watchCommand,updateCommand,

#### ctr/container.go

var startCommand = cli.Command{       Name:      "start",       Usage:     "start a container",       ArgsUsage: "ID BundlePath", ----…...         events, err := c.Events(netcontext.Background(), &types.EventsRequest{})/*事件创建*/       if err != nil {              fatal(err.Error(), 1)       }       if _, err := c.CreateContainer(netcontext.Background(), r); err != nil {/*容器创建*/              fatal(err.Error(), 1)       }       if context.Bool("attach") {              go func() {                     io.Copy(stdin, os.Stdin)                     if _, err := c.UpdateProcess(netcontext.Background(), &types.UpdateProcessRequest{/*更新进程*/                            Id:         id,                            Pid:        "init",                            CloseStdin: true,                     }); err != nil {                            fatal(err.Error(), 1)                     }                     restoreAndCloseStdin()              }()              if tty {                     resize(id, "init", c)                     go func() {                            s := make(chan os.Signal, 64)                            signal.Notify(s, syscall.SIGWINCH)                            for range s {                                   if err := resize(id, "init", c); err != nil {                                          log.Println(err)                                   }                            }                     }()              }              waitForExit(c, events, id, "init", restoreAndCloseStdin)       } },

### api处理

#### api/grpc/types/api.pb.go

// Events opens a client stream for the "/types.API/Events" method, sends the
// single request message in, closes the send side, and returns the stream
// wrapper. Any error along the way is returned with a nil client.
func (c *aPIClient) Events(ctx context.Context, in *EventsRequest, opts ...grpc.CallOption) (API_EventsClient, error) {
	stream, err := grpc.NewClientStream(ctx, &_API_serviceDesc.Streams[0], c.cc, "/types.API/Events", opts...)
	if err != nil {
		return nil, err
	}
	x := &aPIEventsClient{stream}
	if err := x.ClientStream.SendMsg(in); err != nil {
		return nil, err
	}
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	return x, nil
}

// CreateContainer invokes the unary "/types.API/CreateContainer" method with
// request in and returns the decoded response, or nil and the error.
func (c *aPIClient) CreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
	out := new(CreateContainerResponse)
	err := grpc.Invoke(ctx, "/types.API/CreateContainer", in, out, c.cc, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// UpdateProcess invokes the unary "/types.API/UpdateProcess" method with
// request in and returns the decoded response, or nil and the error.
func (c *aPIClient) UpdateProcess(ctx context.Context, in *UpdateProcessRequest, opts ...grpc.CallOption) (*UpdateProcessResponse, error) {
	out := new(UpdateProcessResponse)
	err := grpc.Invoke(ctx, "/types.API/UpdateProcess", in, out, c.cc, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// _API_Events_Handler is the server-side stream handler for Events: it
// receives the EventsRequest from the stream and passes it, together with the
// wrapped stream, to the APIServer implementation.
func _API_Events_Handler(srv interface{}, stream grpc.ServerStream) error {
	m := new(EventsRequest)
	if err := stream.RecvMsg(m); err != nil {
		return err
	}
	return srv.(APIServer).Events(m, &aPIEventsServer{stream})
}

// _API_CreateContainer_Handler is the server-side unary handler for
// CreateContainer. It decodes the request with dec, then either calls the
// APIServer implementation directly (no interceptor) or routes the call
// through interceptor with the method info and a handler closure.
func _API_CreateContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(CreateContainerRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(APIServer).CreateContainer(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/types.API/CreateContainer",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(APIServer).CreateContainer(ctx, req.(*CreateContainerRequest))
	}
	return interceptor(ctx, in, info, handler)
}

// _API_UpdateProcess_Handler is the server-side unary handler for
// UpdateProcess. It decodes the request with dec, then either calls the
// APIServer implementation directly (no interceptor) or routes the call
// through interceptor with the method info and a handler closure.
func _API_UpdateProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(UpdateProcessRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(APIServer).UpdateProcess(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/types.API/UpdateProcess",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(APIServer).UpdateProcess(ctx, req.(*UpdateProcessRequest))
	}
	return interceptor(ctx, in, info, handler)
}

#### api/grpc/server/server.go

进入第一步中的tasks及startTasks处理队列

func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error { events := s.sv.Events(t, r.StoredOnly, r.Id) func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) { s.sv.SendTask(e) apiC, err := createAPIContainer(r.Container, false) func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {       e := &supervisor.UpdateProcessTask{}       e.ID = r.Id       e.PID = r.Pid       e.Height = int(r.Height)       e.Width = int(r.Width)       e.CloseStdin = r.CloseStdin       s.sv.SendTask(e)       if err := <-e.ErrorCh(); err != nil {              return nil, err       }       return &types.UpdateProcessResponse{}, nil}

#### supervisor/create.go

// start sends a task on the supervisor's startTasks channel.
// NOTE(review): this is an abridged excerpt — `task` is not defined in the
// visible code and no return statement is shown; presumably `task` is built
// from t in the elided lines. Confirm against the upstream source.
func (s *Supervisor) start(t *StartTask) error {
	s.startTasks <- task
}

#### supervisor/worker.go

func (w *worker) Start() {       defer w.wg.Done()       for t := range w.s.startTasks {

#### runtime/container.go

func (c *container) Start(checkpointPath string, s Stdio) (Process, error) {       processRoot := filepath.Join(c.root, c.id, InitProcessID)       if err := os.Mkdir(processRoot, 0755); err != nil {              return nil, err       }       cmd := exec.Command(c.shim,              c.id, c.bundle, c.runtime,        ) ---执行 docker-containerd-shim命令        cmd.Dir = processRoot       cmd.SysProcAttr = &syscall.SysProcAttr{              Setpgid: true,       }       spec, err := c.readSpec()       if err != nil {              return nil, err       }       config := &processConfig{              checkpoint:  checkpointPath,              root:        processRoot,              id:          InitProcessID,              c:           c,              stdio:       s,              spec:        spec,              processSpec: specs.ProcessSpec(spec.Process),       }       p, err := newProcess(config)       if err != nil {              return nil, err       }       if err := c.createCmd(InitProcessID, cmd, p); err != nil {              return nil, err       }       return p, nil}

查看shim的Main方法注释参数传递

// Arg0: id of the container
// Arg1: bundle path
// Arg2: runtime binary

## containerd-shim接收后处理

### containerd-shim/main.go

func start(log *os.File) error { p, err := newProcess(flag.Arg(0), flag.Arg(1), flag.Arg(2))if err != nil {       return err}defer func() {       if err := p.Close(); err != nil {              writeMessage(log, "warn", err)       }}()if err := p.create(); err != nil {       p.delete()       return err}

### containerd-shim/process.go跳转执行runc命令

func (p *process) create() error {cmd := exec.Command(p.runtime, args...)

到此,相信大家对"containerd源码接口调用的方法是什么"有了更深的了解,不妨来实际操作一番吧!这里是千家信息网,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0