Alibaba Canal Manager Model 配置管理实现

Alibaba Canal Manager Model 配置管理实现

Alibaba Canal 用于增量订阅消费 mysql 数据库 binlog 日志,详细介绍请见 alibaba/canal
其中 Server 端配置有两种管理方式: Spring 和 Manager。其中 Spring 方式是基于spring xml + properties 进行定义构建 spring 配置, Manager 方式则可以对接 Web console/manager 系统。本文主要记录一下 Manager 方式的对接逻辑,源码在 canal-deployer 模块,相对比较简单。

源码入口

版本:canal-1.0.24
查看 Canal Server 端的脚本 ./bin/startup.sh,可以找到启动入口类是 CanalLauncher。该类 main 方法首先加载了配置文件到内存用于启动参数,./conf/canal.properties 文件。将参数传递给 final 类 CanalController,所以我们主要查看 CanalController 类。

配置 Manager 方式

要使用 Manager 方式,需要对启动参数进行设置。由前文可知启动时会先读取 canal.properties 文件,所以先需要在该文件增加以下配置

# 配置方式
canal.instance.global.mode=manager
# 是否开启自动扫描
canal.auto.scan=true
# 自动扫描间隔,单位秒
canal.auto.scan.interval=5
# 全局的manager配置方式的链接信息,用于标识该 Server
canal.instance.global.manager.address = 127.0.0.1:1099

这里需要简单说明几个概念
Server: 表示由 ./bin/startup.sh 脚本启动的程序,即一个 JVM;
Instance: 对应一个 Mysql 实例,代码为 CanalInstance 接口;
Destination: 字符串类型,对应一个 Instance;

CanalServerWithEmbedded 类: 连接 mysql master,管理多个 CanalInstance;
CanalServerWithNetty 类: 基于 netty 网络服务的 server 实现,用于与 Client 通讯;
CanalConfigClient 类: 存放配置相关信息;
CanalInstanceGenerator 接口: canal 实例生产者,根据 destination 以及 InstanceConfig 生产 CanalInstance;

其 Server 架构如下图

启动一个 Server,里面可有多个 Instance,一个 Instance 读取一个 Mysql 实例的binlog 日志,Destination 则是对一个 Instance 实例的描述字符串,在该 Server 中唯一。

Manager 实现

Canal-Server 配置加载方式

配置初始化

查看 CanalController 类
在其构造 函数可以看到 CanalInstanceGenerator 实例作为 CanalServerWithEmbedded 实例的组件。

public CanalController(final Properties properties) {
        managerClients = MigrateMap.makeComputingMap(new Function<String, CanalConfigClient>() {

            public CanalConfigClient apply(String managerAddress) {
                return getManagerClient(managerAddress);
            }
        });

        // 初始化全局参数设置
        globalInstanceConfig = initGlobalConfig(properties);
        instanceConfigs = new MapMaker().makeMap();
        // 初始化instance config
        initInstanceConfig(properties);

        // 准备canal server
        cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
        ip = getProperty(properties, CanalConstants.CANAL_IP);
        port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
        embededCanalServer = CanalServerWithEmbedded.instance();
        embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
        canalServer = CanalServerWithNetty.instance();
        canalServer.setIp(ip);
        canalServer.setPort(port);
        ......
}

查看 CanalInstanceGenerator 实例的具体实现,在初始化全局配置 initGlobalConfig 方法中

instanceGenerator = new CanalInstanceGenerator() {

    public CanalInstance generate(String destination) {
        InstanceConfig config = instanceConfigs.get(destination);
        if (config == null) {
            throw new CanalServerException("can't find destination:{}");
        }

        logger.info("CanalInstanceGenerator generate mode[{}]", config.getMode());
        if (config.getMode().isManager()) {
            ManagerCanalInstanceGenerator instanceGenerator = new ManagerCanalInstanceGenerator();
            instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
            return instanceGenerator.generate(destination);
        } else if (config.getMode().isSpring()) {
            SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
            synchronized (this) {
                try {
                    // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
                    System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination);
                    instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
                    return instanceGenerator.generate(destination);
                } catch (Throwable e) {
                    logger.error("generator instance failed.", e);
                    throw new CanalException(e);
                } finally {
                    System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, "");
                }
            }
        } else {
            throw new UnsupportedOperationException("unknow mode :" + config.getMode());
        }

    }
};

可以看到 Manger 方式的配置主要是 CanalConfigClient,从 managerClients 集合中获取。该集合实现主要在构造器中,调用 getManagerClient 方法。所以我们主要通过该方法,构造 CanalConfigClient 实例返回即可。查看 CanalConfigClient 类主要有两个方法 findCanal, findFilter,可以确定 CanalConfigClient 表示整个 Server 的配置。getManagerClient 方法参数 managerAddress 则是在配置文件 canal.properties 中 destination 对应的 ip 地址,若没有配置则使用本机ip。这里比较奇怪的是,既然是通过 destination 获取的 ip,然后构建 CanalConfigClient 实例,为啥不让 CanalConfigClient 实例表示一个 destination 的配置而却是整个 Sever 的配置。

