Canal ClientExample

Canal介绍

      基于mysql数据库binlog的增量订阅&消费

 

ClientExample

依赖配置:(目前暂未正式发布到mvn仓库,所以需要各位下载canal源码后手工执行下mvn clean install -Dmaven.test.skip)

1.<dependency>
2.    <groupId>com.alibaba.otter</groupId>
3.    <artifactId>canal.client</artifactId>
4.    <version>1.0.1-SNAPSHOT</version>
5.</dependency>

 

1. 创建mvn标准工程:

1.mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample  

2.  修改pom.xml,添加依赖

 

3.  ClientSample代码

1.package com.alibaba.otter.canal.sample;
2.
3.import java.net.InetSocketAddress;
4.import java.util.List;
5.
6.import com.alibaba.otter.canal.common.utils.AddressUtils;
7.import com.alibaba.otter.canal.protocol.Message;
8.import com.alibaba.otter.canal.protocol.CanalEntry.Column;
9.import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
10.import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
11.import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
12.import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
13.import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
14.
15.public class SimpleCanalClientExample {
16.
17.    public static void main(String args[]) {
18.        // 创建链接
19.        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
20.                                                                                            11111), "example", "", "");
21.        int batchSize = 1000;
22.        int emptyCount = 0;
23.        try {
24.            connector.connect();
25.            connector.subscribe(".*\\..*");
26.            connector.rollback();
27.            int totalEmtryCount = 120;
28.            while (emptyCount < totalEmtryCount) {
29.                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
30.                long batchId = message.getId();
31.                int size = message.getEntries().size();
32.                if (batchId == -1 || size == 0) {
33.                    emptyCount++;
34.                    System.out.println("empty count : " + emptyCount);
35.                    try {
36.                        Thread.sleep(1000);
37.                    } catch (InterruptedException e) {
38.                    }
39.                } else {
40.                    emptyCount = 0;
41.                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
42.                    printEntry(message.getEntries());
43.                }
44.
45.                connector.ack(batchId); // 提交确认
46.                // connector.rollback(batchId); // 处理失败, 回滚数据
47.            }
48.
49.            System.out.println("empty too many times, exit");
50.        } finally {
51.            connector.disconnect();
52.        }
53.    }
54.
55.    private static void printEntry(List<Entry> entrys) {
56.        for (Entry entry : entrys) {
57.            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
58.                continue;
59.            }
60.
61.            RowChange rowChage = null;
62.            try {
63.                rowChage = RowChange.parseFrom(entry.getStoreValue());
64.            } catch (Exception e) {
65.                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
66.                                           e);
67.            }
68.
69.            EventType eventType = rowChage.getEventType();
70.            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
71.                                             entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
72.                                             entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
73.                                             eventType));
74.
75.            for (RowData rowData : rowChage.getRowDatasList()) {
76.                if (eventType == EventType.DELETE) {
77.                    printColumn(rowData.getBeforeColumnsList());
78.                } else if (eventType == EventType.INSERT) {
79.                    printColumn(rowData.getAfterColumnsList());
80.                } else {
81.                    System.out.println("-------> before");
82.                    printColumn(rowData.getBeforeColumnsList());
83.                    System.out.println("-------> after");
84.                    printColumn(rowData.getAfterColumnsList());
85.                }
86.            }
87.        }
88.    }
89.
90.    private static void printColumn(List<Column> columns) {
91.        for (Column column : columns) {
92.            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
93.        }
94.    }
95.}

4. 运行Client

首先启动Canal Server,可参加QuickStart : http://agapple.iteye.com/blogs/1796070

启动Canal Client后,可以从控制台从看到类似消息:

1.empty count : 1
2.empty count : 2
3.empty count : 3
4.empty count : 4

此时代表当前数据库无变更数据

5.  触发数据库变更

