如何用源码分析canal的deployer模块
发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,这期内容当中小编将会给大家带来有关如何用源码分析canal的deployer模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。CanalLauncher是启动入口类
千家信息网最后更新 2024年12月13日如何用源码分析canal的deployer模块
这期内容当中小编将会给大家带来有关如何用源码分析canal的deployer模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
CanalLauncher是启动入口类
获取canal.properties配置文件
如果canal.properties配置文件中属性root.admin.manager有值,那么构造PlainCanalConfigClient,调用PlainCanalConfigClient的findServer获取PlainCanal,调用PlainCanal的getProperties方法获取properties
通过properties构造 CanalStarter并调用其start方法
CanalStarter是启动类
public synchronized void start() throws Throwable { //首先根据canal.serverMode构造CanalMQProducer,如果是kafka,构造的是CanalKafkaProducer String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE); if (serverMode.equalsIgnoreCase("kafka")) { canalMQProducer = new CanalKafkaProducer(); } else if (serverMode.equalsIgnoreCase("rocketmq")) { canalMQProducer = new CanalRocketMQProducer(); } if (canalMQProducer != null) { // disable netty System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true"); // 设置为raw避免ByteString->Entry的二次解析 System.setProperty("canal.instance.memory.rawEntry", "false"); } //接下来构造CanalController并调用其start方法 logger.info("## start the canal server."); controller = new CanalController(properties); controller.start(); logger.info("## the canal server is running now ......"); ... //构造CanalMQStarter并调用其start方法,同时设置为CanalController的属性 if (canalMQProducer != null) { canalMQStarter = new CanalMQStarter(canalMQProducer); MQProperties mqProperties = buildMQProperties(properties); String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS); canalMQStarter.start(mqProperties, destinations); controller.setCanalMQStarter(canalMQStarter); } ... running = true; }
CanalController是实例调度控制器
public CanalController(final Properties properties){ // 初始化managerClients用于请求admin managerClients = MigrateMap.makeComputingMap(new Function() { public PlainCanalConfigClient apply(String managerAddress) { return getManagerClient(managerAddress); } }); // 初始化全局参数设置,包含了全局mode、lazy、managerAddress、springXml,初始化instanceGenerator用于创建instance,其根据InstanceConfig的mode值使用PlainCanalInstanceGenerator或者SpringCanalInstanceGenerator创建CanalInstance globalInstanceConfig = initGlobalConfig(properties); instanceConfigs = new MapMaker().makeMap(); // 初始化instance config,包含了实例mode、lazy、managerAddress、springXml initInstanceConfig(properties); ... // 初始化CanalServerWithEmbedded,将instanceGenerator设置为CanalServerWithEmbedded的属性 embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112")); embededCanalServer.setMetricsPort(metricsPort); this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER); this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD); embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER)); embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD)); ... final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS); //初始化ZkClientx用于canal集群部署,创建/otteradmin/canal/destinations节点和/otteradmin/canal/cluster节点 if (StringUtils.isNotEmpty(zkServers)) { zkclientx = ZkClientx.getZkClient(zkServers); // 初始化系统目录 zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true); zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true); } // 初始化ServerRunningMonitors的ServerRunningMonitor,用于启动、关闭实例 final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port); ServerRunningMonitors.setServerData(serverData); ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function () { ... })); // 初始化InstanceAction,用于启动和关闭实例 autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); if (autoScan) { defaultAction = new InstanceAction() { ... }; // 初始化instanceConfigMonitors,用于获取所有instanceConfig并启动所有instance instanceConfigMonitors = MigrateMap.makeComputingMap(new Function () { public InstanceConfigMonitor apply(InstanceMode mode) { ... } }); } }
ManagerInstanceConfigMonitor是实例扫描器
public void start() { super.start(); //启动定时任务,定时扫描所有instance executor.scheduleWithFixedDelay(new Runnable() { public void run() { try { scan(); if (isFirst) { isFirst = false; } } catch (Throwable e) { logger.error("scan failed", e); } } }, 0, scanIntervalInSecond, TimeUnit.SECONDS); }private void scan() { //缓存了所有instance的配置,如果发现有新的instance则启动或者修改了instance则重启 String instances = configClient.findInstances(null); final Listis = Lists.newArrayList(StringUtils.split(instances, ',')); List start = Lists.newArrayList(); List stop = Lists.newArrayList(); List restart = Lists.newArrayList(); for (String instance : is) { if (!configs.containsKey(instance)) { PlainCanal newPlainCanal = configClient.findInstance(instance, null); if (newPlainCanal != null) { configs.put(instance, newPlainCanal); start.add(instance); } } else { PlainCanal plainCanal = configs.get(instance); PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5()); if (newPlainCanal != null) { // 配置有变化 restart.add(instance); configs.put(instance, newPlainCanal); } } } configs.forEach((instance, plainCanal) -> { if (!is.contains(instance)) { stop.add(instance); } }); stop.forEach(instance -> { notifyStop(instance); }); restart.forEach(instance -> { notifyReload(instance); }); start.forEach(instance -> { notifyStart(instance); }); }private void notifyStart(String destination) { try { //启动instance调用InstanceAction启动实例,最后是调用ServerRunningMonitor启动实例 defaultAction.start(destination); actions.put(destination, defaultAction); // 启动成功后记录配置文件信息 } catch (Throwable e) { logger.error(String.format("scan add found[%s] but start failed", destination), e); } }
ServerRunningMonitor是针对server的running实例控制
public ServerRunningMonitor(){ // 创建父节点 dataListener = new IZkDataListener() { public void handleDataChange(String dataPath, Object data) throws Exception { MDC.put("destination", destination); ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class); if (!isMine(runningData.getAddress())) { mutex.set(false); } if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active releaseRunning();// 彻底释放mainstem } activeData = (ServerRunningData) runningData; } public void handleDataDeleted(String dataPath) throws Exception { MDC.put("destination", destination); mutex.set(false); if (!release && activeData != null && isMine(activeData.getAddress())) { // 如果上一次active的状态就是本机,则即时触发一下active抢占 initRunning(); } else { // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作 delayExector.schedule(new Runnable() { public void run() { initRunning(); } }, delayTime, TimeUnit.SECONDS); } } }; }public synchronized void start() { super.start(); try { //首先调用listener的processStart方法 processStart(); if (zkClient != null) { // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start // 监视/otteradmin/canal/destinations/{0}/running节点变化 String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener); initRunning(); } else { processActiveEnter();// 没有zk,直接启动 } } catch (Exception e) { logger.error("start failed", e); // 没有正常启动,重置一下状态,避免干扰下一次start stop(); } }private void processStart() { if (listener != null) { try { //processStart方法中创建/otteradmin/canal/destinations/{0}/cluster/{1}节点,0是实例名称,1是当前节点ip:port listener.processStart(); } catch (Exception e) { logger.error("processStart failed", e); } } }private void initRunning() { if (!isStart()) { return; } String path = ZookeeperPathUtils.getDestinationServerRunning(destination); // 序列化 byte[] bytes = JsonUtils.marshalToByte(serverData); try { mutex.set(false); //尝试创建/otteradmin/canal/destinations/{0}/running节点 zkClient.create(path, bytes, CreateMode.EPHEMERAL); activeData = serverData; //如果成功则调用listener的processEnter方法,processEnter方法中调用CanalServerWithEmbedded的start方法启动实例和CanalMQStarter的start方法启动实例 processActiveEnter();// 触发一下事件 mutex.set(true); release = false; } catch (ZkNodeExistsException e) { bytes = zkClient.readData(path, true); if (bytes == null) {// 如果不存在节点,立即尝试一次 initRunning(); } else { activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class); } } catch (ZkNoNodeException e) { zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点 initRunning(); } }
canal.properties配置
canal.register.ip =canal.admin.manager = 127.0.0.1:8089canal.admin.port = 11110canal.admin.user = admincanal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441canal.admin.register.auto = truecanal.admin.register.cluster =
上述就是小编为大家分享的如何用源码分析canal的deployer模块了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
实例
方法
节点
配置
分析
就是
属性
文件
尝试
模块
源码
成功
全局
内容
机器
状态
本机
变化
控制
频繁
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
云锁服务器设置安全登录
广西格林根软件开发
易华录网络安全与发展研究所
上海交友软件开发哪里好
数据库连接怎么实现
分布式数据库数据获取
检验软件开发质量管理
通过m ysql创建数据库
浙江三维人口管理系统软件开发
网络安全技术与实训参考文献
计算机网络技术毕业个人简历
sysbase数据库
邮件服务器 域名
软件开发潜能
远古残破服务器小说
以下为常用的中文期刊数据库的是
保障网络安全需要从广义层面
义宁网络安全导师
服务器管理界面怎样打开
广义的网络安全保证性是指
宝山区网络营销软件开发咨询热线
个人网络安全体系如何构建
邢台市网络安全工作
浙江新闻网络安全
公安部信息网络安全报警网
网络安全的心得怎么写
五邑大学研究生数据库原理
网络技术组职责
科技互联网网站
魔兽世界怀旧服审判服务器比例