canal的介绍
canal的历史由来
在早期的时候,阿里巴巴公司因为杭州和美国两个地方的机房都部署了数据库实例,由于跨机房同步数据的业务需求 ,便孕育而生出了canal,主要是基于trigger(触发器)
的方式获取增量变更。从 2010 年开始,阿里巴巴公司开始逐步尝试数据库日志解析,获取增量变更的数据进行同步,由此衍生出了增量订阅和消费业务。
当前的 canal 支持的数据源端Mysql版本包括( 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x)
canal的应用场景
目前普遍基于日志增量订阅和消费的业务包括
- 基于数据库增量日志解析,提供增量数据订阅和消费
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
canal的工作原理
在介绍canal的原理之前,我们先来了解以下MySQL主从复制的原理
MySQL主从复制原理
- MySQL master 将数据变更的操作写入二进制日志
binary log
中, 其中记录的内容叫做二进制日志事件binary log events
,可以通过show binlog events
命令进行查看 - MySQL slave 会将 master 的
binary log
中的binary log events
拷贝到它的中继日志中(relay log) - MySQL slave 重读并执行
relay log
中的事件,将数据变更映射到它自己的数据库表中
了解了MySQL的工作原理,我们可以大致猜想到Canal应该也是采用类似到逻辑去实现增量数据订阅的功能,那么接下来我们看看实际上Canal的工作原理是怎样的?
canal工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送
binary log
给 slave (也就是 canal ) - canal 解析
binary log
对象(数据为byte
流)
基于这样的原理与方式,便可以完成数据库增量日志的解析,提供增量数据订阅和消费,实现mysql实时增量数据传输的功能。
既然canal是这样的一个框架,又是纯Java语言编写而成,那么我们接下来就开始学习怎么使用它并把它用到我们的实际工作中,
canal的Docker环境准备
因为目前容器化技术到火热,本文通过使用Docker来快速搭建开发环境,而传统方式的环境搭建,在我们学会了Docker容器环境搭建后,也能自行依葫芦画瓢搭建成功。由于本篇主要讲解canal,所以关于Docker的内容不会涉及太多,主要会介绍Docker的概念命令的使用。
什么是Docker
相信绝大多数人都使用过虚拟机Vmware,在使用Vmware进行环境搭建的时候,只需提供了一个普通的系统镜像并成功安装,剩下的软件环境与应用配置还是如我们在本机操作一样在虚拟机里也操作一遍,而且Vmware占用宿主机的资源较多,容易造成宿主机卡顿,而且系统镜像本身也占用过多空间。
为了便于大家快速理解Docker,便与Vmware做对比来做介绍,docker 提供了一个开始,打包,运行app的平台,把app(应用)和底层infrastructure(基础设施)隔离开来。Docker中最主要的两个概念就是镜像(类似Vmware的系统镜像)与容器(类似Vmware里安装的系统): Docker 什么是Image(镜像)
- 文件和meta data的集合(root filesystem)
- 分层的,并且每一层都可以添加改变删除文件,成为一个新的image
- 不同的image可以共享相同的layer
- Image本身是read-only的
- 通过Image创建(copy)
- 在Image layer 之上建立一个container layer(可读写)
- 类比面向对象:类和实例
- Image负责app的存储和分发,Container负责运行app
搭建canal环境
附上Docker的下载安装地址==> Docker Download
下载canal镜像docker pull canal/canal-server
docker pull mysql:5.7
查看已经下载好的镜像docker images
接下来通过镜像生成mysql容器与canal-server容器
##生成mysql容器
docker run --name mysql5.7 -e MYSQL_ROOT_PASSWORD=root -d -p 3306:3306mysql:5.7
##生成canal-server容器
docker run --name canal-server -p 11111:11111 -d canal/canal-server
复制代码
查看docker中运行的容器docker ps
MySQL的配置修改
以上只是初步准备好了基础的环境,但是怎么让canal伪装成salve并正确获取mysql中的binary log呢?
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,修改mysql配置文件来开启bin_log,通过 find / -name my.cnf
查找my.cnf, 配置文件内容如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
复制代码
进入mysql容器docker exec -it mysql5.7 bash
授权canal链接MySQL账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
mysql -uroot -proot
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
复制代码
数据库重启后, 简单测试 my.cnf 配置是否生效
show variables like 'log_bin';
show variables like 'log_bin';
show master status;
复制代码
canal-server的配置修改
进入canal-server容器docker exec -it canal-server bash
编辑canal-server的配置
vi canal-server/conf/example/instance.properties
重启canal-server容器
docker restart canal-server
进入容器查看启动日志
docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log
复制代码
至此,我们的环境工作准备完成!!!
数据拉取并同步发送到ElasticSearch
环境已经准备好了,现在就要开始我们的编码实战部分了,怎么通过应用程序去获取canal解析后的binlog数据。现在我们基于spring boot搭建一个canal demo应用。
Student.javapackage com.example.canal.study.pojo;
import lombok.Data;
import java.io.Serializable;
@Data
public class Student implements Serializable {
private String id;
private String name;
private int age;
private String sex;
private String city;
}
复制代码
CanalConfig.java
package com.example.canal.study.common;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
//import Student;
//import com.example.canal_demo.service.StudentService;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
/**
* @author haha
*/
@Slf4j
@Configuration
public class CanalConfig {
@Value("${canal.server.ip}")
private String canalIp;
@Value("${canal.server.port}")
private Integer canalPort;
@Value("${canal.destination}")
private String destination;
@Value("${elasticSearch.server.ip}")
private String elasticSearchIp;
@Value("${elasticSearch.server.port}")
private Integer elasticSearchPort;
@Value("${zookeeper.server.ip}")
private String zkServerIp;
@Bean
public CanalConnector canalSimpleConnector() {
// 获取CanalServer连接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp, canalPort), destination, "", "");
return canalConnector;
}
// @Bean
// public CanalConnector canalHaConnector() {
// CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
// return canalConnector;
// }
@Bean
public RestHighLevelClient restHighLevelClient() {
RestHighLevelClient client = new RestHighLevelClient(
//传入RestClientBuilder
RestClient.builder(
new HttpHost(elasticSearchIp, elasticSearchPort)
)
);
return client;
}
}
复制代码
CanalDataParser.java 展示主要代码
public static List<Map<String, Object>> printEntry(List<Entry> entrys) {
List<Map<String, Object>> rows = new ArrayList<>();
for (Entry entry : entrys) {
long executeTime = entry.getHeader().getExecuteTime();
long delayTime = System.currentTimeMillis() - executeTime;
Date date = new Date(entry.getHeader().getExecuteTime());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
TransactionBegin begin = null;
try {
begin = TransactionBegin.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
// 打印事务头信息,执行的线程id,事务耗时
logger.info(transaction_format,
new Object[]{entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()),
String.valueOf(entry.getHeader().getExecuteTime()),
simpleDateFormat.format(date),
entry.getHeader().getGtid(),
String.valueOf(delayTime)});
logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
printXAInfo(begin.getPropsList());
} else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
TransactionEnd end = null;
try {
end = TransactionEnd.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
// 打印事务提交信息,事务id
logger.info("----------------\n");
logger.info(" END ----> transaction id: {}", end.getTransactionId());
printXAInfo(end.getPropsList());
logger.info(transaction_format,
new Object[]{entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()),
String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime)});
}
continue;
}
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChage.getEventType();
logger.info(row_format,
new Object[]{entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(), eventType,
String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime)});
if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
logger.info(" sql ----> " + rowChage.getSql() + SEP);
continue;
}
printXAInfo(rowChage.getPropsList());
for (RowData rowData : rowChage.getRowDatasList()) {
List<CanalEntry.Column> columns;
if (eventType == CanalEntry.EventType.DELETE) {
columns = rowData.getBeforeColumnsList();
} else {
columns = rowData.getAfterColumnsList();
}
HashMap<String, Object> map = new HashMap<>(16);
for (Column column: columns){
map.put(column.getName(), column.getValue());
}
rows.add(map);
}
}
}
return rows;
}
复制代码
ElasticUtils.java
package com.example.canal.study.common;
import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Map;
/**
* @author haha
*/
@Slf4j
@Component
public class ElasticUtils {
@Autowired
private static RestHighLevelClient restHighLevelClient;
public static void saveEs(Student student, String index) {
IndexRequest indexRequest = new IndexRequest(index)
.id(student.getId())
.source(JSON.toJSONString(student), XContentType.JSON)
.opType(DocWriteRequest.OpType.CREATE);
try {
IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
log.info("保存数据至ElasticSearch成功:{}", response.getId());
} catch (IOException e) {
log.error("保存数据至elasticSearch失败: {}", e);
}
}
public static void getEs(String index, String id) throws IOException {
GetRequest getRequest = new GetRequest(index, id);
GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
Map<String, Object> fields = response.getSource();
for (Map.Entry<String, Object> entry : fields.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
}
}
public static void updateEs(Student student, String index) throws IOException {
UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
updateRequest.upsert(JSON.toJSONString(student), XContentType.JSON);
UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
log.info("更新数据至ElasticSearch成功:{}", response.getId());
}
public static void DeleteEs(String index, String id) throws IOException {
DeleteRequest deleteRequest = new DeleteRequest(index, id);
DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
log.info("删除数据至ElasticSearch成功:{}", response.getId());
}
}
复制代码
BinLogElasticSearch.java
package com.example.canal.study.action;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
import com.example.canal.study.common.ElasticUtils;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* @author haha
*/
@Slf4j
@Component
public class BinLogElasticSearch {
@Autowired
private CanalConnector canalSimpleConnector;
// @Autowired
// @Qualifier("canalHaConnector")
// private CanalConnector canalHaConnector;
public void binLogToElasticSearch() {
//连接CanalServer
canalSimpleConnector.connect();
// 订阅destination
canalSimpleConnector.subscribe();
// 轮询拉取数据
Integer batchSize = 5 * 1024;
while (true) {
// Message message = canalHaConnector.getWithoutAck(batchSize);
Message message = canalSimpleConnector.getWithoutAck(batchSize);
long id = message.getId();
int size = message.getEntries().size();
log.info("当前监控到binLog消息数量{}", size);
if (id == -1 || size == 0) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//1. 解析message对象
List<CanalEntry.Entry> entries = message.getEntries();
List<Map<String, Object>> rows = CanalDataParser.printEntry(entries);
for (Map<String, Object> map : rows) {
Student student = new Student();
student.setId(map.get("id").toString());
student.setAge(Integer.parseInt(map.get("age").toString()));
student.setName(map.get("name").toString());
student.setSex(map.get("sex").toString());
student.setCity(map.get("city").toString());
// 2。将解析出的对象同步到elasticSearch中
ElasticUtils.saveEs(student, "student_index");
canalSimpleConnector.ack(id);
}
}
}
}
}
复制代码
CanalDemoApplication.java(spring boot 启动类)
package com.example.canal.study;
import com.example.canal.study.action.BinLogElasticSearch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author haha
*/
@SpringBootApplication
public class CanalDemoApplication implements ApplicationRunner {
@Autowired
private BinLogElasticSearch binLogElasticSearch;
public static void main(String[] args) {
SpringApplication.run(CanalDemoApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
binLogElasticSearch.binLogToElasticSearch();
}
}
复制代码
application.properties
server.port=8081
spring.application.name = canal-demo
canal.server.ip = 192.168.124.5
canal.server.port = 11111
canal.destination = example
zookeeper.server.ip = 192.168.124.5:2181
elasticSearch.server.ip = 192.168.124.5
elasticSearch.server.port = 9200
复制代码
canal集群高可用的搭建
通过上面的学习,我们知道了单机直连方式的canala应用。在当今互联网时代,单实例模式逐渐被集群高可用模式取代,那么canal的多实例集群方式如何搭建呢!
基于zookeeper获取canal实例
- 机器准备
- 运行canal的机器: 10.20.144.22 , 10.20.144.51.
- zookeeper地址为10.20.144.51:2181
- mysql地址:10.20.144.15:3306
- 按照部署和配置,在单台机器上各自完成配置,演示时instance name为example
- 修改canal.properties,加上zookeeper配置
canal.zkServers=10.20.144.51:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
复制代码
2. 创建example目录,并修改instance.properties
复制代码
canal.instance.mysql.slaveId = 1234 ##另外一台机器改成1235,保证slaveId不重复即可
canal.instance.master.address = 10.20.144.15:3306
复制代码
注意: 两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置
启动两台机器的canal
启动后,你可以查看logs/example/example.log,只会看到一台机器上出现了启动成功的日志。
比如我这里启动成功的是10.20.144.51
2013-03-19 18:18:20.590 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-03-19 18:18:20.596 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-03-19 18:18:20.831 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-03-19 18:18:20.845 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful...
复制代码
查看一下zookeeper中的节点信息,也可以知道当前工作的节点为10.20.144.51:11111
[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running
{"active":true,"address":"10.20.144.51:11111","cid":1}
复制代码
客户端链接, 消费数据
可以直接指定zookeeper地址和instance name,canal client会自动从zookeeper中的running节点,获取当前服务的工作节点,然后与其建立链接:
CanalConnector connector = CanalConnectors.newClusterConnector("10.20.144.51:2181", "example", "", "");
复制代码
链接成功后,canal server会记录当前正在工作的canal client信息,比如客户端ip,链接的端口信息等 (聪明的你,应该也可以发现,canal client也可以支持HA功能)
[zk: localhost:2181(CONNECTED) 17] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"10.12.48.171:50544","clientId":1001}
复制代码
数据消费成功后,canal server会在zookeeper中记录下当前最后一次消费成功的binlog位点. (下次你重启client时,会从这最后一个位点继续进行消费)
[zk: localhost:2181(CONNECTED) 16] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId
复制代码
重启一下canal server
停止正在工作的10.20.144.51的canal server
复制代码
ssh 10.20.144.51
sh bin/stop.sh
复制代码
这时10.20.144.22会立马启动example instance,提供新的数据服务
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"10.20.144.22:11111","cid":1}
复制代码
与此同时,客户端也会随着canal server的切换,通过获取zookeeper中的最新地址,与新的canal server建立链接,继续消费数据,整个过程自动完成
异常总结
docker network create --subnet=172.18.0.0/16 mynetwork
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1
docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql
复制代码
如果创建时未指定 --restart=always ,可通过update 命令
docker update --restart=always xxx
复制代码