千家信息网

nacos server中PushService的原理和应用

发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,这篇文章主要讲解了"nacos server中PushService的原理和应用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"nacos server
千家信息网最后更新 2025年01月31日nacos server中PushService的原理和应用

这篇文章主要讲解了"nacos server中PushService的原理和应用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"nacos server中PushService的原理和应用"吧!

本文主要研究一下nacos server的PushService

PushService

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

@Componentpublic class PushService implements ApplicationContextAware, ApplicationListener {    @Autowired    private SwitchDomain switchDomain;    private ApplicationContext applicationContext;    private static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L);    private static final int MAX_RETRY_TIMES = 1;    private static volatile ConcurrentMap ackMap        = new ConcurrentHashMap();    private static ConcurrentMap> clientMap        = new ConcurrentHashMap>();    private static volatile ConcurrentHashMap udpSendTimeMap = new ConcurrentHashMap();    public static volatile ConcurrentHashMap pushCostMap = new ConcurrentHashMap();    private static int totalPush = 0;    private static int failedPush = 0;    private static ConcurrentHashMap lastPushMillisMap = new ConcurrentHashMap<>();    private static DatagramSocket udpSocket;    private static Map futureMap = new ConcurrentHashMap<>();    private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {        @Override        public Thread newThread(Runnable r) {            Thread t = new Thread(r);            t.setDaemon(true);            t.setName("com.alibaba.nacos.naming.push.retransmitter");            return t;        }    });    private static ScheduledExecutorService udpSender = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {        @Override        public Thread newThread(Runnable r) {            Thread t = new Thread(r);            t.setDaemon(true);            t.setName("com.alibaba.nacos.naming.push.udpSender");            return t;        }    });    static {        try {            udpSocket = new DatagramSocket();            Receiver receiver = new Receiver();            Thread inThread = new Thread(receiver);            inThread.setDaemon(true);            inThread.setName("com.alibaba.nacos.naming.push.receiver");            inThread.start();            executorService.scheduleWithFixedDelay(new Runnable() {                @Override                public void run() {                    try {                        removeClientIfZombie();                    } catch (Throwable e) {                        Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");                    }                }            }, 0, 20, TimeUnit.SECONDS);        } catch (SocketException e) {            Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");        }    }    @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        this.applicationContext = applicationContext;    }    //......    public static void removeClientIfZombie() {        int size = 0;        for (Map.Entry> entry : clientMap.entrySet()) {            ConcurrentMap clientConcurrentMap = entry.getValue();            for (Map.Entry entry1 : clientConcurrentMap.entrySet()) {                PushClient client = entry1.getValue();                if (client.zombie()) {                    clientConcurrentMap.remove(entry1.getKey());                }            }            size += clientConcurrentMap.size();        }        if (Loggers.PUSH.isDebugEnabled()) {            Loggers.PUSH.debug("[NACOS-PUSH] clientMap size: {}", size);        }    }    //......}
  • PushService实现了ApplicationContextAware、ApplicationListener接口;它有两个ScheduledExecutorService,一个用于retransmitter,一个用于udpSender;其static代码块创建了一个deamon线程执行Receiver,同时注册了一个定时任务执行removeClientIfZombie,它会遍历clientMap,移除zombie的client

Receiver

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

    public static class Receiver implements Runnable {        @Override        public void run() {            while (true) {                byte[] buffer = new byte[1024 * 64];                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);                try {                    udpSocket.receive(packet);                    String json = new String(packet.getData(), 0, packet.getLength(), Charset.forName("UTF-8")).trim();                    AckPacket ackPacket = JSON.parseObject(json, AckPacket.class);                    InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();                    String ip = socketAddress.getAddress().getHostAddress();                    int port = socketAddress.getPort();                    if (System.nanoTime() - ackPacket.lastRefTime > ACK_TIMEOUT_NANOS) {                        Loggers.PUSH.warn("ack takes too long from {} ack json: {}", packet.getSocketAddress(), json);                    }                    String ackKey = getACKKey(ip, port, ackPacket.lastRefTime);                    AckEntry ackEntry = ackMap.remove(ackKey);                    if (ackEntry == null) {                        throw new IllegalStateException("unable to find ackEntry for key: " + ackKey                            + ", ack json: " + json);                    }                    long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);                    Loggers.PUSH.info("received ack: {} from: {}:, cost: {} ms, unacked: {}, total push: {}",                        json, ip, port, pushCost, ackMap.size(), totalPush);                    pushCostMap.put(ackKey, pushCost);                    udpSendTimeMap.remove(ackKey);                } catch (Throwable e) {                    Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);                }            }        }        //......        public static class AckPacket {            public String type;            public long lastRefTime;            public String data;        }    }
  • Receiver实现了Runnable接口,其run方法使用while true循环来执行udpSocket.receive,之后解析AckPacket,从ackMap移除该ackKey,更新pushCostMap,同时从udpSendTimeMap移除该ackKey

