千家信息网

Ribbon中怎么使用 LoadBalancer 实现负载均衡

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,这篇文章给大家介绍Ribbon中怎么使用 LoadBalancer 实现负载均衡,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。Ribbon 负载均衡器Ribbon 的负载均衡器是
千家信息网最后更新 2025年02月01日Ribbon中怎么使用 LoadBalancer 实现负载均衡

这篇文章给大家介绍Ribbon中怎么使用 LoadBalancer 实现负载均衡,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。


Ribbon 负载均衡器

Ribbon 的负载均衡器是通过 LoadBalancerClient 来实现的,在应用启动的时候,LoadBalancerClient 默认会从 EurekaClient 获取服务列表,并将服务注册列表缓存在本地,当调用 LoadBalancerClient 的 choose() 方法的时候, 根据负载均衡策略 IRule 来选择一个可用的服务,从而实现负载均衡。

当然,LoadBalancerClient 也可以不从 EurekaClient 中获取服务列表,这是需要自己维护一个服务注册列表信息,具体操作如下:

ribbon:  eureka:    enabled: false  stores:  ribbon:    listOfServers: baidu.com, google.com

Ribbon 负载均衡器流程图

主要流程:

1. 当应用启动的时候,ILoadBalancer 从 EurekaClient 获取服务列表
2. 然后每 10 秒 向 EurekaClient 发送一次心跳检测,如果注册列表发生了变化,则更新获取重新获取
3. LoadBalancerClient 调用 choose() 方法来选择服务的时候,会调用 ILoadBalancer 的 chooseServer() 来获取一个可以的服务,
4. 在 ILoadBalancer 进行获取服务的时候,会根据负载均衡策略 IRule 来进行选择
5. 返回可用的服务

Ribbon 负载均衡器实现原理

下面就来看看每个类的实现原理

RibbonLoadBalancerClient

RibbonLoadBalancerClient 它是 Ribbon 负载均衡实现的一个重要的类,最终的负载均衡的请求处理都由它来执行,先来看下它的类图:

它实现了 LoadBalancerClient 接口,而 LoadBalancerClient 接口实现了 ServiceInstanceChooser 接口:

ServiceInstanceChooser

该接口用来从负载均衡器中获取一个可用的服务,只有一个方法:

public interface ServiceInstanceChooser {        /**         * @param serviceId:服务ID         * @return 可用的服务实例         */        ServiceInstance choose(String serviceId);}

LoadBalancerClient

表示负载均衡的客户端,是一个接口,继承了 ServiceInstanceChooser 接口 ,共有三个方法:

public interface LoadBalancerClient extends ServiceInstanceChooser {        /**         * 执行请求         * @param serviceId :用于查找 LoadBalancer的服务ID         * @param request:允许实现执行前后操作         */         T execute(String serviceId, LoadBalancerRequest request) throws IOException;        /**         * 执行请求         * @param serviceId :用于查找 LoadBalancer的服务ID         * @param serviceInstance :执行请求的服务         * @param request:允许实现执行前后操作         */         T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request) throws IOException;        /**         * 创建具有真实主机和端口的正确URI,有些系统使用带有逻辑服务名的URL作为主机,调用该方法将会使用 host:port 来替换逻辑服务名         * @param instance :用于重建URI的服务实例         * @param original :具有逻辑服务名的URL         * @return A reconstructed URI.         */        URI reconstructURI(ServiceInstance instance, URI original);}

RibbonLoadBalancerClient 实现如下:

主要看下从 ServiceInstanceChooser,LoadBalancerClient 中实现的接口方法

