千家信息网

如何用源码分析canal的deployer模块

发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,这期内容当中小编将会给大家带来有关如何用源码分析canal的deployer模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。CanalLauncher是启动入口类
千家信息网最后更新 2024年12月13日如何用源码分析canal的deployer模块

这期内容当中小编将会给大家带来有关如何用源码分析canal的deployer模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

  • CanalLauncher是启动入口类

  1. 获取canal.properties配置文件

  2. 如果canal.properties配置文件中属性root.admin.manager有值,那么构造PlainCanalConfigClient,调用PlainCanalConfigClient的findServer获取PlainCanal,调用PlainCanal的getProperties方法获取properties

  3. 通过properties构造 CanalStarter并调用其start方法

CanalStarter是启动类

public synchronized void start() throws Throwable {        //首先根据canal.serverMode构造CanalMQProducer,如果是kafka,构造的是CanalKafkaProducer        String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);        if (serverMode.equalsIgnoreCase("kafka")) {            canalMQProducer = new CanalKafkaProducer();        } else if (serverMode.equalsIgnoreCase("rocketmq")) {            canalMQProducer = new CanalRocketMQProducer();        }        if (canalMQProducer != null) {            // disable netty            System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");            // 设置为raw避免ByteString->Entry的二次解析            System.setProperty("canal.instance.memory.rawEntry", "false");        }        //接下来构造CanalController并调用其start方法        logger.info("## start the canal server.");        controller = new CanalController(properties);        controller.start();        logger.info("## the canal server is running now ......");        ...        //构造CanalMQStarter并调用其start方法,同时设置为CanalController的属性        if (canalMQProducer != null) {            canalMQStarter = new CanalMQStarter(canalMQProducer);            MQProperties mqProperties = buildMQProperties(properties);            String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);            canalMQStarter.start(mqProperties, destinations);            controller.setCanalMQStarter(canalMQStarter);        }        ...        running = true;    }
  • CanalController是实例调度控制器

public CanalController(final Properties properties){        // 初始化managerClients用于请求admin        managerClients = MigrateMap.makeComputingMap(new Function() {            public PlainCanalConfigClient apply(String managerAddress) {                return getManagerClient(managerAddress);            }        });        // 初始化全局参数设置,包含了全局mode、lazy、managerAddress、springXml,初始化instanceGenerator用于创建instance,其根据InstanceConfig的mode值使用PlainCanalInstanceGenerator或者SpringCanalInstanceGenerator创建CanalInstance        globalInstanceConfig = initGlobalConfig(properties);        instanceConfigs = new MapMaker().makeMap();        // 初始化instance config,包含了实例mode、lazy、managerAddress、springXml        initInstanceConfig(properties);        ...        // 初始化CanalServerWithEmbedded,将instanceGenerator设置为CanalServerWithEmbedded的属性        embededCanalServer = CanalServerWithEmbedded.instance();        embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator        int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));        embededCanalServer.setMetricsPort(metricsPort);        this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER);        this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);        embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER));        embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD));        ...        final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);        //初始化ZkClientx用于canal集群部署,创建/otteradmin/canal/destinations节点和/otteradmin/canal/cluster节点        if (StringUtils.isNotEmpty(zkServers)) {            zkclientx = ZkClientx.getZkClient(zkServers);            // 初始化系统目录            zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);            zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);        }        // 初始化ServerRunningMonitors的ServerRunningMonitor,用于启动、关闭实例        final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port);        ServerRunningMonitors.setServerData(serverData);        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function() {            ...        }));        // 初始化InstanceAction,用于启动和关闭实例        autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));        if (autoScan) {            defaultAction = new InstanceAction() {                ...            };            // 初始化instanceConfigMonitors,用于获取所有instanceConfig并启动所有instance            instanceConfigMonitors = MigrateMap.makeComputingMap(new Function() {                public InstanceConfigMonitor apply(InstanceMode mode) {                    ...                }            });        }    }
  • ManagerInstanceConfigMonitor是实例扫描器

public void start() {        super.start();        //启动定时任务,定时扫描所有instance        executor.scheduleWithFixedDelay(new Runnable() {            public void run() {                try {                    scan();                    if (isFirst) {                        isFirst = false;                    }                } catch (Throwable e) {                    logger.error("scan failed", e);                }            }        }, 0, scanIntervalInSecond, TimeUnit.SECONDS);    }private void scan() {        //缓存了所有instance的配置,如果发现有新的instance则启动或者修改了instance则重启        String instances = configClient.findInstances(null);        final List is = Lists.newArrayList(StringUtils.split(instances, ','));        List start = Lists.newArrayList();        List stop = Lists.newArrayList();        List restart = Lists.newArrayList();        for (String instance : is) {            if (!configs.containsKey(instance)) {                PlainCanal newPlainCanal = configClient.findInstance(instance, null);                if (newPlainCanal != null) {                    configs.put(instance, newPlainCanal);                    start.add(instance);                }            } else {                PlainCanal plainCanal = configs.get(instance);                PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5());                if (newPlainCanal != null) {                    // 配置有变化                    restart.add(instance);                    configs.put(instance, newPlainCanal);                }            }        }        configs.forEach((instance, plainCanal) -> {            if (!is.contains(instance)) {                stop.add(instance);            }        });        stop.forEach(instance -> {            notifyStop(instance);        });        restart.forEach(instance -> {            notifyReload(instance);        });        start.forEach(instance -> {            notifyStart(instance);        });    }private void notifyStart(String destination) {        try {            //启动instance调用InstanceAction启动实例,最后是调用ServerRunningMonitor启动实例            defaultAction.start(destination);            actions.put(destination, defaultAction);            // 启动成功后记录配置文件信息        } catch (Throwable e) {            logger.error(String.format("scan add found[%s] but start failed", destination), e);        }    }
  • ServerRunningMonitor是针对server的running实例控制

public ServerRunningMonitor(){        // 创建父节点        dataListener = new IZkDataListener() {            public void handleDataChange(String dataPath, Object data) throws Exception {                MDC.put("destination", destination);                ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);                if (!isMine(runningData.getAddress())) {                    mutex.set(false);                }                if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active                    releaseRunning();// 彻底释放mainstem                }                activeData = (ServerRunningData) runningData;            }            public void handleDataDeleted(String dataPath) throws Exception {                MDC.put("destination", destination);                mutex.set(false);                if (!release && activeData != null && isMine(activeData.getAddress())) {                    // 如果上一次active的状态就是本机,则即时触发一下active抢占                    initRunning();                } else {                    // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作                    delayExector.schedule(new Runnable() {                        public void run() {                            initRunning();                        }                    }, delayTime, TimeUnit.SECONDS);                }            }        };    }public synchronized void start() {        super.start();        try {            //首先调用listener的processStart方法            processStart();            if (zkClient != null) {                // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start                // 监视/otteradmin/canal/destinations/{0}/running节点变化                String path = ZookeeperPathUtils.getDestinationServerRunning(destination);                zkClient.subscribeDataChanges(path, dataListener);                                initRunning();            } else {                processActiveEnter();// 没有zk,直接启动            }        } catch (Exception e) {            logger.error("start failed", e);            // 没有正常启动,重置一下状态,避免干扰下一次start            stop();        }    }private void processStart() {        if (listener != null) {            try {                //processStart方法中创建/otteradmin/canal/destinations/{0}/cluster/{1}节点,0是实例名称,1是当前节点ip:port                listener.processStart();            } catch (Exception e) {                logger.error("processStart failed", e);            }        }    }private void initRunning() {        if (!isStart()) {            return;        }        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);        // 序列化        byte[] bytes = JsonUtils.marshalToByte(serverData);        try {            mutex.set(false);            //尝试创建/otteradmin/canal/destinations/{0}/running节点            zkClient.create(path, bytes, CreateMode.EPHEMERAL);            activeData = serverData;            //如果成功则调用listener的processEnter方法,processEnter方法中调用CanalServerWithEmbedded的start方法启动实例和CanalMQStarter的start方法启动实例            processActiveEnter();// 触发一下事件            mutex.set(true);            release = false;        } catch (ZkNodeExistsException e) {            bytes = zkClient.readData(path, true);            if (bytes == null) {// 如果不存在节点,立即尝试一次                initRunning();            } else {                activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);            }        } catch (ZkNoNodeException e) {            zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点            initRunning();        }    }
  • canal.properties配置

canal.register.ip =canal.admin.manager = 127.0.0.1:8089canal.admin.port = 11110canal.admin.user = admincanal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441canal.admin.register.auto = truecanal.admin.register.cluster =

上述就是小编为大家分享的如何用源码分析canal的deployer模块了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

0