PushClient

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

    public class PushClient {        private String namespaceId;        private String serviceName;        private String clusters;        private String agent;        private String tenant;        private String app;        private InetSocketAddress socketAddr;        private DataSource dataSource;        private Map params;        public Map getParams() {            return params;        }        public void setParams(Map params) {            this.params = params;        }        public long lastRefTime = System.currentTimeMillis();        public PushClient(String namespaceId,                          String serviceName,                          String clusters,                          String agent,                          InetSocketAddress socketAddr,                          DataSource dataSource,                          String tenant,                          String app) {            this.namespaceId = namespaceId;            this.serviceName = serviceName;            this.clusters = clusters;            this.agent = agent;            this.socketAddr = socketAddr;            this.dataSource = dataSource;            this.tenant = tenant;            this.app = app;        }        public DataSource getDataSource() {            return dataSource;        }        public PushClient(InetSocketAddress socketAddr) {            this.socketAddr = socketAddr;        }        public boolean zombie() {            return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName);        }        @Override        public String toString() {            return "serviceName: " + serviceName                + ", clusters: " + clusters                + ", ip: " + socketAddr.getAddress().getHostAddress()                + ", port: " + socketAddr.getPort()                + ", agent: " + agent;        }        public String getAgent() {            return agent;        }        public String getAddrStr() {            return socketAddr.getAddress().getHostAddress() + ":" + socketAddr.getPort();        }        public String getIp() {            return socketAddr.getAddress().getHostAddress();        }        @Override        public int hashCode() {            return Objects.hash(serviceName, clusters, socketAddr);        }        @Override        public boolean equals(Object obj) {            if (!(obj instanceof PushClient)) {                return false;            }            PushClient other = (PushClient) obj;            return serviceName.equals(other.serviceName) && clusters.equals(other.clusters) && socketAddr.equals(other.socketAddr);        }        public String getClusters() {            return clusters;        }        public void setClusters(String clusters) {            this.clusters = clusters;        }        public String getNamespaceId() {            return namespaceId;        }        public void setNamespaceId(String namespaceId) {            this.namespaceId = namespaceId;        }        public String getServiceName() {            return serviceName;        }        public void setServiceName(String serviceName) {            this.serviceName = serviceName;        }        public String getTenant() {            return tenant;        }        public void setTenant(String tenant) {            this.tenant = tenant;        }        public String getApp() {            return app;        }        public void setApp(String app) {            this.app = app;        }        public InetSocketAddress getSocketAddr() {            return socketAddr;        }        public void refresh() {            lastRefTime = System.currentTimeMillis();        }    }
  • PushClient封装了要推送的目标服务地址等信息,它提供了zombie方法来判断目标服务是否zombie,它判断距离lastRefTime的时间差是否超过switchDomain指定的该serviceName的PushCacheMillis(默认为10秒),超过则判定为zombie

PushService.onApplicationEvent

