Canal Client API

1.  首先需要先启动canal server,可参见:Canal Server的QuickStart

2.  运行canal client,可参见:canal client的ClientExample

 

如何下载

1.  如果是maven用户,可配置mvn dependency

1.<dependency>
2.    <groupId>com.alibaba.otter</groupId>
3.    <artifactId>canal.client</artifactId>
4.    <version>x.y.z</version>
5.</dependency>

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>x.y.z</version>
</dependency>

 对应的version版本,可见https://github.com/alibaba/canal/releases

 

2.  其他用户,可通过mvn仓库直接下载jar包

mvn仓库下载url :  http://central.maven.org/maven2/com/alibaba/otter/canal.client/

选择对应的version,下载jar/source/javadoc文件即可. 

 

类设计

在了解具体API之前,需要提前了解下canal client的类设计,这样才可以正确的使用好canal. 

 

大致分为几部分:

  • ClientIdentity
    canal client和server交互之间的身份标识,目前clientId写死为1001. (目前canal server上的一个instance只能有一个client消费,clientId的设计是为1个instance多client消费模式而预留的,暂时不需要理会)
  • CanalConnector
    SimpleCanalConnector/ClusterCanalConnector :   两种connector的实现,simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制
  • CanalNodeAccessStrategy
    SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:两种failover的实现,simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server. 
  • ClientRunningMonitor/ClientRunningListener/ClientRunningData
    client running相关控制,主要为解决client自身的failover机制。canal client允许同时启动多个canal client,通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式,这样就可以确保canal client也不会是单点.  保证整个系统的高可用性.

javadoc查看:

server/client交互协议


  

具体的网络协议格式,可参见:CanalProtocol.proto

get/ack/rollback协议介绍:

  • Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
    a. batch id 唯一标识
    b. entries 具体的数据对象,可参见下面的数据介绍
  • getWithoutAck(int batchSize, Long timeout, TimeUnit unit),相比于getWithoutAck(int batchSize),允许设定获取数据的timeout超时时间
    a. 拿够batchSize条记录或者超过timeout时间
    b. timeout=0,阻塞等到足够的batchSize
  • void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
  • void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api. 

流式api设计的好处:

  • get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
  • get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化.  (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)

流式api设计:

  • 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
  • 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
  • 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor
  • 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取

流式api带来的异步响应模型:

 

数据对象格式简单介绍:EntryProtocol.proto

1.Entry
2.    Header
3.        logfileName [binlog文件名]
4.        logfileOffset [binlog position]
5.        executeTime [binlog里记录变更发生的时间戳,精确到秒]
6.        schemaName
7.        tableName
8.        eventType [insert/update/delete类型]
9.    entryType   [事务头BEGIN/事务尾END/数据ROWDATA]
10.    storeValue  [byte数据,可展开,对应的类型为RowChange]
11.
12.RowChange
13.    isDdl       [是否是ddl变更操作,比如create table/drop table]
14.    sql         [具体的ddl sql]
15.    rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
16.        beforeColumns [Column类型的数组,变更前的数据字段]
17.        afterColumns [Column类型的数组,变更后的数据字段]
18.
19.Column
20.    index
21.    sqlType     [jdbc type]
22.    name        [column name]
23.    isKey       [是否为主键]
24.    updated     [是否发生过变更]
25.    isNull      [值是否为null]
26.    value       [具体的内容,注意为string文本]
Entry
    Header
        logfileName [binlog文件名]
        logfileOffset [binlog position]
        executeTime [binlog里记录变更发生的时间戳,精确到秒]
        schemaName
        tableName
        eventType [insert/update/delete类型]
    entryType   [事务头BEGIN/事务尾END/数据ROWDATA]
    storeValue  [byte数据,可展开,对应的类型为RowChange]  

RowChange
    isDdl       [是否是ddl变更操作,比如create table/drop table]
    sql         [具体的ddl sql]
    rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
        beforeColumns [Column类型的数组,变更前的数据字段]
        afterColumns [Column类型的数组,变更后的数据字段]  

Column
    index
    sqlType     [jdbc type]
    name        [column name]
    isKey       [是否为主键]
    updated     [是否发生过变更]
    isNull      [值是否为null]
    value       [具体的内容,注意为string文本]  

说明:

  • 可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name,isKey等信息进行补全
  • 可以提供ddl的变更语句
  • insert只有after columns,  delete只有before columns,而update则会有before / after columns数据.

Client使用例子

 

1. 创建Connector

a.  创建SimpleCanalConnector (直连ip,不支持server/client的failover机制)

1.CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");  

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");

b.  创建ClusterCanalConnector (基于zookeeper获取canal server ip,支持server/client的failover机制)

1.CanalConnector connector = CanalConnectors.newClusterConnector("10.20.144.51:2181", destination, "", "");  
CanalConnector connector = CanalConnectors.newClusterConnector("10.20.144.51:2181", destination, "", "");