实现 CanalConfigClient 类,主要是一些参数的配置(如 Mysql 连接用户名、密码等), Canal 和字符串 Filter 的构建。Canal 实例的构建可以参考一下 CanalInstanceWithManager 类所用到的一些参数。

        Canal canal = new Canal();

        CanalParameter canalParameter = new CanalParameter();
        canalParameter.setSlaveId(dc.getSlaveId());
        canalParameter.setDbUsername(dc.getUsername());
        canalParameter.setDbPassword(dc.getPassword());
        canalParameter.setIndexMode(CanalParameter.IndexMode.MEMORY);
        List<InetSocketAddress> dbAddresses = new ArrayList<>();
        dbAddresses.add(new InetSocketAddress(dc.getHost(), dc.getPort()));
        canalParameter.setDbAddresses(dbAddresses);

        canal.setCanalParameter(canalParameter);
        canal.setName(destination);

到此可以说已经完成 Manger 方式配置的初始化,那如何更新配置呢,主要是用到了 ManagerInstanceConfigMonitor 类,该类在 CanalController 构造器中初始化。

配置刷新

CanalContoller 构造器中

......
    // 初始化monitor机制
        autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
        if (autoScan) {
            defaultAction = new InstanceAction() {

                public void start(String destination) {
                    InstanceConfig config = instanceConfigs.get(destination);
                    if (config == null) {
                        // 重新读取一下instance config
                        config = parseInstanceConfig(properties, destination);
                        instanceConfigs.put(destination, config);
                    }

                    if (!embededCanalServer.isStart(destination)) {
                        // HA机制启动
                        ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                        if (!config.getLazy() && !runningMonitor.isStart()) {
                            runningMonitor.start();
                        }
                    }
                }

                public void stop(String destination) {
                    // 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息
                    InstanceConfig config = instanceConfigs.remove(destination);
                    if (config != null) {
                        embededCanalServer.stop(destination);
                        ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                        if (runningMonitor.isStart()) {
                            runningMonitor.stop();
                        }
                    }
                }

                public void reload(String destination) {
                    // 目前任何配置变化,直接重启,简单处理
                    stop(destination);
                    start(destination);
                }
            };

            instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {

                public InstanceConfigMonitor apply(InstanceMode mode) {
                    int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));

                    if (mode.isSpring()) {
                        SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
                        monitor.setScanIntervalInSecond(scanInterval);
                        monitor.setDefaultAction(defaultAction);
                        // 设置conf目录,默认是user.dir + conf目录组成
                        String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
                        if (StringUtils.isEmpty(rootDir)) {
                            rootDir = "../conf";
                        }

                        if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
                            monitor.setRootConf(rootDir);
                        } else {
                            // eclipse debug模式
                            monitor.setRootConf("src/main/resources/");
                        }
                        return monitor;
                    } else if (mode.isManager()) {
                        // 配置更新,实现 ManagerInstanceConfigMonitor 参考 SpringInstanceConfigMonitor, 使用上面的 defaultAction 即可
                        ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor();
                        monitor.setScanIntervalInSecond(scanInterval);
                        monitor.setDefaultAction(defaultAction);
                        monitor.setIp(ip);
                        return monitor;
                    } else {
                        throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
                    }
                }
            });
        }
.....

查看 ManagerInstanceConfigMonitor 类,主要是实现 scan 方法,若配置有更新的话,回调 InstanceAction。这里还有个地方比较奇怪,可以看到 InstanceAction 回调方法参数是字符串 destination?我们在 scan 方法中调用后端服务器接口,已经获取到新的配置,却只能回调字符串 destination,然后在 CanalController.getManagerClient 方法根据 destination 再去调服务器获取具体配置?这里多了一次调用服务器接口,感觉比较奇怪。

总结

Manager 模式实现步骤
1. 在 canal.properties 配置文件设置 canal.instance.global.mode=manager等;
2. 在 CanalController 类构建 CanalConfigClient 实例,根据 ManagerAddress 从 managerClients 获取 CanalConfigClient (根据destination 获取 Canal, Filter);
3. 实现 ManagerInstanceConfigMonitor,启用定时线程刷新配置,使用 InstanceAction 当作回调与 CanalServerWithEmbedded 通信,

配置的过滤 Filter 传递到 EventParser 中的 BinlogParser (实现类 LogEventConvert),所以感觉这里的 Filter 没什么意义,倒不如让 Client 消费所有数据,然后下发再做过滤。
时间: 2024-12-04 01:31:46