public class RibbonLoadBalancerClient implements LoadBalancerClient {    // 工厂:主要用来创建客户端,创建负载均衡器,进行客户端配置等    // 对于每一个客户端名称都会创建一个Spring ApplicationContext,可以从中获取需要的bean        private SpringClientFactory clientFactory;        protected ILoadBalancer getLoadBalancer(String serviceId) {                return this.clientFactory.getLoadBalancer(serviceId);        }    ..................}public class SpringClientFactory extends NamedContextFactory {    // 获取客户端        public > C getClient(String name, Class clientClass) {                return getInstance(name, clientClass);        }        // 获取负载均衡器        public ILoadBalancer getLoadBalancer(String name) {                return getInstance(name, ILoadBalancer.class);        }        //获取客户端配置        public IClientConfig getClientConfig(String name) {                return getInstance(name, IClientConfig.class);        }        // 获取 RibbonLoadBalancerContext        public RibbonLoadBalancerContext getLoadBalancerContext(String serviceId) {                return getInstance(serviceId, RibbonLoadBalancerContext.class);        }        // 获取对应的bean        public  T getInstance(String name, Class type) {                AnnotationConfigApplicationContext context = getContext(name);                .....                return context.getBean(type);            .....        }}

choose() 方法

该方法主要用来获取一个可用的服务实例

