什么是 canal#
canal,译意为水道 / 管道 / 沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供 增量数据订阅和消费。
通常情况下,canal 用来同步或监控 MySQL 数据库的数据,比如数据入库后同步到 ES
canal 将自己伪装成 MySQL 的从机,解析并处理主机发送的 binary log 日志
配置 canal#
在使用 canal 之前,你需要配置本地的 MySQL 服务,使其启用 binlog 日志
window 找到安装 MySQL 目录下的 my.ini
文件,把下面的配置加入到 [mysqld]
下
#binlog文件名
log-bin=mysql-bin
#选择row模式
binlog_format=ROW
#mysql实例id,不能和canal的slaveId重复
server_id=1
Linux 可直接修改
vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 选择ROW模式
server_id=1 # 配置MySQL replaction需要定义,不和Canal的slaveId重复即可
配置完成后重启 MySQL 服务
使用以下命令查看是否启用成功
show variables like 'log_bin';
然后在官方仓库下载最新版并解压
修改 conf
文件夹下的 canal.properties
不修改使用默认也行
其中重要的是
# 指定接口
canal.port = 11111
# 指定实例
canal.destinations = example
# 指定模式,默认tcp,其它有kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
# 指定rabbitmq,不配置使用默认tcp也行
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
然后修改 example
(对应上面的实例配置) 下的 instance.properties
# 主机地址
canal.instance.master.address=127.0.0.1:3306
# 连接主机的账户名密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 指定连接时的编码
canal.instance.connectionCharset = UTF-8
# 指定监听的数据库与表,此配置代表监听test库下的全部表
canal.instance.filter.regex=test..*
配置完成后在 bin
目录下使用 startup.bat
启动 canal
此时就代表启动成功了
spring boot 配置#
在 spring boot 中使用 canal 有以下几种方式
- 使用官方依赖
- 使用第三方
仓库 | 版本 | 优劣 |
---|---|---|
https://github.com/alibaba/canal | 官方 | 配置简单,实现困难 |
https://github.com/NormanGyllenhaal/canal-client | 第三方 | 简单易用,断更,有 BUG |
https://github.com/behappy-project/behappy-canal | 第三方 | 上面的改版,持续更新,缺点仅支持 jdk17+ |
https://github.com/xizixuejie/canal-spring | 第三方 | 简单易用,持续更新 |
推荐使用最后一个
在 maven 中添加依赖
<dependency>
<groupId>io.github.xizixuejie</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>0.0.5</version>
</dependency>
在配置文件中注册
# canal服务地址
canal.server=127.0.0.1:11111
# canal的集群名字,要与安装canal时设置的名称一致
canal.destination=example
# 未配置也使用默认的tcp模式,但未配置可能会无法监听到,因此推荐配置
canal.server-mode=tcp
完成后在启动类增加
@EnableCanalListener(basePackages = "com.jkkj.config.canal")
开启监听器,并指定监听器的配置路径
然后配置监听器
@CanalListener(schemaName = "test", tableName = "testTable")
public class TestListener implements EntryListener<TestEntity> {
private static final Logger log = LoggerFactory.getLogger(TestListener.class);
@Override
public void insert(TestEntity testEntity) {
log.info("insert={}", testEntity);
}
@Override
public void update(TestEntity before, TestEntity after) {
log.info("update before={}", before);
log.info("update after={}", after);
}
@Override
public void delete(TestEntity testEntity) {
log.info("delete={}", testEntity);
}
}
可以使用 @CanalListener
指定库名及表名
@CanalListener(schemaName = "test", tableName = "testTable")
也可以不指定,则会根据 Entity
对象监听,这种方式需要在实体类中指定
@TableName(value = "inspection_result_home")
会根据这个 mybatisplus 注解实体类名自动转换表名
扩展#
在使用过程中,发现如果实体类中字段为下面的情况则会报错
这是因为在转换过程中没有指定解析器
你可以查看我提的这个 Issues,或等待作者修改 (作者已修改)
docker 安装#
在 Linux 下使用 docker 安装是比较简单的
- 下载镜像
docker pull canal/canal-server:v1.1.5
- 先启动一次
docker run -p 11111:11111 --name canal -d canal/canal-server:v1.1.5
- 复制配置文件
# 拷贝配置文件
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /home/docker/canal/conf
docker cp canal:/home/admin/canal-server/conf/canal.properties /home/docker/canal/conf
前面为容器内地址,后面的 /home/jk/docker/canal/conf
是宿主机地址
4. 修改配置文件
主要修改的地方为 instance.properties
下的
# 主机地址
canal.instance.master.address=127.0.0.1:3306
127.0.0.1
指代的是容器地址,如果你的 MySQL 安装在宿主机内,则需要指定宿主机的数据库地址。使用 ifconfig
命令查看 docker0 就表示宿主机地址
- 重启镜像
docker stop canal
docker rm canal
docker run -p 11111:11111 --name canal \
-v /home/jk/docker/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-v /home/jk/docker/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties \
-d canal/canal-server:v1.1.5
不使用最新版本 canal 镜像的原因可查看此链接