@Componentpublic class PushService implements ApplicationContextAware, ApplicationListener {        //......    @Override    public void onApplicationEvent(ServiceChangeEvent event) {        Service service = event.getService();        String serviceName = service.getName();        String namespaceId = service.getNamespaceId();        Future future = udpSender.schedule(new Runnable() {            @Override            public void run() {                try {                    Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");                    ConcurrentMap clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));                    if (MapUtils.isEmpty(clients)) {                        return;                    }                    Map cache = new HashMap<>(16);                    long lastRefTime = System.nanoTime();                    for (PushClient client : clients.values()) {                        if (client.zombie()) {                            Loggers.PUSH.debug("client is zombie: " + client.toString());                            clients.remove(client.toString());                            Loggers.PUSH.debug("client is zombie: " + client.toString());                            continue;                        }                        Receiver.AckEntry ackEntry;                        Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());                        String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());                        byte[] compressData = null;                        Map data = null;                        if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {                            org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);                            compressData = (byte[]) (pair.getValue0());                            data = (Map) pair.getValue1();                            Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());                        }                        if (compressData != null) {                            ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);                        } else {                            ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);                            if (ackEntry != null) {                                cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));                            }                        }                        Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",                            client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key));                        udpPush(ackEntry);                    }                } catch (Exception e) {                    Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);                } finally {                    futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));                }            }        }, 1000, TimeUnit.MILLISECONDS);        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);    }    //......    public void serviceChanged(Service service) {        // merge some change events to reduce the push frequency:        if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {            return;        }        this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));    }    //......    }
  • onApplicationEvent会处理ServiceChangeEvent,它会注册一个延时任务并将该future放入futureMap;该延时任务会从clientMap获取指定namespaceId, serviceName的clients;然后遍历clients判断是否是zombie,如果是的话则移除该client,否则创建Receiver.AckEntry,然后执行udpPush(ackEntry),最后从futureMap移除该future;serviceChanged方法提供给外部调用发布ServiceChangeEvent

PushService.udpPush

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

@Componentpublic class PushService implements ApplicationContextAware, ApplicationListener {        //......    public static class Receiver implements Runnable {            //......        public static class AckEntry {            public AckEntry(String key, DatagramPacket packet) {                this.key = key;                this.origin = packet;            }            public void increaseRetryTime() {                retryTimes.incrementAndGet();            }            public int getRetryTimes() {                return retryTimes.get();            }            public String key;            public DatagramPacket origin;            private AtomicInteger retryTimes = new AtomicInteger(0);            public Map data;        }            //......    }           private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {        if (ackEntry == null) {            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");            return null;        }        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);            ackMap.remove(ackEntry.key);            udpSendTimeMap.remove(ackEntry.key);            failedPush += 1;            return ackEntry;        }        try {            if (!ackMap.containsKey(ackEntry.key)) {                totalPush++;            }            ackMap.put(ackEntry.key, ackEntry);            udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());            Loggers.PUSH.info("send udp packet: " + ackEntry.key);            udpSocket.send(ackEntry.origin);            ackEntry.increaseRetryTime();            executorService.schedule(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS),                TimeUnit.MILLISECONDS);            return ackEntry;        } catch (Exception e) {            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}",                ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);            ackMap.remove(ackEntry.key);            udpSendTimeMap.remove(ackEntry.key);            failedPush += 1;            return null;        }    }        //......}
  • udpPush方法会根据Receiver.AckEntry的信息进行判断,如果其重试次数大于MAX_RETRY_TIMES则终止push,将其从ackMap、udpSendTimeMap中移除;如果可以重试则将其ackEntry.key放入ackMap及udpSendTimeMap,然后执行udpSocket.send(ackEntry.origin)及ackEntry.increaseRetryTime(),并注册Retransmitter的延时任务;如果出现异常则将其从ackMap、udpSendTimeMap移除

Retransmitter

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java

    public static class Retransmitter implements Runnable {        Receiver.AckEntry ackEntry;        public Retransmitter(Receiver.AckEntry ackEntry) {            this.ackEntry = ackEntry;        }        @Override        public void run() {            if (ackMap.containsKey(ackEntry.key)) {                Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);                udpPush(ackEntry);            }        }    }
  • Retransmitter实现了Runnable方法,其run方法在ackMap包含ackEntry.key的条件下执行udpPush重试

小结

  • PushService实现了ApplicationContextAware、ApplicationListener接口

  • 其static代码块创建了一个deamon线程执行Receiver,同时注册了一个定时任务执行removeClientIfZombie,它会遍历clientMap,移除zombie的client

  • 其onApplicationEvent会处理ServiceChangeEvent,它会注册一个延时任务并将该future放入futureMap;该延时任务会从clientMap获取指定namespaceId, serviceName的clients;然后遍历clients判断是否是zombie,如果是的话则移除该client,否则创建Receiver.AckEntry,然后执行udpPush(ackEntry),最后从futureMap移除该future;serviceChanged方法提供给外部调用发布ServiceChangeEvent

感谢各位的阅读,以上就是"nacos server中PushService的原理和应用"的内容了,经过本文的学习后,相信大家对nacos server中PushService的原理和应用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0