1.mysql> use test;
2.Database changed
3.mysql> CREATE TABLE `xdual` (
4.    ->   `ID` int(11) NOT NULL AUTO_INCREMENT,
5.    ->   `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
6.    ->   PRIMARY KEY (`ID`)
7.    -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
8.Query OK, 0 rows affected (0.06 sec)
9.
10.mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)

可以从控制台中看到:

1.empty count : 1
2.empty count : 2
3.empty count : 3
4.empty count : 4
5.================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
6.ID : 4    update=true
7.X : 2013-02-05 23:29:46    update=true

最后:

  整个代码在附件中可以下载,如有问题可及时联系。 

时间: 2024-12-09 16:08:01

Canal ClientExample的相关文章

Canal Client API

1.  首先需要先启动canal server,可参见:Canal Server的QuickStart 2.  运行canal client,可参见:canal client的ClientExample   如何下载 1.  如果是maven用户,可配置mvn dependency 1.<dependency> 2. <groupId>com.alibaba.otter</groupId> 3. <artifactId>canal.client</ar

Alibaba Canal Manager Model 配置管理实现

Alibaba Canal Manager Model 配置管理实现 Alibaba Canal 用于增量订阅消费 mysql 数据库 binlog 日志,详细介绍请见 alibaba/canal. 其中 Server 端配置有两种管理方式: Spring 和 Manager.其中 Spring 方式是基于spring xml + properties 进行定义构建 spring 配置, Manager 方式则可以对接 Web console/manager 系统.本文主要记录一下 Manage

canal DevGuide

背景    先前开源了一个开源项目: [阿里巴巴开源项目: 基于mysql数据库binlog的增量订阅&消费]     本文主要是介绍一下如何给canal贡献代码,介绍其设计思路和扩展方式   设计  说明: server代表一个canal运行实例,对应于一个jvm instance对应于一个数据队列  (1个server对应1..n个instance) instance下的子模块: eventParser (数据源接入,模拟slave协议和master进行交互,协议解析) eventSink

Canal AdminGuide

背景    先前开源了一个开源项目: [阿里巴巴开源项目: 基于mysql数据库binlog的增量订阅&消费]     本文主要是介绍一下如何部署&使用   环境要求 1. 操作系统     a.  纯java开发,windows/linux均可支持     b.  jdk建议使用1.6.25以上的版本,稳定可靠,目前阿里巴巴使用基本为此版本.    2. mysql要求    a. 目前canal支持mysql 5.5版本以下,对mysql5.6暂不支持,(mysql4.x版本没有经过严

Canal BinlogChange(mysql5.6)

背景 先前开源了一个开源项目: [阿里巴巴开源项目: 基于mysql数据库binlog的增量订阅&消费]    本文主要是介绍一下canal1.0.3支持mysql5.6协议上的变化.    协议变化 1.   binlog checksum     mysql5.6之后,支持在binlog对象中增加checksum信息,比如CRC32协议.   其原理主要是在原先binlog的末尾新增了4个byte,写入一个crc32的校验值.      对应参数说明: http://dev.mysql.co

谈谈对Canal(增量数据订阅与消费)的理解

概述 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql(也支持mariaDB). 起源:早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求.不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元. 基于日志增量订阅&消

Canal BinlogChange(mariadb5/10)

背景 先前开源了一个开源项目: [阿里巴巴开源项目: 基于mysql数据库binlog的增量订阅&消费]  本文主要是介绍一下canal支持mariadb协议上的变化.    协议变化 mariadb5.5 mariadb5.5主要是基于mysql5.5的原型,类型定义基本没啥变化,大体上都保持兼容 主要的变化: 1. QueryLogEvent增加了status变量.     Q_HRNOW  用于记录毫秒的精度,枚举值下标为128 协议解析的时候,需要处理Q_HRNOW,需要跳过3字节的数据

【源码】canal和otter的高可靠性分析

一般来说,我们对于数据库最主要的要求就是:数据不丢.不管是主从复制,还是使用类似otter+canal这样的数据库同步方案,我们最基本的需求是,在数据不丢失的前提下,尽可能的保证系统的高可用,也就是在某个节点挂掉,或者数据库发生主从切换等情况下,我们的数据同步系统依然能够发挥它的作用--数据同步.本文讨论的场景是数据库发生主从切换,本文将从源码的角度,来看看otter和canal是如何保证高可用和高可靠的. 一.EventParser 通过阅读文档和源码,我们可以知道,对于一个canal ser

Canal QuickStart

Canal介绍       基于mysql数据库binlog的增量订阅&消费   QuickStart 几点说明:(mysql初始化) a.  canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,并且配置binlog模式为row.  1.[mysqld] 2.log-bin=mysql-bin #添加这一行就ok 3.binlog-format=ROW #选择row模式 4.server_id=1 #配置mysql replaction需要定