如何用源码分析canal的deployer模块
发表于:2024-10-05 作者:千家信息网编辑
千家信息网最后更新 2024年10月05日,这期内容当中小编将会给大家带来有关如何用源码分析canal的deployer模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。CanalLauncher是启动入口类
千家信息网最后更新 2024年10月05日如何用源码分析canal的deployer模块
这期内容当中小编将会给大家带来有关如何用源码分析canal的deployer模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
CanalLauncher是启动入口类
获取canal.properties配置文件
如果canal.properties配置文件中属性root.admin.manager有值,那么构造PlainCanalConfigClient,调用PlainCanalConfigClient的findServer获取PlainCanal,调用PlainCanal的getProperties方法获取properties
通过properties构造 CanalStarter并调用其start方法
CanalStarter是启动类
public synchronized void start() throws Throwable { //首先根据canal.serverMode构造CanalMQProducer,如果是kafka,构造的是CanalKafkaProducer String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE); if (serverMode.equalsIgnoreCase("kafka")) { canalMQProducer = new CanalKafkaProducer(); } else if (serverMode.equalsIgnoreCase("rocketmq")) { canalMQProducer = new CanalRocketMQProducer(); } if (canalMQProducer != null) { // disable netty System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true"); // 设置为raw避免ByteString->Entry的二次解析 System.setProperty("canal.instance.memory.rawEntry", "false"); } //接下来构造CanalController并调用其start方法 logger.info("## start the canal server."); controller = new CanalController(properties); controller.start(); logger.info("## the canal server is running now ......"); ... //构造CanalMQStarter并调用其start方法,同时设置为CanalController的属性 if (canalMQProducer != null) { canalMQStarter = new CanalMQStarter(canalMQProducer); MQProperties mqProperties = buildMQProperties(properties); String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS); canalMQStarter.start(mqProperties, destinations); controller.setCanalMQStarter(canalMQStarter); } ... running = true; }
CanalController是实例调度控制器
public CanalController(final Properties properties){ // 初始化managerClients用于请求admin managerClients = MigrateMap.makeComputingMap(new Function() { public PlainCanalConfigClient apply(String managerAddress) { return getManagerClient(managerAddress); } }); // 初始化全局参数设置,包含了全局mode、lazy、managerAddress、springXml,初始化instanceGenerator用于创建instance,其根据InstanceConfig的mode值使用PlainCanalInstanceGenerator或者SpringCanalInstanceGenerator创建CanalInstance globalInstanceConfig = initGlobalConfig(properties); instanceConfigs = new MapMaker().makeMap(); // 初始化instance config,包含了实例mode、lazy、managerAddress、springXml initInstanceConfig(properties); ... // 初始化CanalServerWithEmbedded,将instanceGenerator设置为CanalServerWithEmbedded的属性 embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112")); embededCanalServer.setMetricsPort(metricsPort); this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER); this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD); embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER)); embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD)); ... final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS); //初始化ZkClientx用于canal集群部署,创建/otteradmin/canal/destinations节点和/otteradmin/canal/cluster节点 if (StringUtils.isNotEmpty(zkServers)) { zkclientx = ZkClientx.getZkClient(zkServers); // 初始化系统目录 zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true); zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true); } // 初始化ServerRunningMonitors的ServerRunningMonitor,用于启动、关闭实例 final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port); ServerRunningMonitors.setServerData(serverData); ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function () { ... })); // 初始化InstanceAction,用于启动和关闭实例 autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); if (autoScan) { defaultAction = new InstanceAction() { ... }; // 初始化instanceConfigMonitors,用于获取所有instanceConfig并启动所有instance instanceConfigMonitors = MigrateMap.makeComputingMap(new Function () { public InstanceConfigMonitor apply(InstanceMode mode) { ... } }); } }
ManagerInstanceConfigMonitor是实例扫描器
public void start() { super.start(); //启动定时任务,定时扫描所有instance executor.scheduleWithFixedDelay(new Runnable() { public void run() { try { scan(); if (isFirst) { isFirst = false; } } catch (Throwable e) { logger.error("scan failed", e); } } }, 0, scanIntervalInSecond, TimeUnit.SECONDS); }private void scan() { //缓存了所有instance的配置,如果发现有新的instance则启动或者修改了instance则重启 String instances = configClient.findInstances(null); final Listis = Lists.newArrayList(StringUtils.split(instances, ',')); List start = Lists.newArrayList(); List stop = Lists.newArrayList(); List restart = Lists.newArrayList(); for (String instance : is) { if (!configs.containsKey(instance)) { PlainCanal newPlainCanal = configClient.findInstance(instance, null); if (newPlainCanal != null) { configs.put(instance, newPlainCanal); start.add(instance); } } else { PlainCanal plainCanal = configs.get(instance); PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5()); if (newPlainCanal != null) { // 配置有变化 restart.add(instance); configs.put(instance, newPlainCanal); } } } configs.forEach((instance, plainCanal) -> { if (!is.contains(instance)) { stop.add(instance); } }); stop.forEach(instance -> { notifyStop(instance); }); restart.forEach(instance -> { notifyReload(instance); }); start.forEach(instance -> { notifyStart(instance); }); }private void notifyStart(String destination) { try { //启动instance调用InstanceAction启动实例,最后是调用ServerRunningMonitor启动实例 defaultAction.start(destination); actions.put(destination, defaultAction); // 启动成功后记录配置文件信息 } catch (Throwable e) { logger.error(String.format("scan add found[%s] but start failed", destination), e); } }
ServerRunningMonitor是针对server的running实例控制
public ServerRunningMonitor(){ // 创建父节点 dataListener = new IZkDataListener() { public void handleDataChange(String dataPath, Object data) throws Exception { MDC.put("destination", destination); ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class); if (!isMine(runningData.getAddress())) { mutex.set(false); } if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active releaseRunning();// 彻底释放mainstem } activeData = (ServerRunningData) runningData; } public void handleDataDeleted(String dataPath) throws Exception { MDC.put("destination", destination); mutex.set(false); if (!release && activeData != null && isMine(activeData.getAddress())) { // 如果上一次active的状态就是本机,则即时触发一下active抢占 initRunning(); } else { // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作 delayExector.schedule(new Runnable() { public void run() { initRunning(); } }, delayTime, TimeUnit.SECONDS); } } }; }public synchronized void start() { super.start(); try { //首先调用listener的processStart方法 processStart(); if (zkClient != null) { // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start // 监视/otteradmin/canal/destinations/{0}/running节点变化 String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener); initRunning(); } else { processActiveEnter();// 没有zk,直接启动 } } catch (Exception e) { logger.error("start failed", e); // 没有正常启动,重置一下状态,避免干扰下一次start stop(); } }private void processStart() { if (listener != null) { try { //processStart方法中创建/otteradmin/canal/destinations/{0}/cluster/{1}节点,0是实例名称,1是当前节点ip:port listener.processStart(); } catch (Exception e) { logger.error("processStart failed", e); } } }private void initRunning() { if (!isStart()) { return; } String path = ZookeeperPathUtils.getDestinationServerRunning(destination); // 序列化 byte[] bytes = JsonUtils.marshalToByte(serverData); try { mutex.set(false); //尝试创建/otteradmin/canal/destinations/{0}/running节点 zkClient.create(path, bytes, CreateMode.EPHEMERAL); activeData = serverData; //如果成功则调用listener的processEnter方法,processEnter方法中调用CanalServerWithEmbedded的start方法启动实例和CanalMQStarter的start方法启动实例 processActiveEnter();// 触发一下事件 mutex.set(true); release = false; } catch (ZkNodeExistsException e) { bytes = zkClient.readData(path, true); if (bytes == null) {// 如果不存在节点,立即尝试一次 initRunning(); } else { activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class); } } catch (ZkNoNodeException e) { zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点 initRunning(); } }
canal.properties配置
canal.register.ip =canal.admin.manager = 127.0.0.1:8089canal.admin.port = 11110canal.admin.user = admincanal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441canal.admin.register.auto = truecanal.admin.register.cluster =
上述就是小编为大家分享的如何用源码分析canal的deployer模块了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
实例
方法
节点
配置
分析
就是
属性
文件
尝试
模块
源码
成功
全局
内容
机器
状态
本机
变化
控制
频繁
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器检测软件
网络安全项目技术方案书
网络安全公安考试
平安和招商软件开发
怎么让数据库主键编号
js如何删除数据库数据
我的世界服务器公告版
实体框架数据库提供程序
服务器运维价格
贵池区智能软件开发服务设备
mysql 数据库端口
宝山区java软件开发
辽宁网络技术三级考试
网络安全问题怎么防
环保软件开发的前景
ue4专用服务器显卡
我的世界2b2t服务器怎么打
软件开发架构师培训课程
郑州网络安全产业
软件开发是理科还工科
按键精灵如何修改表格数据库
软件开发类机械设备
Linux服务器命令无法使用
电脑网络安全科技馆厦门
中国碳的核算数据库
数据库中的core
地籍数据库报告
数据库sql 唯一约束
加强网络安全建设 为实现
思科网络技术学院视频