Alibaba Canal Manager Model 配置管理实现的相关文章

Canal AdminGuide

背景    先前开源了一个开源项目: [阿里巴巴开源项目: 基于mysql数据库binlog的增量订阅&消费]     本文主要是介绍一下如何部署&使用   环境要求 1. 操作系统     a.  纯java开发,windows/linux均可支持     b.  jdk建议使用1.6.25以上的版本,稳定可靠,目前阿里巴巴使用基本为此版本.    2. mysql要求    a. 目前canal支持mysql 5.5版本以下,对mysql5.6暂不支持,(mysql4.x版本没有经过严

谈谈对Canal(增量数据订阅与消费)的理解

概述 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql(也支持mariaDB). 起源:早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求.不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元. 基于日志增量订阅&消

Canal Client API

1.  首先需要先启动canal server,可参见:Canal Server的QuickStart 2.  运行canal client,可参见:canal client的ClientExample   如何下载 1.  如果是maven用户,可配置mvn dependency 1.<dependency> 2. <groupId>com.alibaba.otter</groupId> 3. <artifactId>canal.client</ar

Otter-入门篇2(Manager安装配置)

Otter-入门篇2(Manager安装配置) 前言 上一节已经简单介绍了Otter的基本信息,本节我们就来开准备搭建一个我们自己的Otter环境,因为一个Otter需要Manage+node+数据库还有很多的依赖,本节我们先来搭建Otter的管理服务器Manager. 附上: 喵了个咪的博客:w-blog.cn Otter项目地址:https://github.com/alibaba/otter Otter文档地址:https://github.com/alibaba/otter/wiki 1

阿里巴巴开源项目:分布式数据库同步系统otter(解决中美异地机房)

项目背景 阿里巴巴B2B公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了杭州和美国异地机房的需求,同时为了提升用户体验,整个机房的架构为双A,两边均可写,由此诞生了otter这样一个产品. otter第一版本可追溯到04~05年,此次外部开源的版本为第4版,开发时间从2011年7月份一直持续到现在,目前阿里巴巴B2B内部的本地/异地机房的同步需求基本全上了otte4. 目前同步规模: 同步数据量6亿 文件同步1.5TB(2000w张图片) 涉及200+个数据库实例之间的

如何基于MySQL及Redis搭建统一的kv存储服务 | 秦波

一.MySQL+Redis常用部署方式 1.1  拓扑 1.2  特点 业务层通过双写同时写MySQL及Redis.读通常在Redis,若读取不到,则从MySQL读取,然后将数据同步到Redis,Redis通常设置expire或者默认LRU进行数据淘汰. 这种使用方式会有如下问题: 1)MySQL及Redis存在数据不一致风险,尤其是长时间运行的系统 2)业务层需要处理MySQL sql schema与Redis kv数据结构上的逻辑差异 3)无统一运维 4)无法方便扩容/缩容 二.KV化的存储

如何基于日志,同步实现数据的一致性和实时抽取?

作者:王东 宜信技术研发中心架构师 目前就职于宜信技术研发中心,任架构师,负责流式计算和大数据业务产品解决方案. 曾任职于Naver china(韩国最大搜索引擎公司)中国研发中心资深工程师,多年从事CUBRID分布式数据库集群开发和CUBRID数据库引擎开发 http://www.cubrid.org/blog/news/cubrid-cluster-introduction/ 主题简介: DWS的背景介绍 dbus+wormhole总体架构和技术实现方案 DWS的实际运用案例 前言 大家好,

otter部署【原创】

环境IP:10.10.6.171 部署:mysql源库IP:10.10.6.172 部署:mysql目标库IP:10.10.6.173 部署:zookeeper,manager,node,canal (也都部署到源库服务器上) 由于otter进行数据库同步,目前仅支持row,所以需要把源库的binlog_format改为ROWlog-bin=mysql-binexpire_logs_days=3binlog_format=ROW otter为纯java编写(manager,node,canal都

【转】微服务MySQL分库分表数据到MongoDB同步方案

需求背景 近年来,微服务概念持续火热,网络上针对微服务和单体架构的讨论也是越来越多,面对日益增长的业务需求是,很多公司做技术架构升级时优先选用微服务方式.我所在公司也是选的这个方向来升级技术架构,以支撑更大访问量和更方便的业务扩展. 发现问题 微服务拆分主要分两种方式:拆分业务系统不拆分数据库,拆分业务系统拆分库.如果数据规模小的话大可不必拆分数据库,因为拆分数据看必将面对多维度数据查询,跨进程之间的事务等问题.而我所在公司随着业务发展单数据库实例已经不能满足业务需要,所以选择了拆分业务系统同时