千家信息网

libgo 源码剖析(2. libgo调度策略源码实现)

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本文将从源码实现上对 libgo 的调度策略进行分析,主要涉及到上一篇文章中的三个结构体的定义:调度器 Scheduler(简称 S)执行器 Processer(简称 P)协程 Task(简称 T)三
千家信息网最后更新 2025年01月23日libgo 源码剖析(2. libgo调度策略源码实现)

本文将从源码实现上对 libgo 的调度策略进行分析,主要涉及到上一篇文章中的三个结构体的定义:

  • 调度器 Scheduler(简称 S)
  • 执行器 Processer(简称 P)
  • 协程 Task(简称 T)

三者的关系如下图所示:


本文会列出类内的主要成员和主要函数做以分析。


1. 协程调度器:class Scheduler

libgo/scheduler/scheduler.h

class Scheduler{public:    /*    *  创建一个调度器,初始化 libgo    *  创建主线程的执行器,如果后续 STart 的时候没有参数,默认只有一个执行器去做    *  当仅使用一个线程进行协程调度时, 协程地执行会严格地遵循其创建顺序.    * */    static Scheduler* Create();    /*    * 创建一个协程 Task 对象,并添加到当前的执行器 processer 的任务队列中,    * 调度器的任务数 taskCount_ +1    * */    void CreateTask(TaskF const& fn, TaskOpt const& opt);    /* 启动调度器    * @minThreadNumber : 最小调度线程数, 为0时, 设置为cpu核心数.    * @maxThreadNumber : 最大调度线程数, 为0时, 设置为minThreadNumber.    *          如果maxThreadNumber大于minThreadNumber, 则当协程产生长时间阻塞时,    *          可以自动扩展调度线程数.    *  唤醒定时器线程    *  每个调度线程都会调用 Process 开始调度,最后开启 id 为 0 的调度线程    * 如果 maxThreadNumber_ > 1 的话,会开启调度线程 DispatcherThread    * */    void Start(int minThreadNumber = 1, int maxThreadNumber = 0);    /*    *  停止调度,停止后无法恢复, 仅用于安全退出main函数    *  如果某个调度线程被协程阻塞, 必须等待阻塞结束才能退出.    * */    void Stop();private:    /*    *  调度线程,主要为平衡多个 processer 的负载将高负载或阻塞的 p 中的协程 steal 给低负载的 p    *  如果全部阻塞但是还有协程待执行,会起新线程,线程数不超过    maxThreadNumber_    *  会将阻塞 P 中的协程分摊给负载较少的 P    * */    void DispatcherThread();    /*    *  创建一个新的 Processer,并添加到双端队列 processers_ 中    * */    void NewProcessThread();private:    atomic_t taskCount_{0};   // 用来统计协程数量    Deque processers_;    // DispatcherThread双端队列,用来存放所有的执行器,每个执行器都会单独开一个线程去执行,线程中回调 Process() 方法。    LFLock started_;    // libgo 提供的自选锁};

调度器负责管理 1~N 个调度线程,每个调度线程一个执行器 Processer。调度器仅负责均衡各个执行器的负载,防止全部卡住的情况,并不涉及协程的切换等工作。

使用

ligbo提供了默认的协程调度器 co_sched

#define g_Scheduler ::co::Scheduler::getInstance()#define co_sched g_Scheduler

用户也可以创建自己的协程调度器

co::Scheduler* my_sched = co::Scheduler::Create();

启动调度

std::thread t([my_sched]{mysched->Start();});t.detach();

调度器原理

  1. schedule 负责整个系统的协程调度,协程的运行依赖于执行器 Processer(简称 P),因此在调度器初始化的时候会选择创建 P 的数量(支持动态增长),所有的执行器会添加到双端队列中。主线程也作为一个执行器,在创建 Scheduler 对象的时候创建,位于双端队列下标为 0 的位置(注意:只是创建对象,并没有开始运行);

  2. 当调用了 Start() 函数后,会正式开始运行。在 Start 函数内部,会创建指定数量的执行器 P,具体数量取决于参数,默认会创建 minThreadNumber 个,当全部执行器都阻塞之后,会动态扩展,最多 maxThreadNumber 个执行器。每个执行器都会运行于一个单独的线程,执行器负责该线程内部协程的切换和执行;