       public ServiceInstance choose(String serviceId, Object hint) {                Server server = getServer(getLoadBalancer(serviceId), hint);                if (server == null) {                        return null;                }        // RibbonServer 实现了 ServiceInstance                return new RibbonServer(serviceId, server, isSecure(server, serviceId),                                serverIntrospector(serviceId).getMetadata(server));        }       // 根据服务ID获取负载均衡器,会调用 SpringClientFactory 的方法进行获取        protected ILoadBalancer getLoadBalancer(String serviceId) {                return this.clientFactory.getLoadBalancer(serviceId);        }       // 根据负载均衡器来获取可用的服务        protected Server getServer(ILoadBalancer loadBalancer, Object hint) {                if (loadBalancer == null) {                        return null;                }                return loadBalancer.chooseServer(hint != null ? hint : "default");        }

最后会调用 ILoadBalancer.chooseServer 来获取可用服务,后面再来说 ILoadBalancer 。

execute() 方法

该方法执行请求

       public  T execute(String serviceId, ServiceInstance serviceInstance,                        LoadBalancerRequest request) throws IOException {                Server server = null;                if (serviceInstance instanceof RibbonServer) {                        server = ((RibbonServer) serviceInstance).getServer();                }                RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);        // 状态记录器,记录着服务的状态                RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);        ...........            T returnVal = request.apply(serviceInstance);                statsRecorder.recordStats(returnVal);                return returnVal;        ...........        }    // apply 方法调用如下,最终返回 ClientHttpResponse        public ListenableFuture intercept(final HttpRequest request,                        final byte[] body, final AsyncClientHttpRequestExecution execution)                        throws IOException {                final URI originalUri = request.getURI();                String serviceName = originalUri.getHost();                return this.loadBalancer.execute(serviceName,                                new LoadBalancerRequest>() {                                        @Override                                        public ListenableFuture apply(                                                        final ServiceInstance instance) throws Exception {                                                HttpRequest serviceRequest = new ServiceRequestWrapper(request,                                                                instance, AsyncLoadBalancerInterceptor.this.loadBalancer);                                                return execution.executeAsync(serviceRequest, body);                                        }                                });        }

以上就是负载均衡器流程图左边部分的原理,接下来看下右边的部分

ILoadBalancer

通过上面的分析,负载均衡器获取一个可用的服务,最终会调用 ILoadBalancer 的 chooseServer 方法,下面就来看下 ILoadBalancer 的实现原理

首先来看下 ILoadBalancer 的整体类图:

在上面的类图中,主要的逻辑实在 BaseLoadBalancer 中实现,而 DynamicServerListLoadBalancer 主要提供动态获取服务列表的能力。

ILoadBalancer

首先来看下 ILoadBalancer,它表示一个负载均衡器接口,

public interface ILoadBalancer {        // 添加服务        public void addServers(List newServers);                //获取服务        public Server chooseServer(Object key);                //标记某个服务下线        public void markServerDown(Server server);        //获取状态为UP的所有可用服务列表    public List getReachableServers();    //获取所有服务列表,包括可用的和不可用的        public List getAllServers();}

AbstractLoadBalancer

实现 ILoadBalancer 接口,提供一些默认实现

public abstract class AbstractLoadBalancer implements ILoadBalancer {    public enum ServerGroup{ALL, STATUS_UP, STATUS_NOT_UP}    public Server chooseServer() {            return chooseServer(null);    }    public abstract List getServerList(ServerGroup serverGroup);    // 获取状态    public abstract LoadBalancerStats getLoadBalancerStats();    }

IClientConfigAware

客户端配置

public interface IClientConfigAware {    public abstract void initWithNiwsConfig(IClientConfig clientConfig);}

BaseLoadBalancer

负载均衡器的主要实现逻辑,在该类中,会根据负载均衡策略 IRule 来获取可用的服务,会通过 IPing 来检测服务的可用性,此外,该类中从 EurkaClient 获取到服务列表后,会在该类中保存下来,会维护所有的服务列表和可用的服务列表。

首先来看下它的一些属性,然后再来看每个对应的方法

public class BaseLoadBalancer extends AbstractLoadBalancer implements        PrimeConnections.PrimeConnectionListener, IClientConfigAware {    // 默认的负载均衡策略:轮询选择服务实例    private final static IRule DEFAULT_RULE = new RoundRobinRule();    protected IRule rule = DEFAULT_RULE;    // 默认 ping 的策略,会调用 IPing 来检测服务是否可用    private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();    protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;    protected IPing ping = null;    // 所有服务列表    protected volatile List allServerList = Collections.synchronizedList(new ArrayList());    // 状态为 up 的服务列表    protected volatile List upServerList = Collections.synchronizedList(new ArrayList());    // 锁    protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();    protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();    // 定时任务,去 ping 服务是否可用    protected Timer lbTimer = null;    // ping 的时间间隔,10秒    protected int pingIntervalSeconds = 10;    // ping 的最大次数    protected int maxTotalPingTimeSeconds = 5;    // 负载均衡器的状态    protected LoadBalancerStats lbStats;    // 客户端配置    private IClientConfig config;        // 服务列表变化监听器    private List changeListeners = new CopyOnWriteArrayList();    // 服务状态变化监听器    private List serverStatusListeners = new CopyOnWriteArrayList();    // 构造方法,使用默认的配置来创建负载均衡器,还有其他重载的构造方法,可用根据需要来创建负载均衡器    public BaseLoadBalancer() {        this.name = DEFAULT_NAME;        this.ping = null;        setRule(DEFAULT_RULE);        setupPingTask();        lbStats = new LoadBalancerStats(DEFAULT_NAME);    }   .....................}

在上面的属性中,Ribbon 提供了一些默认的配置:

IClientConfig 表示客户端的配置,实现类为 DefaultClientConfigImpl,在该类中配置了默认的值,:

public class DefaultClientConfigImpl implements IClientConfig {    // ping 的默认策略 DummyPing        public static final String DEFAULT_NFLOADBALANCER_PING_CLASSNAME = "com.netflix.loadbalancer.DummyPing"; // DummyPing.class.getName();    public static final String DEFAULT_NFLOADBALANCER_RULE_CLASSNAME = "com.netflix.loadbalancer.AvailabilityFilteringRule";    public static final String DEFAULT_NFLOADBALANCER_CLASSNAME = "com.netflix.loadbalancer.ZoneAwareLoadBalancer";    public static final int DEFAULT_MAX_TOTAL_TIME_TO_PRIME_CONNECTIONS = 30000;    public static final int DEFAULT_MAX_RETRIES_PER_SERVER_PRIME_CONNECTION = 9;     .............................................}

IRule 表示 负载均衡策略,即如何去选择服务实例,默认为 RoundRobinRule,即通过轮询的方式选择服务。Ribbon 默认提供的有 7 种。

IPing 表示检测服务是否可用策略,Ribbon 也提供了很多策略,共有 5 种,默认为 DummyPing

关于 IRule 和 IPing 的策略,后面会专门进行研究。

在 BaseLoadBalancer 中,除了提供一个无参的构造方法(使用的是默认的配置)外,还提供了很多重载的构造方法,下面来看下根据客户端的配置来创建BaseLoadBalancer :

// 根据客户端配置来创建 BaseLoadBalancerpublic BaseLoadBalancer(IClientConfig config) {        initWithNiwsConfig(config);}@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {        // 负载均衡策略        String ruleClassName = (String) clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerRuleClassName);        // ping 策略        String pingClassName = (String) clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerPingClassName);        IRule rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(ruleClassName, clientConfig);        IPing ping = (IPing) ClientFactory.instantiateInstanceWithClientConfig(pingClassName, clientConfig);        // 状态        LoadBalancerStats stats = createLoadBalancerStatsFromConfig(clientConfig);        // 初始化配置        initWithConfig(clientConfig, rule, ping, stats);}void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {        this.config = clientConfig;        String clientName = clientConfig.getClientName();        this.name = clientName;                // ping 的周期        int pingIntervalTime = Integer.parseInt(clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerPingInterval,Integer.parseInt("30")));        // 最大 ping 的次数        int maxTotalPingTime = Integer.parseInt(clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,Integer.parseInt("2")));                setPingInterval(pingIntervalTime);        setMaxTotalPingTime(maxTotalPingTime);        setRule(rule);        setPing(ping);        setLoadBalancerStats(stats);        rule.setLoadBalancer(this);        if (ping instanceof AbstractLoadBalancerPing) {                ((AbstractLoadBalancerPing) ping).setLoadBalancer(this);        }        .................        // 注册监控/可忽略        init();}

在上面的构造方法中,可用根据客户端配置的信息来创建一个BaseLoadBalancer,如客户端可以配置负载均衡策略,ping的策略,ping的时间间隔和最大次数等。

判断服务的可用性

在 Ribbon 中,负载均衡器多久才去更新获取服务列表呢?在 BaseLoadBalancer 类中,有一个 setupPingTask 方法,在该方法内部,会创建 PingTask 定时任务去检测服务的可用性,而 PingTask 又会创建 Pinger 对象,在 Pinger 对象的 runPinger() 方法中,会根据ping策略即 pingerStrategy 的 pingServers(ping, allServer) 来获取服务的可用性,主要逻辑如下:

void setupPingTask() {        if (canSkipPing()) {                return;        }        // 如果已经有了定时任务,则取消        if (lbTimer != null) {                lbTimer.cancel();        }        // 第二个参数为true,表示它是一个deamon线程        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true);        // 创建 PingTask, 它继承于 TimerTask,定时执行 run 方法        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);        ......}class PingTask extends TimerTask {        public void run() {                // 默认 pingStrategy = new SerialPingStrategy()                new Pinger(pingStrategy).runPinger();        }}public void runPinger() throws Exception {        // 如果正在ping,则返回        if (!pingInProgress.compareAndSet(false, true)) {                 return; // Ping in progress - nothing to do        }        // 所有的服务,包括不可用的服务        Server[] allServers = null;        boolean[] results = null;        Lock allLock = null;        Lock upLock = null;        try {                allLock = allServerLock.readLock();                allLock.lock();                allServers = allServerList.toArray(new Server[allServerList.size()]);                allLock.unlock();                // 所有服务的数量                int numCandidates = allServers.length;                // 所有服务ping的结果                results = pingerStrategy.pingServers(ping, allServers);                // 状态可用的服务列表                 final List newUpList = new ArrayList();                // 状态改变的服务列表                final List changedServers = new ArrayList();                for (int i = 0; i < numCandidates; i++) {                        // 最新的状态                        boolean isAlive = results[i];                        Server svr = allServers[i];                        // 老的状态                        boolean oldIsAlive = svr.isAlive();                        // 更新状态                        svr.setAlive(isAlive);                                                // 如果状态改变了,则放到集合中,会进行重新拉取                        if (oldIsAlive != isAlive) {                                changedServers.add(svr);                        }                        // 状态可用的服务                        if (isAlive) {                                newUpList.add(svr);                        }                }                upLock = upServerLock.writeLock();                upLock.lock();                upServerList = newUpList;                upLock.unlock();                // 变态改变监听器                notifyServerStatusChangeListener(changedServers);        } finally {                // ping 完成                pingInProgress.set(false);        }}// 检测服务的状态@Overridepublic boolean[] pingServers(IPing ping, Server[] servers) {        int numCandidates = servers.length;        boolean[] results = new boolean[numCandidates];        for (int i = 0; i < numCandidates; i++) {                results[i] = false;                if (ping != null) {                        results[i] = ping.isAlive(servers[i]);                }        }        return results;}

在上面的逻辑中,Ribbon 每10秒向 EurekaClient 发送 ping 来判断服务的可用性,如果服务的可用性发生了改变或服务的数量和之前的不一致,则会更新或重新拉取服务。有了这些服务之后,会根据负载均衡策略 IRule 来选择一个可用的服务。

根据负载均衡策略 IRule 来选择一个可用的服务

在前文说到 Ribbon 客户端 RibbonLoadBalancerClient 选择服务的时候,最终会调用 ILoadBalancer.chooseServer 来选择服务,接下来就来看下这个方法:

public Server chooseServer(Object key) {        .......    //rule= new RoundRobinRule()        return rule.choose(key);        ....}

关于 Ribbon 的负载均衡策略 IRule, Ribbon 提供了 7 种,后面再来分析,现在只需要知道通过 IRule 来选择服务就可以了。

初始化获取所有服务列表

在上面的分析中,Ribbon 会每10秒定时的去检测服务的可用性,如果服务状态发生了变化则重新获取,之后,再根据负载均衡策略 IRule 来选择一个可用的服务;但是,在初始化的时候,是从哪里获取服务列表呢?下面就来分析这个问题

BaseLoadBalancer 有个子类 DynamicServerListLoadBalancer,它具有使用动态源获取服务器列表的功能。即服务器列表在运行时可能会更改。此外,还可以通过条件来过滤掉不符合所需条件的服务。

public class DynamicServerListLoadBalancer extends BaseLoadBalancer {        // 是否正在进行服务列表的更新    protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);        // 服务列表    volatile ServerList serverListImpl;        // 服务过滤器    volatile ServerListFilter filter;}

1. 初始化时获取所有服务

在 DynamicServerListLoadBalancer 中,有个 restOfInit 方法,在初始化时进行调用,在该方法中,会从 Eureka 客户端中拉取所有的服务列表:

void restOfInit(IClientConfig clientConfig) {        .............        updateListOfServers();        ........}public void updateListOfServers() {        List servers = new ArrayList();        if (serverListImpl != null) {                // 获取所有服务列表                servers = serverListImpl.getUpdatedListOfServers();                // 根据条件过滤服务                if (filter != null) {                        servers = filter.getFilteredListOfServers(servers);                }        }        updateAllServerList(servers);}protected void updateAllServerList(List ls) {        if (serverListUpdateInProgress.compareAndSet(false, true)) {                try {                        for (T s : ls) {                                s.setAlive(true); // 状态设置为可用                        }                        setServersList(ls);                        super.forceQuickPing(); // 强制检测服务状态                } finally {                        serverListUpdateInProgress.set(false);                }        }}

获取所有服务列表 servers = serverListImpl.getUpdatedListOfServers(); 最终会调用 DiscoveryEnabledNIWSServerList 的方法:

servers = serverListImpl.getUpdatedListOfServers();public List getUpdatedListOfServers(){        return obtainServersViaDiscovery();}private List obtainServersViaDiscovery() {        List serverList = new ArrayList();        ........        // 通过 eurekaClient 来获取注册的服务列表         EurekaClient eurekaClient = eurekaClientProvider.get();        if (vipAddresses!=null){                for (String vipAddress : vipAddresses.split(",")) {                                        List listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);                        for (InstanceInfo ii : listOfInstanceInfo) {                                if (ii.getStatus().equals(InstanceStatus.UP)) {                                        .....                                        DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);                                        serverList.add(des);                                }                        }                        ......                }        }        return serverList;}

通过上面方法的分析,Ribbon 最终会通过 EurekaClient 来获取服务列表的,而 EurekaClient 的实现类是 DiscoveryClient,而在 Eureka 中,DiscoveryClient 类具有服务的注册,发现,续约,获取服务列表等功能。

此外,该类中还可以通过过滤器来获取不符合条件的服务。

以上就是 Ribbon 负载均衡器的一个实现原理。最后再来看下流程图,加深印象:

关于Ribbon中怎么使用 LoadBalancer 实现负载均衡就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0