安亿程序开发

Linux宝塔面板使用Canal实现Mysql和Redis数据同步(图文教程)

21 次浏览

本文详细描述了一个项目如何通过Canal解决从MySQL读取配置文件的压力问题,包括Linux环境设置、MySQL配置、Canal服务端安装与配置,以及在SpringBoot中接入Canal进行MySQL信息的实时监控

前言

最近有一个项目,频繁读配置文件,如果用mysql怕压力大,一直用的Redis,用Redis虽然解决了读的问题,但是修改的话也要单独写配置,同时在其他环境配置也不方便保存,为此找了mysql和redis同步方案Canal。

Canal 是什么

Canal 是阿里巴巴开源的一款基于 MySQL 数据库二进制日志(Binlog)增量订阅和消费的中间件。它的名字来源于 "canal"(运河、通道),寓意着在数据库和应用之间建立一条数据同步的通道。

工作原理流程:
MySQL 主库 → Binlog(二进制日志) → Canal Server(伪装成从库) → 解析日志 → 发送数据 → 各种下游系统

技术原理

  • Canal 模拟 MySQL Slave 的交互协议

  • 向 MySQL Master 发送 dump 请求

  • MySQL Master 收到请求后,开始推送 Binlog 给 Canal

  • Canal 解析 Binlog 对象(原始为 byte 流)

  • 将解析后的数据发送到消息队列或其他下游系统

主要应用场景

应用场景 具体说明 典型案例
数据库镜像 实时备份数据库数据 主从数据库同步、异地多活
数据库实时备份 增量数据的实时备份 灾备系统、数据恢复
多级索引构建 同步数据到搜索引擎 Elasticsearch、Solr 索引更新
业务缓存刷新 数据库变更后更新缓存 Redis 缓存一致性维护
价格变化监控 监控特定数据的变化 电商价格监控、库存预警
增量数据订阅 为其他系统提供增量数据 数据仓库 ETL、大数据分析
异地数据同步 跨机房、跨地域数据同步 全球化业务数据同步

Linux系统环境准备

1.宝塔面板mysql修改配置

server-id=1    #master端的ID号【必须是唯一的】;
log_bin=mysql-bin    #同步的日志路径,一定注意这个目录要是mysql有权限写入的
binlog-format=row    #行级,记录每次操作后每行记录的变化。
binlog-do-db=game_mqtt  #指定库,缩小监控的范围。

查看是否生效:

show variables like 'log_bin';

 查看正在写入的binlog文件状态:

show master status;

 

2. mysql为canal配置权限

在mysql中给canal单独建一个用户,给全库全表的读,拷贝,复制的权限

账号密码都是:canal

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;

3.安装canal服务端

 安装 canal.deployer-1.1.6.tar.gz

官网下载地址https://github.com/alibaba/canal/releases

备用下载地址:https://download.csdn.net/download/qq_33215204/88369947

 将文件上传服务器目录   /data/soft/canal

解压文件

tar -zxvf canal.deployer-1.1.6.tar.gz 

4.配置canal

查看 conf/canal.properties 配置,发现需要暴漏三个端口

 canal.admin.port = 11110
 canal.port = 11111
 canal.metrics.pull.port = 11112

conf/canal.properties 配置

# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2
canal.destinations = example
 

修改 conf/example/instance.properties 实例配置 

# 配置 slaveId 自定义,不等于 mysql 的 server Id 即可
canal.instance.mysql.slaveId=10 

# 数据库地址:自己的数据库ip+端口
canal.instance.master.address=127.0.0.1:3306 
 
# 数据库用户名和密码 
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

#代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
canal.instance.connectionCharset = UTF-8
	
# 指定库和表,这里的 .* 表示 canal.instance.master.address 下面的所有数据库
canal.instance.filter.regex=.*\\..*

 配置完成之后启动

5.启动canal

cd /data/soft/canal/bin

./startup.sh

启动完成之后去看下有没有日志信息。

2023-09-24 12:14:24.280 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2023-09-24 12:14:24.309 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2023-09-24 12:14:24.309 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2023-09-24 12:14:24.320 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2023-09-24 12:14:24.480 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2023-09-24 12:14:24.480 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2023-09-24 12:14:26.135 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000045,position=4,serverId=1,gtid=<null>,timestamp=1695526635000] cost : 1634ms , the next step is binlog dump

 启动成功,接下来就是在springboot中创建客户端监控了。

canal客户端监控mysql信息,实现业务逻辑

1.配置pom.xml

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
 

2.创建demo

package com.game.Service;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    public static void main(String args[]) {
        // 创建链接:换成自己的数据库ip地址
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.0.243",
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}


3.测试删除一条记录

后面的就是监控数据同步Redis,就不写了