  3. 当创建协程时,会将协程添加到某一个处于活跃状态的执行器,如果恰好都不活跃,也会添加到某一个 P 中,这并不影响执行器的正常工作,因为调度器的调度线程会去处理它;

  4. Start 函数内部,除了上述执行器所在线程,还会开启调度线程 DispatcherThread,调度线程会平衡各个 P 的协程数量和负载,进行 steal,如果所有 P 都阻塞,会根据 maxThreadNumber 动态增加 P 的数量,如果仅仅部分 P 阻塞,会将阻塞的 P 中的协程全部拿出(steal),均摊到负载最小的 P 中;

  5. Schedule 也会选择性开启协程的定时器线程;

  6. 开启 FastSteadyClock 线程。

关于定时器以及时钟的实现,会在之后的文章中讨论。


2. 协程执行器:class Processer

libgo/scheduler/processer.h

每个协程执行器对应一个线程,负责本线程的协程调度,但并非线程安全的,是协程调度的核心。

class Processer{public:    // 协程挂起标识,用于后续进行唤醒和超时判断    struct SuspendEntry {             // ...    };    // 协程切出    ALWAYS_INLINE static void StaticCoYield();    // 挂起当前协程    static SuspendEntry Suspend();    // 挂起当前协程, 并在指定时间后自动唤醒    static SuspendEntry Suspend(FastSteadyClock::duration dur);    // 唤醒协程    static bool Wakeup(SuspendEntry const& entry);private:    /*    *  执行器对协程的调度,也是执行器所在现在的主处理逻辑    * */    void Process();    /*    * 从当前执行器中偷 n 个协程并返回    * n 为0则全部偷出来,否则取出相应的个数    * */    SList Steal(std::size_t n);    /*    *  给当前执行器打标记,用于检测协程是否阻塞    * */    void Mark();private:    int id_;    // 线程 id,与 shcedule 中的 _processer 下标对应    Scheduler * scheduler_;     // 该执行器依赖的调度器    volatile bool active_ = true;   // 该执行器的活跃状态,活跃表明该执行器未被阻塞,由调度器的调度线程控制    volatile int64_t markTick_ = 0;     // mark 的时间戳    volatile uint64_t markSwitch_ = 0;  // mark 的时候处于第几次协程调度    volatile uint64_t switchCount_ = 0; // 协程调度的次数    // 当前正在运行的协程    Task* runningTask_{nullptr};    Task* nextTask_{nullptr};    // 协程队列    typedef TSQueue TaskQueue;    TaskQueue runnableQueue_;   // 运行协程队列    TaskQueue waitQueue_;   // 等待协程队列    TSQueue gcQueue_;  // 待回收的协程队列,协程运行完毕之后,会被添加到该队列中,等待回收    TaskQueue newQueue_;    // 新添加到该执行器中的协程,包括刚刚 steal 过来的协程,该队列中的协程暂不会执行,会由 Process() 函数将该队列中的协程不断添加到 runnableQueue_ 中    volatile uint64_t switchCount_ = 0; // 协程调度的次数    // 执行器等待的条件变量    std::mutex cvMutex_;    std::condition_variable cv_;    std::atomic_bool waiting_{false};};

执行器对协程的调度 Process()

执行器 Processer 维护了三个线程安全的协程队列:

