Alibaba Canal Manager Model 配置管理实现
Alibaba Canal 用于增量订阅消费 mysql 数据库 binlog 日志,详细介绍请见 alibaba/canal。
其中 Server 端配置有两种管理方式: Spring 和 Manager。其中 Spring 方式是基于spring xml + properties 进行定义构建 spring 配置, Manager 方式则可以对接 Web console/manager 系统。本文主要记录一下 Manager 方式的对接逻辑,源码在 canal-deployer 模块,相对比较简单。
源码入口
版本:canal-1.0.24
查看 Canal Server 端的脚本 ./bin/startup.sh,可以找到启动入口类是 CanalLauncher。该类 main 方法首先加载了配置文件到内存用于启动参数,./conf/canal.properties 文件。将参数传递给 final 类 CanalController,所以我们主要查看 CanalController 类。
配置 Manager 方式
要使用 Manager 方式,需要对启动参数进行设置。由前文可知启动时会先读取 canal.properties 文件,所以先需要在该文件增加以下配置
# 配置方式
canal.instance.global.mode=manager
# 是否开启自动扫描
canal.auto.scan=true
# 自动扫描间隔,单位秒
canal.auto.scan.interval=5
# 全局的manager配置方式的链接信息,用于标识该 Server
canal.instance.global.manager.address = 127.0.0.1:1099
这里需要简单说明几个概念
Server: 表示由 ./bin/startup.sh 脚本启动的程序,即一个 JVM;
Instance: 对应一个 Mysql 实例,代码为 CanalInstance 接口;
Destination: 字符串类型,对应一个 Instance;
CanalServerWithEmbedded 类: 连接 mysql master,管理多个 CanalInstance;
CanalServerWithNetty 类: 基于 netty 网络服务的 server 实现,用于与 Client 通讯;
CanalConfigClient 类: 存放配置相关信息;
CanalInstanceGenerator 接口: canal 实例生产者,根据 destination 以及 InstanceConfig 生产 CanalInstance;
其 Server 架构如下图
启动一个 Server,里面可有多个 Instance,一个 Instance 读取一个 Mysql 实例的binlog 日志,Destination 则是对一个 Instance 实例的描述字符串,在该 Server 中唯一。
Manager 实现
配置初始化
查看 CanalController 类
在其构造 函数可以看到 CanalInstanceGenerator 实例作为 CanalServerWithEmbedded 实例的组件。
public CanalController(final Properties properties) {
managerClients = MigrateMap.makeComputingMap(new Function<String, CanalConfigClient>() {
public CanalConfigClient apply(String managerAddress) {
return getManagerClient(managerAddress);
}
});
// 初始化全局参数设置
globalInstanceConfig = initGlobalConfig(properties);
instanceConfigs = new MapMaker().makeMap();
// 初始化instance config
initInstanceConfig(properties);
// 准备canal server
cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
ip = getProperty(properties, CanalConstants.CANAL_IP);
port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
embededCanalServer = CanalServerWithEmbedded.instance();
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
canalServer = CanalServerWithNetty.instance();
canalServer.setIp(ip);
canalServer.setPort(port);
......
}
查看 CanalInstanceGenerator 实例的具体实现,在初始化全局配置 initGlobalConfig 方法中
instanceGenerator = new CanalInstanceGenerator() {
public CanalInstance generate(String destination) {
InstanceConfig config = instanceConfigs.get(destination);
if (config == null) {
throw new CanalServerException("can't find destination:{}");
}
logger.info("CanalInstanceGenerator generate mode[{}]", config.getMode());
if (config.getMode().isManager()) {
ManagerCanalInstanceGenerator instanceGenerator = new ManagerCanalInstanceGenerator();
instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
return instanceGenerator.generate(destination);
} else if (config.getMode().isSpring()) {
SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
synchronized (this) {
try {
// 设置当前正在加载的通道,加载spring查找文件时会用到该变量
System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination);
instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
return instanceGenerator.generate(destination);
} catch (Throwable e) {
logger.error("generator instance failed.", e);
throw new CanalException(e);
} finally {
System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, "");
}
}
} else {
throw new UnsupportedOperationException("unknow mode :" + config.getMode());
}
}
};
可以看到 Manger 方式的配置主要是 CanalConfigClient,从 managerClients 集合中获取。该集合实现主要在构造器中,调用 getManagerClient 方法。所以我们主要通过该方法,构造 CanalConfigClient 实例返回即可。查看 CanalConfigClient 类主要有两个方法 findCanal, findFilter,可以确定 CanalConfigClient 表示整个 Server 的配置。getManagerClient 方法参数 managerAddress 则是在配置文件 canal.properties 中 destination 对应的 ip 地址,若没有配置则使用本机ip。这里比较奇怪的是,既然是通过 destination 获取的 ip,然后构建 CanalConfigClient 实例,为啥不让 CanalConfigClient 实例表示一个 destination 的配置而却是整个 Sever 的配置。
实现 CanalConfigClient 类,主要是一些参数的配置(如 Mysql 连接用户名、密码等), Canal 和字符串 Filter 的构建。Canal 实例的构建可以参考一下 CanalInstanceWithManager 类所用到的一些参数。
Canal canal = new Canal();
CanalParameter canalParameter = new CanalParameter();
canalParameter.setSlaveId(dc.getSlaveId());
canalParameter.setDbUsername(dc.getUsername());
canalParameter.setDbPassword(dc.getPassword());
canalParameter.setIndexMode(CanalParameter.IndexMode.MEMORY);
List<InetSocketAddress> dbAddresses = new ArrayList<>();
dbAddresses.add(new InetSocketAddress(dc.getHost(), dc.getPort()));
canalParameter.setDbAddresses(dbAddresses);
canal.setCanalParameter(canalParameter);
canal.setName(destination);
到此可以说已经完成 Manger 方式配置的初始化,那如何更新配置呢,主要是用到了 ManagerInstanceConfigMonitor 类,该类在 CanalController 构造器中初始化。
配置刷新
CanalContoller 构造器中
......
// 初始化monitor机制
autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
if (autoScan) {
defaultAction = new InstanceAction() {
public void start(String destination) {
InstanceConfig config = instanceConfigs.get(destination);
if (config == null) {
// 重新读取一下instance config
config = parseInstanceConfig(properties, destination);
instanceConfigs.put(destination, config);
}
if (!embededCanalServer.isStart(destination)) {
// HA机制启动
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
}
}
public void stop(String destination) {
// 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息
InstanceConfig config = instanceConfigs.remove(destination);
if (config != null) {
embededCanalServer.stop(destination);
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (runningMonitor.isStart()) {
runningMonitor.stop();
}
}
}
public void reload(String destination) {
// 目前任何配置变化,直接重启,简单处理
stop(destination);
start(destination);
}
};
instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
public InstanceConfigMonitor apply(InstanceMode mode) {
int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
if (mode.isSpring()) {
SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
monitor.setScanIntervalInSecond(scanInterval);
monitor.setDefaultAction(defaultAction);
// 设置conf目录,默认是user.dir + conf目录组成
String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
if (StringUtils.isEmpty(rootDir)) {
rootDir = "../conf";
}
if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
monitor.setRootConf(rootDir);
} else {
// eclipse debug模式
monitor.setRootConf("src/main/resources/");
}
return monitor;
} else if (mode.isManager()) {
// 配置更新,实现 ManagerInstanceConfigMonitor 参考 SpringInstanceConfigMonitor, 使用上面的 defaultAction 即可
ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor();
monitor.setScanIntervalInSecond(scanInterval);
monitor.setDefaultAction(defaultAction);
monitor.setIp(ip);
return monitor;
} else {
throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
}
}
});
}
.....
查看 ManagerInstanceConfigMonitor 类,主要是实现 scan 方法,若配置有更新的话,回调 InstanceAction。这里还有个地方比较奇怪,可以看到 InstanceAction 回调方法参数是字符串 destination?我们在 scan 方法中调用后端服务器接口,已经获取到新的配置,却只能回调字符串 destination,然后在 CanalController.getManagerClient 方法根据 destination 再去调服务器获取具体配置?这里多了一次调用服务器接口,感觉比较奇怪。
总结
Manager 模式实现步骤
1. 在 canal.properties 配置文件设置 canal.instance.global.mode=manager
等;
2. 在 CanalController 类构建 CanalConfigClient 实例,根据 ManagerAddress 从 managerClients 获取 CanalConfigClient (根据destination 获取 Canal, Filter);
3. 实现 ManagerInstanceConfigMonitor,启用定时线程刷新配置,使用 InstanceAction 当作回调与 CanalServerWithEmbedded 通信,
配置的过滤 Filter 传递到 EventParser 中的 BinlogParser (实现类 LogEventConvert),所以感觉这里的 Filter 没什么意义,倒不如让 Client 消费所有数据,然后下发再做过滤。