Canal HA原理及安裝

manba_yqq 2022-01-08 08:02:21 阅读数:288

canal ha 原理

Canal HA原理

Canal一般用於實時同步數據場景,那麼對於實時場景HA顯得尤為重要,Canal支持HA搭建,canal的HA分為兩部分,canal server和canal client分別有對應的HA實現。大數據中使用Canal同步數據一般同步到Kafka中,這裏Kafka相當於是Canal Client,Kafka集群自帶HA屬性,所以這裏我們 只關注Canal Server HA。Canal Server HA主要是為了减少對mysql dump的請求,不同server上的instance(不同server上的相同instance)要求同一時間只能有一個處於running,其他的處於standby狀態(standby是instance的狀態),Canal Server HA原理如下:
在這裏插入圖片描述
Canal HA 保證步驟如下:
1)canal server要啟動某個canal instance時都先向zookeeper_進行一次嘗試啟動判斷。
2)創建zookeeper節點成功後,對應的canal server就啟動對應的canal instance,沒有創建成功的canal instance就會處於standby狀態。
3)一旦zookeeper發現canal server A創建的instance節點消失後,立即通知其他的canal server再次進行步驟1的操作,重新選出一個canal server啟動instance。
4)canal client每次進行connect時,會首先向zookeeper詢問當前是誰啟動了canal instance,然後和其建立鏈接,一旦鏈接不可用,會重新嘗試connect。

Canal HA 搭建

1)機器准備
運行Canal的機器:node3,node4
zookeeper地址:node2:2181,node3:2181,node4:2181
mysql地址:node1:3306
2)在node3,node4上單獨部署配置Canal
將Canal安裝包上傳到node3,node4,並解壓到“/opt/canal”目錄下,修改“/opt/canal/conf”下的canal.properties文件,加上zookeeper配置

#指定zookeeper集群地址
canal.zkServers = node2:2181,node3:2181,node4:2181
#配置spring的xml配置文件
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
#canal將數據寫入Kafka,可配:tcp, kafka, RocketMQ,tcp就是使用canal代碼接收
canal.serverMode = kafka
#配置canal寫入Kafka地址
canal.mq.servers = node1:9092,node2:9092,node3:9092

進入“/opt/canal/conf/example”目錄,修改“instance.properties”文件:

#另外一臺機器改成123457,保證slaveId不重複即可
canal.instance.mysql.slaveId=123456
#配置mysql master 節點及端口
canal.instance.master.address=node2:3306
#配置連接mysql的用戶名和密碼,就是前面複制權限的用戶名和密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#配置Canal將數據導入到Kafka topic
canal.mq.topic=canal_topic

注意:兩臺機器上的instance目錄的名字需要保證完全一致,HA模式是依賴於instance name進行管理,同時必須都選擇default-instance.xml配置,此配置中才有關於zookeeper的設置信息。
3)啟動兩臺機器的Canal

#在node3上啟動Canal
[[email protected] ~]# cd /opt/canal/bin
[[email protected] bin]# ./startup.sh
#在node4上啟動Canal
[[email protected] ~]# cd /opt/canal/bin
[[email protected] bin]# ./startup.sh

啟動完成後,可以查看zookeeper中對應的路徑信息:

[zk: localhost:2181(CONNECTED) 2] ls /otter/canal/cluster
[192.168.134.103:11111, 192.168.134.104:11111]
[zk: localhost:2181(CONNECTED) 3] get /otter/canal/destinations/example/running
{
"active":true,"address":"192.168.134.103:11111"}

Canal HA 測試

1)向Mysql中“testdb.person”錶中寫入數據

mysql> insert into person values (4,"s1",21),(5,"s2",22),(6,"s3",23);

可以觀察到Kafka canal_topic中有監控到的數據如下:

[[email protected] bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic canal_topic
{
"data":[{
"id":"4","name":"s1","age":"21"},{
"id":"5","name":"s2","age":"22"},{
"id":"6","name":"s3","age":"23"}],"database":"testdb","es":1641283997000,"id":2,"isDdl":false,"mysqlType":{
"id":"int","name":"varchar(255)","age":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{
"id":4,"name":12,"age":4},"table":"person","ts":1641283997225,"type":"INSERT"}

2)關閉active Canal Server節點,繼續向Mysql錶中寫入數據
關閉node3 Canal Server:

[[email protected] bin]# ./stop.sh

查看zookeeper “/otter/canal/destinations/examples/running”路徑Active的Canal節點:

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{
"active":true,"address":"192.168.134.104:11111"}

繼續向MySQL中“testdb.person”錶中寫入數據:

mysql> insert into person values (4,"s1",21),(5,"s2",22),(6,"s3",23);

可以觀察寫入到Kafka “canal_topic”中數據如下:

[[email protected] bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic canal_topic
{
"data":[{
"id":"4","name":"s1","age":"21"},{
"id":"5","name":"s2","age":"22"},{
"id":"6","name":"s3","age":"23"}],"database":"testdb","es":1641283997000,"id":2,"isDdl":false,"mysqlType":{
"id":"int","name":"varchar(255)","age":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{
"id":4,"name":12,"age":4},"table":"person","ts":1641283997225,"type":"INSERT"}
{
"data":[{
"id":"4","name":"s1","age":"21"},{
"id":"5","name":"s2","age":"22"},{
"id":"6","name":"s3","age":"23"}],"database":"testdb","es":1641284121000,"id":2,"isDdl":false,"mysqlType":{
"id":"int","name":"varchar(255)","age":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{
"id":4,"name":12,"age":4},"table":"person","ts":1641284121920,"type":"INSERT"}

經過以上測試,Canal HA 生效。

注意:經過測試Canal HA 在使用zookeeper存儲binlog position時,當有一個Canal Server重新啟動並切換成Active節點時,每次都會重複讀取最後一條數據。使用非HA 本地存儲binlog position時,沒有此問題。

版权声明:本文为[manba_yqq]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201080802214483.html