c.  创建ClusterCanalConnector (基于固定canal server的地址,支持固定的server ip的failover机制,不支持client的failover机制

1.CanalConnector connector = CanalConnectors.newClusterConnector(Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(),11111)), destination,"", "");  

2.  get/ack/rollback使用 

 

3.   RowData数据处理


 

如果需要更详细的exmpale例子,请下载canal当前最新源码包,里面有个example工程,谢谢.

时间: 2024-12-21 22:23:40

Canal Client API的相关文章

WebLogic UDDI Client API实例学习(一)

在开发UDDI的客户端程序时,我们一般考虑的都是IBM提供的UDDI4J,但不知道大家注意到没有,在BEA的WebLogic中也提供了一个非常方便的UDDI Client API.只是它被包含在weblogic.jar文件中而没有单独发布.WebLogic UDDI Client API的结构完全遵循了UDDI的体系结构,所以,通过对这套API类库的学习和实践,相信也有助于我们更好地理解UDDI的体系架构. Inquiry和Publish的基础 对UDDI注册中心的操作不外乎两大类:查询和发布(

(RabbitMQ) Java Client API Guide

本篇翻译的是RabbitMQ官方文档关于API的内容,原文链接:http://www.rabbitmq.com/api-guide.html.博主对其内容进行大体上的翻译,有些许部分会保留英文,个人觉得这样更加有韵味,如果全部翻译成中文,会存在偏差,文不达意(主要是功力浅薄~~).文章也对部分内容进行一定的解释,增强对相关知识点的理解. Overview RabbitMQ java client uses com.rabbitmq.client as its top-level package,

WebLogic UDDI Client API实例学习(二)

发布 Publish 与查询类似,可以发布到UDDI 注册中心的数据模型也有四个:商业实体.商业服务.绑定信息和tModel.对应Publish类中的四个save方法: 四个save方法用于发布或修改UDDI的数据,四个delete方法用于删除数据. getAuthToken则可以获取注册中心的登录信息,得到的AuthInfo需要在对UDDI的数据进行发布.修改或删除时作为身份验证的参数提供. 登录注册中心 对UDDI注册中心的数据进行操作时需要提供用户注册信息或(及)操作员的信息.Publis

谈谈对Canal(增量数据订阅与消费)的理解

概述 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql(也支持mariaDB). 起源:早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求.不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元. 基于日志增量订阅&消

【源码】canal和otter的高可靠性分析

一般来说,我们对于数据库最主要的要求就是:数据不丢.不管是主从复制,还是使用类似otter+canal这样的数据库同步方案,我们最基本的需求是,在数据不丢失的前提下,尽可能的保证系统的高可用,也就是在某个节点挂掉,或者数据库发生主从切换等情况下,我们的数据同步系统依然能够发挥它的作用--数据同步.本文讨论的场景是数据库发生主从切换,本文将从源码的角度,来看看otter和canal是如何保证高可用和高可靠的. 一.EventParser 通过阅读文档和源码,我们可以知道,对于一个canal ser

Java规则引擎与其API应用详解

详解 本文对Java规则引擎与其API(JSR-94)及相关实现做了较详细的介绍,对其体系结构和API应用有较详尽的描述,并指出Java规则引擎,规则语言,JSR-94的相互关系,以及JSR-94的不足之处和展望 本文对Java规则引擎与其API(JSR-94)及相关实现做了较详细的介绍,对其体系结构和API应用有较详尽的描述,并指出Java规则引擎,规则语言,JSR-94的相互关系,以及JSR-94的不足之处和展望 复杂企业级项目的开发以及其中随外部条件不断变化的业务规则(business l

Google搜索客户端API for C/C++

Google 网站上提供了goole search client api for Java和for dot.net的版本,大家可以参考. For C/C++的版本特点如下 内置XML解析器. 内置gbk/utf-8编码/解码器(包含GBK编码表) 采用Pure C编写,采用标准C接口. 支持http代理. 用户需要在http://api.google.com/createkey申请key 演示程序中的key是一个网友给我的,仅供测试用,演示程序中的代理可能连接不上,由于没有设置超时,可能处于等待

Canal AdminGuide

背景    先前开源了一个开源项目: [阿里巴巴开源项目: 基于mysql数据库binlog的增量订阅&消费]     本文主要是介绍一下如何部署&使用   环境要求 1. 操作系统     a.  纯java开发,windows/linux均可支持     b.  jdk建议使用1.6.25以上的版本,稳定可靠,目前阿里巴巴使用基本为此版本.    2. mysql要求    a. 目前canal支持mysql 5.5版本以下,对mysql5.6暂不支持,(mysql4.x版本没有经过严

Canal ClientExample

Canal介绍       基于mysql数据库binlog的增量订阅&消费   ClientExample 依赖配置:(目前暂未正式发布到mvn仓库,所以需要各位下载canal源码后手工执行下mvn clean install -Dmaven.test.skip) 1.<dependency> 2. <groupId>com.alibaba.otter</groupId> 3. <artifactId>canal.client</artifa