如何用源码分析canal的deployer模块
发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,这期内容当中小编将会给大家带来有关如何用源码分析canal的deployer模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。CanalLauncher是启动入口类
千家信息网最后更新 2025年01月31日如何用源码分析canal的deployer模块
这期内容当中小编将会给大家带来有关如何用源码分析canal的deployer模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
CanalLauncher是启动入口类
获取canal.properties配置文件
如果canal.properties配置文件中属性root.admin.manager有值,那么构造PlainCanalConfigClient,调用PlainCanalConfigClient的findServer获取PlainCanal,调用PlainCanal的getProperties方法获取properties
通过properties构造 CanalStarter并调用其start方法
CanalStarter是启动类
public synchronized void start() throws Throwable { //首先根据canal.serverMode构造CanalMQProducer,如果是kafka,构造的是CanalKafkaProducer String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE); if (serverMode.equalsIgnoreCase("kafka")) { canalMQProducer = new CanalKafkaProducer(); } else if (serverMode.equalsIgnoreCase("rocketmq")) { canalMQProducer = new CanalRocketMQProducer(); } if (canalMQProducer != null) { // disable netty System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true"); // 设置为raw避免ByteString->Entry的二次解析 System.setProperty("canal.instance.memory.rawEntry", "false"); } //接下来构造CanalController并调用其start方法 logger.info("## start the canal server."); controller = new CanalController(properties); controller.start(); logger.info("## the canal server is running now ......"); ... //构造CanalMQStarter并调用其start方法,同时设置为CanalController的属性 if (canalMQProducer != null) { canalMQStarter = new CanalMQStarter(canalMQProducer); MQProperties mqProperties = buildMQProperties(properties); String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS); canalMQStarter.start(mqProperties, destinations); controller.setCanalMQStarter(canalMQStarter); } ... running = true; }
CanalController是实例调度控制器
public CanalController(final Properties properties){ // 初始化managerClients用于请求admin managerClients = MigrateMap.makeComputingMap(new Function() { public PlainCanalConfigClient apply(String managerAddress) { return getManagerClient(managerAddress); } }); // 初始化全局参数设置,包含了全局mode、lazy、managerAddress、springXml,初始化instanceGenerator用于创建instance,其根据InstanceConfig的mode值使用PlainCanalInstanceGenerator或者SpringCanalInstanceGenerator创建CanalInstance globalInstanceConfig = initGlobalConfig(properties); instanceConfigs = new MapMaker().makeMap(); // 初始化instance config,包含了实例mode、lazy、managerAddress、springXml initInstanceConfig(properties); ... // 初始化CanalServerWithEmbedded,将instanceGenerator设置为CanalServerWithEmbedded的属性 embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112")); embededCanalServer.setMetricsPort(metricsPort); this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER); this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD); embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER)); embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD)); ... final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS); //初始化ZkClientx用于canal集群部署,创建/otteradmin/canal/destinations节点和/otteradmin/canal/cluster节点 if (StringUtils.isNotEmpty(zkServers)) { zkclientx = ZkClientx.getZkClient(zkServers); // 初始化系统目录 zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true); zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true); } // 初始化ServerRunningMonitors的ServerRunningMonitor,用于启动、关闭实例 final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port); ServerRunningMonitors.setServerData(serverData); ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function () { ... })); // 初始化InstanceAction,用于启动和关闭实例 autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); if (autoScan) { defaultAction = new InstanceAction() { ... }; // 初始化instanceConfigMonitors,用于获取所有instanceConfig并启动所有instance instanceConfigMonitors = MigrateMap.makeComputingMap(new Function () { public InstanceConfigMonitor apply(InstanceMode mode) { ... } }); } }
ManagerInstanceConfigMonitor是实例扫描器
public void start() { super.start(); //启动定时任务,定时扫描所有instance executor.scheduleWithFixedDelay(new Runnable() { public void run() { try { scan(); if (isFirst) { isFirst = false; } } catch (Throwable e) { logger.error("scan failed", e); } } }, 0, scanIntervalInSecond, TimeUnit.SECONDS); }private void scan() { //缓存了所有instance的配置,如果发现有新的instance则启动或者修改了instance则重启 String instances = configClient.findInstances(null); final Listis = Lists.newArrayList(StringUtils.split(instances, ',')); List start = Lists.newArrayList(); List stop = Lists.newArrayList(); List restart = Lists.newArrayList(); for (String instance : is) { if (!configs.containsKey(instance)) { PlainCanal newPlainCanal = configClient.findInstance(instance, null); if (newPlainCanal != null) { configs.put(instance, newPlainCanal); start.add(instance); } } else { PlainCanal plainCanal = configs.get(instance); PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5()); if (newPlainCanal != null) { // 配置有变化 restart.add(instance); configs.put(instance, newPlainCanal); } } } configs.forEach((instance, plainCanal) -> { if (!is.contains(instance)) { stop.add(instance); } }); stop.forEach(instance -> { notifyStop(instance); }); restart.forEach(instance -> { notifyReload(instance); }); start.forEach(instance -> { notifyStart(instance); }); }private void notifyStart(String destination) { try { //启动instance调用InstanceAction启动实例,最后是调用ServerRunningMonitor启动实例 defaultAction.start(destination); actions.put(destination, defaultAction); // 启动成功后记录配置文件信息 } catch (Throwable e) { logger.error(String.format("scan add found[%s] but start failed", destination), e); } }
ServerRunningMonitor是针对server的running实例控制
public ServerRunningMonitor(){ // 创建父节点 dataListener = new IZkDataListener() { public void handleDataChange(String dataPath, Object data) throws Exception { MDC.put("destination", destination); ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class); if (!isMine(runningData.getAddress())) { mutex.set(false); } if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active releaseRunning();// 彻底释放mainstem } activeData = (ServerRunningData) runningData; } public void handleDataDeleted(String dataPath) throws Exception { MDC.put("destination", destination); mutex.set(false); if (!release && activeData != null && isMine(activeData.getAddress())) { // 如果上一次active的状态就是本机,则即时触发一下active抢占 initRunning(); } else { // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作 delayExector.schedule(new Runnable() { public void run() { initRunning(); } }, delayTime, TimeUnit.SECONDS); } } }; }public synchronized void start() { super.start(); try { //首先调用listener的processStart方法 processStart(); if (zkClient != null) { // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start // 监视/otteradmin/canal/destinations/{0}/running节点变化 String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener); initRunning(); } else { processActiveEnter();// 没有zk,直接启动 } } catch (Exception e) { logger.error("start failed", e); // 没有正常启动,重置一下状态,避免干扰下一次start stop(); } }private void processStart() { if (listener != null) { try { //processStart方法中创建/otteradmin/canal/destinations/{0}/cluster/{1}节点,0是实例名称,1是当前节点ip:port listener.processStart(); } catch (Exception e) { logger.error("processStart failed", e); } } }private void initRunning() { if (!isStart()) { return; } String path = ZookeeperPathUtils.getDestinationServerRunning(destination); // 序列化 byte[] bytes = JsonUtils.marshalToByte(serverData); try { mutex.set(false); //尝试创建/otteradmin/canal/destinations/{0}/running节点 zkClient.create(path, bytes, CreateMode.EPHEMERAL); activeData = serverData; //如果成功则调用listener的processEnter方法,processEnter方法中调用CanalServerWithEmbedded的start方法启动实例和CanalMQStarter的start方法启动实例 processActiveEnter();// 触发一下事件 mutex.set(true); release = false; } catch (ZkNodeExistsException e) { bytes = zkClient.readData(path, true); if (bytes == null) {// 如果不存在节点,立即尝试一次 initRunning(); } else { activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class); } } catch (ZkNoNodeException e) { zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点 initRunning(); } }
canal.properties配置
canal.register.ip =canal.admin.manager = 127.0.0.1:8089canal.admin.port = 11110canal.admin.user = admincanal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441canal.admin.register.auto = truecanal.admin.register.cluster =
上述就是小编为大家分享的如何用源码分析canal的deployer模块了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
实例
方法
节点
配置
分析
就是
属性
文件
尝试
模块
源码
成功
全局
内容
机器
状态
本机
变化
控制
频繁
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发搭建服务器
广州网络安全优选柚米
优合信网络技术怎么样
残忍视频软件开发
北京卫视 网络安全
您对网络安全的认识
湖北什么是软件开发标准
重视网络安全依法文明上网
网络安全法的第一条件的是
汕头应用软件开发常见问题
国外手机 网络安全
网络安全和网络意识形态关系
1u服务器多少钱
上海地铁网络安全系统
网站后台服务器配置
数据库注释后效果
数据库一个表是什么主键
2008服务器文件共享
成都龙擎网络技术公司怎么样
数据库数据暴力破解
远程服务器需要重启
软件开发学校好还是培训班好
如何赋予数据库远程连接权限
TBC怀旧服热门服务器
宜兴进口软件开发市场价格
车联网软件开发需要什么技术
校园网络安全告知书
数据库可以营销推送吗
易语言如何保证数据库安全
大学生互联网科技扶贫ppt