本博客日IP超过2000,PV 3000 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:xttblog2,之前的微信号好友位已满,备注:返现
受密码保护的文章请关注“业余草”公众号,回复关键字“0”获得密码
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序
腾讯云】1核2G5M轻量应用服务器50元首年,高性价比,助您轻松上云
我百度了一下 Canal,发现与 Canal 相关的技术文章并不多,再加上我上一篇文章《阿里 canal 内存溢出 Java heap space 问题解决》中走入的误区,导致了我想要写一篇关于 Canal 教程的文章。所以便有了本文,下面就和我一起来看看如何使用 canal 吧。
根据 https://github.com/alibaba/canal 上的原理解释,我们知道 canal 会模拟 mysql slave 的交互协议,伪装自己为 mysql slave,然后向 mysql master 发送 dump 协议。
mysql master 收到 dump 请求,开始推送 binary log 给 slave(也就是 canal),然后 canal 解析 binary log 对象(原始为 byte流)。
经 canal 解析过的对象,我们使用起来就非常的方便了。
再根据 https://github.com/alibaba/canal/releases 提供的版本信息,你会发现 canal 其实相当于一个中间件,专门用来解析 MySQL 的 binlog 日志。canal 解析好了之后,会封装成一个数据对象,通过 protobuf3.0 协议进行交互,让 canal 客户端进行消费。
根据上面的解释,以及 canal 提供的版本信息,我们在使用 canal 的时候,首选要安装一个 canal.deployer-1.1.2.tar.gz 进行解析 MySQL 的 binlog 日志。
下载后,复制 canal.deployer-1.1.2.tar.gz 到 MySQL 主机上,比如放在 /var/xttblog/otter 目录下。然后依次执行下面的命令:
mkdir canal cd canal tar -zxvf ../canal.deployer-1.1.2.tar.gz
然后修改 canal 的配置文件 vi conf/example/instance.properties。
这三项改成你自己的,比如我的配置如下:
canal.instance.dbUsername=xttblog canal.instance.dbPassword=xttblog canal.instance.connectionCharset = UTF-8 canal.instance.defaultDatabaseName =xttblog
然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)
接着,我们检查一下 MySQL 的配置。确定版本和是否开启了 binlog 日志,以及日志格式。
show variables like 'binlog_format'; show variables like 'log_bin'; select version();
canal 支持 binlog 格式为 ROW 的模式。如果你没开启 binlog,并且格式是非 row 的,建议修改一下 mysql 的配置文件。
执行 mysql –help | grep my.cnf 找到 mysql 的 my.cnf 文件。
执行 vi /etc/my.cnf 命令。添加下面 3 个配置。
log-bin=mysql-bin #添加这一行就ok binlog-format=ROW #选择row模式 server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)
接着执行 sudo service mysqld restart 重启 MySQL。
需要注意的是你的 mysql 用户,必须要有 REPLICATION SLAVE 权限。该权限授予 slave 服务器以该账户连接 master 后可以执行 replicate 操作的权利。
如果没有权限,则使用 root 账户登录进 MySQL,执行下面的语句,创建用户,分配权限。
CREATE USER xttblog IDENTIFIED BY ‘xttblog’; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘‘xttblog’’@’%’; FLUSH PRIVILEGES;
MySQL 启动后,就可以开启 canal 服务了。
/var/xttblog/otter/canal/bin/startup.sh
开启后,观察 canal 服务的日志,确保服务正常。
tail -n 200 /var/xttblog/otter/canal/logs/canal/canal.log
确定没有问题后,开始编写我们的测试程序。
pom.xml 中导入下面的依赖。
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency>
然后编写下面的 Java 代码。
package com.xttblog.canal.test; import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; /** * CanalTest * @author www.xttblog.com * @date 2019/2/27 上午10:01 */ public class CanalTest { public static void main(String args[]) { // 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event " + "has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> " + "binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
然后执行 main 方法。你再修改修改 MySQL 中的数据,你会发现所有改变都同步过来了。
最后,欢迎关注我的个人微信公众号:业余草(yyucao)!可加作者微信号:xttblog2。备注:“1”,添加博主微信拉你进微信群。备注错误不会同意好友申请。再次感谢您的关注!后续有精彩内容会第一时间发给您!原创文章投稿请发送至532009913@qq.com邮箱。商务合作也可添加作者微信进行联系!
本文原文出处:业余草: » 阿里巴巴开源的 Canal 使用教程