  • runnableQueue_:可运行协程队列;
  • waitQueue_:存放挂起的协程;
  • newQueue_:该队列中存放的是新加入的协程,包括新创建的协程,唤醒挂起的协程,还有 steal 来的协程;
void Processer::Process(){    GetCurrentProcesser() = this;    bool & isStop = *stop_;    while (!isStop)    {        runnableQueue_.front(runningTask_);        // 获取一个可以运行对协程对象        if (!runningTask_) {            if (AddNewTasks())                runnableQueue_.front(runningTask_);            if (!runningTask_) {                WaitCondition();    // 没有可以执行的协程,wait 条件变量                AddNewTasks();                continue;            }        }        addNewQuota_ = 1;        while (runningTask_ && !isStop) {            runningTask_->state_ = TaskState::runnable;            runningTask_->proc_ = this;            ++switchCount_;            runningTask_->SwapIn();            switch (runningTask_->state_) {                case TaskState::runnable:                    {                        std::unique_lock lock(runnableQueue_.LockRef());                        auto next = (Task*)runningTask_->next;                        if (next) {                            runningTask_ = next;                            runningTask_->check_ = runnableQueue_.check_;                            break;                        }                        if (addNewQuota_ < 1 || newQueue_.emptyUnsafe()) {                            runningTask_ = nullptr;                        } else {                            lock.unlock();                            if (AddNewTasks()) {                                runnableQueue_.next(runningTask_, runningTask_);                                -- addNewQuota_;                            } else {                                std::unique_lock lock2(runnableQueue_.LockRef());                                runningTask_ = nullptr;                            }                        }                    }                    break;                case TaskState::block:                    {                        std::unique_lock lock(runnableQueue_.LockRef());                        runningTask_ = nextTask_;                        nextTask_ = nullptr;                    }                    break;                case TaskState::done:                default:                    {                        runnableQueue_.next(runningTask_, nextTask_);                        if (!nextTask_ && addNewQuota_ > 0) {                            if (AddNewTasks()) {                                runnableQueue_.next(runningTask_, nextTask_);                                -- addNewQuota_;                            }                        }                        DebugPrint(dbg_task, "task(%s) done.", runningTask_->DebugInfo());                        runnableQueue_.erase(runningTask_);                        if (gcQueue_.size() > 16)    // 执行完毕的协程,需要回收资源                            GC();                        gcQueue_.push(runningTask_);                        if (runningTask_->eptr_) {                            std::exception_ptr ep = runningTask_->eptr_;                            std::rethrow_exception(ep);                        }                        std::unique_lock lock(runnableQueue_.LockRef());                        runningTask_ = nextTask_;                        nextTask_ = nullptr;                    }                    break;            }        }    }}

在调度器 Schedule 执行 Stop() 函数之前,执行器 P 会一直处于调度协程阶段 Process()。在期间,执行器 P 会将运行队列 runnableQueue 中的第一个协程获取进行执行,如果可运行队列为空,执行器会尝试将处于 newQueue 中的协程添加到可运行队列中去,如果 newQueue_ 为空,说明此时该执行器处于无协程可调度状态,通过设置条件变量,将执行器设置为等待状态;

当获取到一个可执行协程之后,会执行该协程的任务。协程的执行流程是通过状态机来实现的。(协程有三个状态:运行中,阻塞,执行完毕)

  • 对于运行中的协程,我们只需要确定下一个要执行的协程对象即可;
  • 对于阻塞的协程,只有当协程挂起时(调用了 Suspend 方法),状态才会切换到这里,因此,这时候只需要去执行 nextTask 即可;
  • 对于运行完毕的协程,只有当 Task 处理函数执行完成之后,状态才会切换到这里,因此,需要考虑对该协程资源进行回收;

条件变量

Processer 使用了 std::mutex,并且提供了条件变量用来唤醒。当调度器尝试获取下一个可运行的协程对象时,若此时无可用协程对象,就会主动去等待该条件变量,默认100毫秒的超时时间。

void Processer::WaitCondition(){    GC();    std::unique_lock lock(cvMutex_);    waiting_ = true;    cv_.wait_for(lock, std::chrono::milliseconds(100));    waiting_ = false;}void Processer::NotifyCondition(){    cv_.notify_all();}

当调度器向该执行器中增加了新的协程对象时,会唤醒该条件变量,继续执行 Process 流程。使用条件变量唤醒的效率,要远远高于不断去轮询。

为什么在使用了条件变量后还要设置超时时间,定时轮询,即使条件变量没有被唤醒也希望它返回呢?
因为我们不希望线程会在这里阻塞,只要没有新的协程加入,就一直在死等。我们希望线程在等待的同时,也可以定时跳出,执行一些其它的检测工作等。

从执行器中偷指定数量的协程出来 -> steal()

简单来说,从执行器中取协程出来,就是从执行器维护的双端队列中获取执行个数的结点。

为什么要取出来?前面提到过,要么该执行器负载过大,要么该执行器处于阻塞的状态。

SList Processer::Steal(std::size_t n){    if (n > 0) {        // steal 指定个数协程        newQueue_.AssertLink();        auto slist = newQueue_.pop_back(n);        newQueue_.AssertLink();        if (slist.size() >= n)            return slist;        std::unique_lock lock(runnableQueue_.LockRef());        bool pushRunningTask = false, pushNextTask = false;        if (runningTask_)            pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_);        if (nextTask_)            pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_);        auto slist2 = runnableQueue_.pop_backWithoutLock(n - slist.size());        if (pushRunningTask)            runnableQueue_.pushWithoutLock(runningTask_);        if (pushNextTask)            runnableQueue_.pushWithoutLock(nextTask_);        lock.unlock();        slist2.append(std::move(slist));        if (!slist2.empty())            DebugPrint(dbg_scheduler, "Proc(%d).Stealed = %d", id_, (int)slist2.size());        return slist2;    } else {        // steal all        newQueue_.AssertLink();        auto slist = newQueue_.pop_all();        newQueue_.AssertLink();        std::unique_lock lock(runnableQueue_.LockRef());        bool pushRunningTask = false, pushNextTask = false;        if (runningTask_)            pushRunningTask = runnableQueue_.eraseWithoutLock(runningTask_, true) || slist.erase(runningTask_, newQueue_.check_);        if (nextTask_)            pushNextTask = runnableQueue_.eraseWithoutLock(nextTask_, true) || slist.erase(nextTask_, newQueue_.check_);        auto slist2 = runnableQueue_.pop_allWithoutLock();        if (pushRunningTask)            runnableQueue_.pushWithoutLock(runningTask_);        if (pushNextTask)            runnableQueue_.pushWithoutLock(nextTask_);        lock.unlock();        slist2.append(std::move(slist));        if (!slist2.empty())            DebugPrint(dbg_scheduler, "Proc(%d).Stealed all = %d", id_, (int)slist2.size());        return slist2;    }}

首先,会从 newQueue 队列中获取协程结点,因为 newQueue 中的结点还没有添加到运行队列中,因此可以直接取出;如果 newQueue 中协程数量不足,会从 runnableQueue 队列尾部中继续获取结点。由于 runnableQueue 队列中我们记录了正在执行的协程和下一次将执行的协程(runningTask & nextTask),需要特殊处理。在从 runnableQueue 偷协程之前,会将 runningTask & nextTask 从队列删除,待偷完结点之后再次添加到当前 runnableQueue_ 队列中。

简单说,偷协程的工作,不会从队列中获取到 runningTask & nextTask 标识的协程。

阻塞判断

void Processer::Mark(){    if (runningTask_ && markSwitch_ != switchCount_) {        markSwitch_ = switchCount_;        markTick_ = NowMicrosecond();    }}uint32_t cycle_timeout_us = 10 * 1000; bool Processer::IsBlocking(){    if (!markSwitch_ || markSwitch_ != switchCount_) return false;    return NowMicrosecond() > markTick_ + CoroutineOptions::getInstance().cycle_timeout_us;}

Mark 函数会在调度器的调度函数中被调用,需要注意的是,只有执行器处于活跃状态时才会调用。Mark 顾名思义,是给该执行打标记,会记录mark的时间戳,并记录下是在第多少次协程调度的过程中做了标记,Mark 的作用是用来进行执行器的阻塞检测。

处于活跃状态的执行器,总是在执行着协程的切换,因此,会不断自增 switchCount_ 的值,根据 IsBlocking 函数得知,当我们此时标签记录的协程调度次数超过10ms没有发生改变,我们认为该执行器发生阻塞,Scheduler 会进行 Steal 操作。

协程挂起 Suspend

static SuspendEntry Suspend();

一种方式是直接挂起,会将该协程状态转换为 TaskState::block,然后将该协程从 runnableQueue 中删除,再添加到 waitQueue 中;

另外一种方式是挂起之后(第一种方式执行完毕之后),允许配置一个时间段之后去自动唤醒该协程。

wakeup

用于唤醒协程

唤醒协程要做的,就是讲待唤醒的协程从 waitQueue_ 中删除并重新添加到 newQueue_中去。

StaticCoYield

用于在一个执行器中切出当前协程

有两种可能,一种是协程被阻塞需要挂起;另外一种是协程执行完毕,主动切出。

具体实现是通过获取当前执行器正在执行的协程 Task,调用 SwapOut() 方法实现。

ALWAYS_INLINE void Processer::StaticCoYield(){    auto proc = GetCurrentProcesser();    if (proc) proc->CoYield();}ALWAYS_INLINE void Processer::CoYield(){    Task *tk = GetCurrentTask();    assert(tk);    ++ tk->yieldCount_;#if ENABLE_DEBUGGER    DebugPrint(dbg_yield, "yield task(%s) state = %s", tk->DebugInfo(), GetTaskStateName(tk->state_));    if (Listener::GetTaskListener())        Listener::GetTaskListener()->onSwapOut(tk->id_);#endif    tk->SwapOut();}

几个需要注意的问题

> 可能会切出协程上下文的几种情况:
  1. 协程被挂起;
  2. 协程执行完毕;
  3. 用户主动切出 co_yield。
    #define co_yield do { ::co::Processer::StaticCoYield(); } while (0)
> 协程被挂起的几种情况:
  1. 系统函数被 hook;
  2. libgo_poll (被 hook 的 io 操作函数会调用 libgo_poll 实现切换)
  3. select
  4. sleep、usleep、nanosleep
  5. 调用了协程锁 CoMutex(co_mutex),协程读写锁 CoRWMutex(co_rwmutex),或者使用了 channel。
> 切入协程上下文的几种情况:
  1. 执行器在调度(Process)期间;
  2. 唤醒挂起协程不会切入上下文,只是从等待队列中重新添加到 newQueue_。

3. 协程对象:struct Task

# 协程状态enum class TaskState{    runnable,   // 可运行    block,      // 阻塞    done,       // 协程运行完毕};typedef std::function TaskF;    // c++11提供的函数模板struct Task{    TaskState state_ = TaskState::runnable;    uint64_t id_;       // 当前调度器下协程编号,从0开始    TaskF fn_;          // 协程运行的函数    uint64_t yieldCount_ = 0;   // 协程切出的次数    Context ctx_;       // 上下文信息    Processer* proc_ = nullptr;     // 归属于哪个执行器    // 提供了协程切入、切出、切换到指定线程三个函数    ALWAYS_INLINE void SwapIn();    ALWAYS_INLINE void SwapTo(Task* other);    ALWAYS_INLINE void SwapOut();private:    static void StaticRun(intptr_t vp);     // 参数为 Task*,函数会去执行该 Task 的 fn_(),执行完毕后,协程状态改为 TaskState::done,并在执行器 P 中切出};

每个 Task 对象是一个协程,在使用过程中,创建一个协程实际就是创建了一个 Task 对象,再添加到对应的执行器 P 中。之前提到过,执行器进行协程调度是通过一个状态机来实现的,这里的 TaskState 就是协程状态,协程函数 fn_ 会在 StaticRun 静态方法中调用,该静态方法注册到了协程上下文 _ctx 中。

除此之外,Task 类内部,也提供了协程的切入切出方法,本质也是调用了上下文的切换。

StaticRun

控制协程的运行,内部调用了 Task::Run() 方法,会在协程函数 fn_ 执行完毕之后,将协程状态转换为 TaskState::done,并将协程切出。

void Task::Run(){    auto call_fn = [this]() {        this->fn_();        this->fn_ = TaskF(); //让协程function对象的析构也在协程中执行    };    \\ ...        call_fn();    \\ ...    state_ = TaskState::done;    Processer::StaticCoYield();}void Task::StaticRun(intptr_t vp){    Task* tk = (Task*)vp;    tk->Run();}

这里就是对 libgo 调度相关实现的描述,本文跳过了对定时器和时钟部分的实现,这个会在之后单独叙述。本文涉及到的代码在源码目录下的

libgo-master/libgo/scheduler/processer.cpp   libgo-master/libgo/scheduler/processer.hlibgo-master/libgo/scheduler/scheduler.cpplibgo-master/libgo/scheduler/scheduler.h

有兴趣的读者可以对照源码学习,欢迎讨论学习

0