PushReceiver in the nacos client: how it works and how it is used

Published: 2025-01-23  Author: 千家信息网 editorial staff

This article looks at PushReceiver in the nacos client.

PushReceiver

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/PushReceiver.java

    public class PushReceiver implements Runnable {

        private ScheduledExecutorService executorService;

        private static final int UDP_MSS = 64 * 1024;

        private DatagramSocket udpSocket;

        private HostReactor hostReactor;

        public PushReceiver(HostReactor hostReactor) {
            try {
                this.hostReactor = hostReactor;
                udpSocket = new DatagramSocket();

                executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setDaemon(true);
                        thread.setName("com.alibaba.nacos.naming.push.receiver");
                        return thread;
                    }
                });

                executorService.execute(this);
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] init udp socket failed", e);
            }
        }

        @Override
        public void run() {
            while (true) {
                try {
                    // byte[] is initialized with 0 full filled by default
                    byte[] buffer = new byte[UDP_MSS];
                    DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

                    udpSocket.receive(packet);

                    String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
                    NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

                    PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
                    String ack;
                    if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                        hostReactor.processServiceJSON(pushPacket.data);

                        // send ack to server
                        ack = "{\"type\": \"push-ack\""
                            + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                            + "\", \"data\":" + "\"\"}";
                    } else if ("dump".equals(pushPacket.type)) {
                        // dump data to server
                        ack = "{\"type\": \"dump-ack\""
                            + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                            + "\", \"data\":" + "\""
                            + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                            + "\"}";
                    } else {
                        // do nothing send ack only
                        ack = "{\"type\": \"unknown-ack\""
                            + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                            + "\", \"data\":" + "\"\"}";
                    }

                    udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                        ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
                } catch (Exception e) {
                    NAMING_LOGGER.error("[NA] error while receiving push data", e);
                }
            }
        }

        public static class PushPacket {
            public String type;
            public long lastRefTime;
            public String data;
        }

        public int getUDPPort() {
            return udpSocket.getLocalPort();
        }
    }

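  • PushReceiver implements Runnable. Its constructor creates the udpSocket and a single-threaded ScheduledThreadPoolExecutor (one daemon thread), then submits itself to that executor with executorService.execute(this).

  • Its run method loops forever (while (true)) on udpSocket.receive(packet). Each received datagram is passed through IoUtils.tryDecompress, parsed into a PushPacket, and then handled according to pushPacket.type.

  • When pushPacket.type is dom or service, it calls hostReactor.processServiceJSON(pushPacket.data) and replies with a push-ack. When it is dump, it serializes hostReactor.getServiceInfoMap() into a dump-ack. Any other type gets an unknown-ack. In every case the ack is sent back to the sender's address over the same udpSocket.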
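
The receive-and-ack round described above can be sketched with the JDK alone. This is only an illustration, not nacos code: the class PushAckSketch, its methods, and the simplified "type,lastRefTime" text payload (used instead of JSON so that no parser library is needed) are all assumptions made for the example, and the data field of the ack is always left empty.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not nacos code: one receive-and-ack round over UDP.
final class PushAckSketch {

    // Same three-way branch on the push type as in PushReceiver.run().
    static String ackType(String pushType) {
        if ("dom".equals(pushType) || "service".equals(pushType)) {
            return "push-ack";
        }
        if ("dump".equals(pushType)) {
            return "dump-ack";
        }
        return "unknown-ack";
    }

    // Builds the ack JSON by hand. The data field is left empty here;
    // the real dump-ack carries the escaped service-info map instead.
    static String buildAck(String pushType, long lastRefTime) {
        return "{\"type\": \"" + ackType(pushType) + "\""
            + ", \"lastRefTime\":\"" + lastRefTime
            + "\", \"data\":\"\"}";
    }

    // Receives one datagram and replies to its sender on the same socket.
    // Assumption: the payload is the simplified text "type,lastRefTime".
    static String serveOnce(DatagramSocket socket) throws IOException {
        byte[] buffer = new byte[64 * 1024];
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
        socket.receive(packet); // blocks until a datagram arrives
        // Decode only the bytes that were actually received.
        String payload = new String(packet.getData(), packet.getOffset(),
            packet.getLength(), StandardCharsets.UTF_8).trim();
        String[] parts = payload.split(",", 2);
        String ack = buildAck(parts[0], Long.parseLong(parts[1]));
        byte[] ackBytes = ack.getBytes(StandardCharsets.UTF_8);
        socket.send(new DatagramPacket(ackBytes, ackBytes.length, packet.getSocketAddress()));
        return ack;
    }

    public static void main(String[] args) throws IOException {
        try (DatagramSocket receiver = new DatagramSocket(0, InetAddress.getLoopbackAddress());
             DatagramSocket sender = new DatagramSocket()) {
            // Play the server role: push to the receiver's ephemeral port.
            byte[] push = "service,42".getBytes(StandardCharsets.UTF_8);
            sender.send(new DatagramPacket(push, push.length, receiver.getLocalSocketAddress()));
            serveOnce(receiver);

            // Read the ack that came back to the sender.
            byte[] buf = new byte[1024];
            DatagramPacket reply = new DatagramPacket(buf, buf.length);
            sender.setSoTimeout(2000);
            sender.receive(reply);
            System.out.println(new String(reply.getData(), 0, reply.getLength(), StandardCharsets.UTF_8));
        }
    }
}
```

Replying to packet.getSocketAddress() is what lets the receiver bind to an arbitrary local port, as PushReceiver does with new DatagramSocket(): the sender only needs to learn that port once, which is presumably why the class exposes getUDPPort().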

HostReactor

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java

    public class HostReactor {

        private static final long DEFAULT_DELAY = 1000L;

        private static final long UPDATE_HOLD_INTERVAL = 5000L;

        private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();

        private Map<String, ServiceInfo> serviceInfoMap;

        private Map<String, Object> updatingMap;

        private PushReceiver pushReceiver;

        private EventDispatcher eventDispatcher;

        private NamingProxy serverProxy;

        private FailoverReactor failoverReactor;

        private String cacheDir;

        private ScheduledExecutorService executor;

        public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) {
            this(eventDispatcher, serverProxy, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
        }

        public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
                           boolean loadCacheAtStart, int pollingThreadCount) {

            executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.client.naming.updater");
                    return thread;
                }
            });

            this.eventDispatcher = eventDispatcher;
            this.serverProxy = serverProxy;
            this.cacheDir = cacheDir;
            if (loadCacheAtStart) {
                this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
            } else {
                this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
            }

            this.updatingMap = new ConcurrentHashMap<String, Object>();
            this.failoverReactor = new FailoverReactor(this, cacheDir);
            this.pushReceiver = new PushReceiver(this);
        }

        //......

        public ServiceInfo processServiceJSON(String json) {
            ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);
            ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
            if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
                //empty or error push, just ignore
                return oldService;
            }

            boolean changed = false;

            if (oldService != null) {

                if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                    NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime()
                        + ", new-t: " + serviceInfo.getLastRefTime());
                }

                serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

                Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
                for (Instance host : oldService.getHosts()) {
                    oldHostMap.put(host.toInetAddr(), host);
                }

                Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
                for (Instance host : serviceInfo.getHosts()) {
                    newHostMap.put(host.toInetAddr(), host);
                }

                Set<Instance> modHosts = new HashSet<Instance>();
                Set<Instance> newHosts = new HashSet<Instance>();
                Set<Instance> remvHosts = new HashSet<Instance>();

                List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                    newHostMap.entrySet());
                for (Map.Entry<String, Instance> entry : newServiceHosts) {
                    Instance host = entry.getValue();
                    String key = entry.getKey();
                    if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(),
                        oldHostMap.get(key).toString())) {
                        modHosts.add(host);
                        continue;
                    }

                    if (!oldHostMap.containsKey(key)) {
                        newHosts.add(host);
                    }
                }

                for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
                    Instance host = entry.getValue();
                    String key = entry.getKey();
                    if (newHostMap.containsKey(key)) {
                        continue;
                    }

                    if (!newHostMap.containsKey(key)) {
                        remvHosts.add(host);
                    }
                }

                if (newHosts.size() > 0) {
                    changed = true;
                    NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: "
                        + serviceInfo.getKey() + " -> " + JSON.toJSONString(newHosts));
                }

                if (remvHosts.size() > 0) {
                    changed = true;
                    NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: "
                        + serviceInfo.getKey() + " -> " + JSON.toJSONString(remvHosts));
                }

                if (modHosts.size() > 0) {
                    changed = true;
                    NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "
                        + serviceInfo.getKey() + " -> " + JSON.toJSONString(modHosts));
                }

                serviceInfo.setJsonFromServer(json);

                if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
                    eventDispatcher.serviceChanged(serviceInfo);
                    DiskCache.write(serviceInfo, cacheDir);
                }

            } else {
                changed = true;
                NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JSON
                    .toJSONString(serviceInfo.getHosts()));
                serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
                eventDispatcher.serviceChanged(serviceInfo);
                serviceInfo.setJsonFromServer(json);
                DiskCache.write(serviceInfo, cacheDir);
            }

            MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());

            if (changed) {
                NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() +
                    " -> " + JSON.toJSONString(serviceInfo.getHosts()));
            }

            return serviceInfo;
        }

        public Map<String, ServiceInfo> getServiceInfoMap() {
            return serviceInfoMap;
        }

        //......
    }

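  • processServiceJSON compares the received serviceInfo with the local copy to decide whether anything changed. A push with null hosts or one that fails validate() is ignored. Otherwise the new serviceInfo is stored in serviceInfoMap, and if hosts were added, removed or modified (or the service was not cached before), it calls back eventDispatcher.serviceChanged(serviceInfo) and runs DiskCache.write(serviceInfo, cacheDir). The HostReactor constructor takes a loadCacheAtStart parameter (false by default); when it is true, serviceInfoMap is initialized from local files via DiskCache.read(this.cacheDir).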
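
The host comparison inside processServiceJSON boils down to a three-way diff of two maps keyed by address. The following is a minimal sketch of that idea, not the nacos implementation: HostDiffSketch and Diff are hypothetical names, and each Instance is reduced to a plain String so the example has no dependencies (the original compares host.toString() values in the same way).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch, not nacos code: classify hosts as added, removed or modified.
final class HostDiffSketch {

    static final class Diff {
        // TreeSet keeps the keys sorted so printed output is deterministic.
        final Set<String> added = new TreeSet<String>();
        final Set<String> removed = new TreeSet<String>();
        final Set<String> modified = new TreeSet<String>();

        // Mirrors the "changed" flag: any non-empty set means a change.
        boolean changed() {
            return !added.isEmpty() || !removed.isEmpty() || !modified.isEmpty();
        }
    }

    // Keys stand for host.toInetAddr(), values for host.toString().
    // Assumption: values are never null.
    static Diff diff(Map<String, String> oldHosts, Map<String, String> newHosts) {
        Diff d = new Diff();
        for (Map.Entry<String, String> e : newHosts.entrySet()) {
            String key = e.getKey();
            if (!oldHosts.containsKey(key)) {
                d.added.add(key);          // only in the new push
            } else if (!e.getValue().equals(oldHosts.get(key))) {
                d.modified.add(key);       // same address, different details
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                d.removed.add(key);        // only in the old copy
            }
        }
        return d;
    }

    public static void main(String[] args) {
        Map<String, String> oldHosts = new HashMap<String, String>();
        oldHosts.put("10.0.0.1:8080", "weight=1");
        oldHosts.put("10.0.0.2:8080", "weight=1");
        Map<String, String> newHosts = new HashMap<String, String>();
        newHosts.put("10.0.0.2:8080", "weight=2");
        newHosts.put("10.0.0.3:8080", "weight=1");

        Diff d = diff(oldHosts, newHosts);
        System.out.println("added=" + d.added + " removed=" + d.removed
            + " modified=" + d.modified + " changed=" + d.changed());
    }
}
```

Only a non-empty diff triggers the listener callback and the disk write in the original, which keeps repeated pushes of identical data cheap.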

DiskCache

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/cache/DiskCache.java

    public class DiskCache {

        public static void write(ServiceInfo dom, String dir) {

            try {
                makeSureCacheDirExists(dir);

                File file = new File(dir, dom.getKeyEncoded());
                if (!file.exists()) {
                    // add another !file.exists() to avoid conflicted creating-new-file from multi-instances
                    if (!file.createNewFile() && !file.exists()) {
                        throw new IllegalStateException("failed to create cache file");
                    }
                }

                StringBuilder keyContentBuffer = new StringBuilder("");

                String json = dom.getJsonFromServer();

                if (StringUtils.isEmpty(json)) {
                    json = JSON.toJSONString(dom);
                }

                keyContentBuffer.append(json);

                //Use the concurrent API to ensure the consistency.
                ConcurrentDiskUtil.writeFileContent(file, keyContentBuffer.toString(), Charset.defaultCharset().toString());

            } catch (Throwable e) {
                NAMING_LOGGER.error("[NA] failed to write cache for dom:" + dom.getName(), e);
            }
        }

        public static String getLineSeparator() {
            return System.getProperty("line.separator");
        }

        public static Map<String, ServiceInfo> read(String cacheDir) {
            Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);

            BufferedReader reader = null;
            try {
                File[] files = makeSureCacheDirExists(cacheDir).listFiles();
                if (files == null || files.length == 0) {
                    return domMap;
                }

                for (File file : files) {
                    if (!file.isFile()) {
                        continue;
                    }

                    String fileName = URLDecoder.decode(file.getName(), "UTF-8");

                    if (!(fileName.endsWith(Constants.SERVICE_INFO_SPLITER + "meta") || fileName.endsWith(
                        Constants.SERVICE_INFO_SPLITER + "special-url"))) {
                        ServiceInfo dom = new ServiceInfo(fileName);
                        List<Instance> ips = new ArrayList<Instance>();
                        dom.setHosts(ips);

                        ServiceInfo newFormat = null;

                        try {
                            String dataString = ConcurrentDiskUtil.getFileContent(file,
                                Charset.defaultCharset().toString());
                            reader = new BufferedReader(new StringReader(dataString));

                            String json;
                            while ((json = reader.readLine()) != null) {
                                try {
                                    if (!json.startsWith("{")) {
                                        continue;
                                    }

                                    newFormat = JSON.parseObject(json, ServiceInfo.class);

                                    if (StringUtils.isEmpty(newFormat.getName())) {
                                        ips.add(JSON.parseObject(json, Instance.class));
                                    }
                                } catch (Throwable e) {
                                    NAMING_LOGGER.error("[NA] error while parsing cache file: " + json, e);
                                }
                            }
                        } catch (Exception e) {
                            NAMING_LOGGER.error("[NA] failed to read cache for dom: " + file.getName(), e);
                        } finally {
                            try {
                                if (reader != null) {
                                    reader.close();
                                }
                            } catch (Exception e) {
                                //ignore
                            }
                        }
                        if (newFormat != null && !StringUtils.isEmpty(newFormat.getName()) && !CollectionUtils.isEmpty(
                            newFormat.getHosts())) {
                            domMap.put(dom.getKey(), newFormat);
                        } else if (!CollectionUtils.isEmpty(dom.getHosts())) {
                            domMap.put(dom.getKey(), dom);
                        }
                    }

                }
            } catch (Throwable e) {
                NAMING_LOGGER.error("[NA] failed to read cache file", e);
            }

            return domMap;
        }

        private static File makeSureCacheDirExists(String dir) {
            File cacheDir = new File(dir);
            if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                throw new IllegalStateException("failed to create cache dir: " + dir);
            }

            return cacheDir;
        }
    }

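  • DiskCache.write stores a serviceInfo in the dir directory, in a file named serviceInfo.getKeyEncoded(); the content is the JSON received from the server, or the serialized serviceInfo if that JSON is empty. DiskCache.read goes through the files in the directory (skipping names that end in the meta or special-url suffix), parses each one into a ServiceInfo, puts it into domMap, and finally returns domMap.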
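
The one-file-per-service layout can be illustrated with a small round trip. This is a sketch under stated assumptions rather than the nacos code: DiskCacheSketch is a hypothetical class, the JSON is treated as an opaque string, the file name is produced with URLEncoder as a stand-in for getKeyEncoded(), and UTF-8 is used where the original uses the platform default charset.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not nacos code: one cache file per service key.
final class DiskCacheSketch {

    // Writes the JSON for one service into dir, creating dir if needed.
    static void write(Path dir, String key, String json) {
        try {
            Files.createDirectories(dir);
            // Encode the key so characters such as '@' are safe in a file name.
            String fileName = URLEncoder.encode(key, "UTF-8");
            Files.write(dir.resolve(fileName), json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads every regular file in dir back into a key -> JSON map.
    static Map<String, String> read(Path dir) {
        Map<String, String> result = new HashMap<String, String>();
        if (!Files.isDirectory(dir)) {
            return result; // nothing cached yet
        }
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path file : files) {
                if (!Files.isRegularFile(file)) {
                    continue;
                }
                // Decode the file name to recover the original key.
                String key = URLDecoder.decode(file.getFileName().toString(), "UTF-8");
                result.put(key, new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("disk-cache-sketch");
        write(dir, "DEFAULT_GROUP@@demo-service", "{\"name\":\"demo-service\"}");
        System.out.println(read(dir));
    }
}
```

Encoding the key on write and decoding the file name on read is what allows the cache to be rebuilt from the directory listing alone, which is the property loadCacheAtStart relies on.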

Summary

  • PushReceiver implements Runnable. Its constructor creates the udpSocket and a single-threaded ScheduledThreadPoolExecutor, then submits itself to that executor.

  • Its run method loops forever on udpSocket.receive(packet), parses each received datagram into a PushPacket, and handles it according to pushPacket.type.

  • When pushPacket.type is dom or service, it calls hostReactor.processServiceJSON(pushPacket.data); when it is dump, it serializes hostReactor.getServiceInfoMap() into the ack. In both cases the ack is then sent back to the sender.
