千家信息网

An Example Analysis of the Fault Avoidance Mechanism in RocketMQ's Design

Published: 2025-01-18 Author: 千家信息网 editor

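This article looks at the fault avoidance mechanism in RocketMQ's design by reading through the relevant client code. It is a practical topic, so it is shared here as a reference; let's walk through it.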

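To keep its communication with clients simple, the NameServer does not notify clients immediately when it detects that a Broker has failed. The fault avoidance mechanism addresses the resulting problem: when a Broker fails, the Producer cannot learn of it in time, so message sends fail. The mechanism is disabled by default (sendLatencyFaultEnable = false). When it is enabled, a Broker on which a send failed is temporarily excluded from the queue selection list.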

The MQFaultStrategy class:

public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    // Latency thresholds (ms) and the avoidance duration (ms) each one maps to
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Is the latency fault mechanism enabled?
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // Check whether the queue's Broker is available
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                // No available queue found: fall back to one of the less faulty Brokers
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        // Default: plain round-robin
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }
}
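One detail in the selection loop is easy to overlook: the "if (pos < 0) pos = 0" guard. It looks redundant after Math.abs, but Math.abs returns a negative value for Integer.MIN_VALUE, which an int counter reaches when it is incremented past Integer.MAX_VALUE. The following is a minimal sketch of that edge case only; the class IndexWrapDemo and its two methods are made-up names for illustration and are not part of RocketMQ.

```java
// Hypothetical helper that mirrors the index arithmetic in selectOneMessageQueue.
class IndexWrapDemo {
    // Maps a raw counter value to a list position, including the guard.
    static int position(int index, int queueCount) {
        int pos = Math.abs(index) % queueCount;
        if (pos < 0) {
            pos = 0; // only reachable when index == Integer.MIN_VALUE
        }
        return pos;
    }

    // Same arithmetic without the guard, to show what the guard protects against.
    static int unguardedPosition(int index, int queueCount) {
        return Math.abs(index) % queueCount;
    }
}
```

With three queues, unguardedPosition(Integer.MIN_VALUE, 3) yields a negative number that would throw if used as a list index, while position(Integer.MIN_VALUE, 3) falls back to 0.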

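When a route is looked up for a send, choosing a message queue comes down to these key steps:

  • First pick a message queue using the round-robin algorithm

  • Then check against the fault list whether that message queue is available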
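The two steps can be sketched in isolation as follows. This is a simplified stand-in, not RocketMQ's code: QueueSelectorSketch, its nested Queue class, and the Predicate-based availability check are assumptions made for illustration. It also leaves out the lastBrokerName condition and returns null where the real class falls back to pickOneAtLeast().

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Simplified stand-in for the two steps above; not RocketMQ's actual classes.
class QueueSelectorSketch {
    // Hypothetical minimal queue descriptor: Broker name plus queue id.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final AtomicInteger counter = new AtomicInteger();

    // Walks the list round-robin and returns the first queue whose Broker
    // passes the availability check, or null if every Broker is being avoided.
    Queue select(List<Queue> queues, Predicate<String> brokerAvailable) {
        int start = counter.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            // floorMod keeps the position non-negative even if the counter overflows
            int pos = Math.floorMod(start + i, queues.size());
            Queue candidate = queues.get(pos);
            if (brokerAvailable.test(candidate.brokerName)) {
                return candidate;
            }
        }
        return null;
    }
}
```

Passing the availability check as a Predicate keeps the sketch independent of how the fault list is stored. In the real client that role is played by latencyFaultTolerance.isAvailable().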

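How LatencyFaultToleranceImpl decides whether a Broker is available:

@Override
public boolean isAvailable(final String name) {
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        return faultItem.isAvailable();
    }
    return true;
}

// In the FaultItem inner class:
public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}

  • Check whether the Broker is in the fault list; if it is not, it is available.

  • If it is in the fault list, check whether the current time is greater than or equal to startTimestamp, the time recorded for the avoidance window. Once that time has been reached, the Broker counts as available again.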
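The time comparison is easier to follow with the clock passed in explicitly. The sketch below assumes a made-up class, AvoidanceWindowSketch, that takes "now" as a parameter instead of calling System.currentTimeMillis(); otherwise it applies the same comparison as FaultItem.isAvailable().

```java
// Hypothetical, trimmed-down version of the check above with an explicit clock.
class AvoidanceWindowSketch {
    // Earliest time (ms) at which the Broker may be used again.
    private final long startTimestamp;

    AvoidanceWindowSketch(long faultObservedAt, long notAvailableDuration) {
        this.startTimestamp = faultObservedAt + notAvailableDuration;
    }

    // Same comparison as FaultItem.isAvailable(), but with "now" supplied by the caller.
    boolean isAvailable(long now) {
        return (now - startTimestamp) >= 0;
    }
}
```

For a fault observed at t = 1000 ms with a 30000 ms avoidance duration, the Broker is unavailable until t = 31000 ms and available from that instant on. With a duration of 0 it is available immediately.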

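updateFaultItem() is called both after a send completes and when a send throws an exception, in order to refresh the fault list. computeNotAvailableDuration() derives the length of the avoidance period from the response time: the longer the response time, the longer the period. Network exceptions, Broker exceptions, and client exceptions are all treated as a fixed response time of 30 s, which maps to an avoidance period of 10 minutes. For a successful send or a thread-interruption exception, the measured response time is used. When it is under 100 ms (and, with the default tables in MQFaultStrategy, for anything under 550 ms) the avoidance period is 0.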
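The mapping from response time to avoidance period can be checked by hand with a small standalone copy of the lookup. The class name AvoidanceDurationSketch and the helper durationFor are assumptions for illustration; the two arrays and the scan are taken from the MQFaultStrategy code shown earlier.

```java
// Standalone copy of the lookup in MQFaultStrategy, for experimenting with values.
class AvoidanceDurationSketch {
    // Default tables from MQFaultStrategy (all values in milliseconds).
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scans from the largest threshold down; the first threshold the latency reaches wins.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // Hypothetical wrapper: isolation == true substitutes a fixed 30 s latency,
    // as updateFaultItem does in MQFaultStrategy.
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }
}
```

With these tables a 549 ms send gives an avoidance period of 0, a 550 ms send gives 30000 ms, a 1200 ms send gives 60000 ms, and any call with isolation set to true gives 600000 ms (10 minutes) regardless of the measured latency.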

The updateFaultItem method of the LatencyFaultToleranceImpl class:

@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        // Add to the fault list; if another thread inserted first, update its entry instead
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}

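FaultItem stores the Broker name, the response latency, and the avoidance start time. The last of these matters most, because it decides whether a queue is available. Despite its name, startTimestamp is set to the current time plus notAvailableDuration, so it marks the moment from which the Broker may be used again.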
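The update logic can be tried out with a miniature fault table. This is a sketch under stated assumptions: FaultTableSketch and its Entry class are hypothetical names, the clock is passed in as a parameter, and ConcurrentHashMap.computeIfAbsent replaces the get / putIfAbsent sequence of the original method.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical miniature of the fault table; names are illustrative only.
class FaultTableSketch {
    static final class Entry {
        volatile long currentLatency;
        volatile long startTimestamp;
    }

    private final ConcurrentHashMap<String, Entry> table = new ConcurrentHashMap<>();

    // Records an observation made at time "now"; the latest observation always wins.
    void update(String brokerName, long currentLatency, long notAvailableDuration, long now) {
        Entry entry = table.computeIfAbsent(brokerName, key -> new Entry());
        entry.currentLatency = currentLatency;
        entry.startTimestamp = now + notAvailableDuration;
    }

    // Returns the recorded start timestamp, or -1 if the Broker has no entry.
    long startTimestampOf(String brokerName) {
        Entry entry = table.get(brokerName);
        return entry == null ? -1L : entry.startTimestamp;
    }
}
```

One consequence is visible in the sketch and follows from the original method as well: every update overwrites startTimestamp. A later observation with a short latency therefore shortens or clears a penalty recorded earlier for the same Broker.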

