问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

借助Canal实现MySQL数据库间链接canal链接mysql

发布网友 发布时间:2024-09-17 05:51

我来回答

1个回答

热心网友 时间:2024-09-24 22:47

借助Canal实现MySQL数据库间链接
在现代化的应用开发中,不同的系统或应用之间可能需要共享同一个数据库,这时候就需要实现数据库间的链接。本文将介绍如何借助Canal实现MySQL数据库间链接。Canal是阿里巴巴开源的一款数据库增量订阅和消费组件,支持MySQL、Oracle等数据库,它可以将数据库更新的数据通过可靠的方式同步到其他数据存储、NoSQL等系统中。
一、Canal介绍
Canal是阿里巴巴开源的一款数据库增量订阅和消费组件,是基于MySQL数据库增量日志构建的,从而实现了与数据源(如MySQL)解耦,达到了异构神异的目的。Canal主要包括三个模块: Canal.Admin、Canal.Server和Canal.Client。
Canal.Admin: Canal控制台管理界面,用于管理Canal的启停和监控
Canal.Server: Canal的工作服务端,负责从数据源(如MySQL)订阅增量日志,并把日志传输给客户端
Canal.Client: Canal的客户端,用于订阅和消费Canal.Server传输的数据
二、Canal的使用场景
Canal主要应用于以下场景:
1、数据实时同步
提供不同数据存储的数据实时同步,如 MySQL 到 Elasticsearch 的同步,实时更新数据,保持数据一致
2、数据订阅
对于需要全量数据同步的场景,结合 snapshot 快照机制,可以实现数据全量订阅
3、实时数据分析
对数据实时抓取,进行数据分析计算
4、缓存更新
将数据更新到Cache(如Redis)中,提升系统性能
三、Canal的具体实现
将A库的数据同步到B库中,具体实现如下:
1、安装Canal
Canal的安装需要先下载源码,然后进行编译打包,具体步骤可以参考Canal官网: https://github.com/alibaba/canal
2、配置Canal
Canal的配置文件位于config文件夹下,通过修改canal.properties实现配置。
(1)配置MySQL的主从关系
# mysql主从地址信息
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal_test
canal.instance.dbPassword=canal_test
# 配置binlog信息,也可以从当前解析到的binlog中获取,
# 优先从binlog position 获取,找不到才到 GTID_GET中获取, gtid模式推荐打开,
# 当前的timestamp可以通过show master status或 show binary logs获取
canal.instance.connectionCharset = UTF-8
canal.instance.gtidon=on
canal.instance.position =
(2)配置Canal连接内容
# 配置instance连接信息
canal.instance.filter.regex=canal_test.tb_goods
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
(3)配置数据输出方式
# 配置数据输出方式
canal.mq.topic=test
# 指定数据传输格式
canal.mq.flatMessage = false
(4)配置kafka通常参数和账号信息
# canal.mq.producerGroup 客户端group组名,同一个topic下的不同group组互不影响
# canal.mq.servers 指定mq服务器的地址
# canal.mq.topics 指定MQ topic主题名称
# authAccount ,配置到应用已指定的账号
canal.mq.properties.bootstrap.servers=192.168.11.131:9092
canal.mq.producer.bootstrap-servers=192.168.11.131:9092
canal.mq.producer.topic=myTest can
(5)启动Canal
执行bin目录下的startup脚本,即可启动Canal。
3、配置Canal客户端
在B库中新建表,同步A库的数据到该表中。
(1)在B库中创建表
mysql> create database canal_test2;
mysql> use canal_test2;
mysql> create table tb_goods(
-> id int(11) not null auto_increment primary key,
-> name varchar(60) not null,
-> price int(10) not null
-> )engine=innodb default charset=utf8;
(2)在Canal服务端中新增instance,即配置同步关系
mysql> create database canal_client;
mysql> use canal_client;
mysql> create table canal_client.tb_goods(
-> id int(11) not null,
-> name varchar(60) not null,
-> price int(10) not null
-> )engine=innodb default charset=utf8;
mysql> GRANT ALL PRIVILEGES ON canal_client.* TO canal@’%’ IDENTIFIED BY ‘canal_test’ WITH GRANT OPTION;
(3)在Canal客户端中启动Canal
通过Canal客户端,将A库的数据同步到B库中的表中,具体代码实现如下:
public class SimpleCanalClientExample {
public static void mn(String[] args) {
// 从控制台读取参数
String host = args[0];
int port = Integer.valueOf(args[1]);
String destination = args[2];
String username = args[3];
String password = args[4];

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(host,
port), destination, username, password);
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(“canal_test.tb_goods”);
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
} else {
System.out.printf(“batchId: %s, size: %s \n”, batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private static void printEntry(List entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND
|| entry.getEntryType() == CanalEntry.EntryType.HEARTBEAT) {
continue;
}
RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
continue;
}
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
printColumn(rowData.getBeforeColumnsList());
System.out.println(“=======”);
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + “\t” + column.getValue() + “\t” + column.getUpdated());
}
}
}
通过以上代码,我们就可以将A库的tb_goods表中的数据实时同步到B库中的canal_client库的tb_goods表中。
四、总结
Canal是一款非常优秀的数据库增量订阅和消费组件,它可以很好地解决数据库间链接的问题,实现不同数据存储之间的数据同步。我们可以通过Canal的控制台管理界面,或者通过Canal的客户端代码实现数据库之间的数据同步。当然,Canal也有其缺点,就是在高并发场景下,可能会受到性能的限制,这需要我们在具体的应用场景中进行实际评估。
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
找专业防水队做完还漏水怎么维权 法院会受理房屋漏水造成的纠纷吗? 巴西龟最长活多久,家养!!! 养胃的药最好的是什么啊 婴儿积食发烧不愿吃药怎么办 板门穴位在哪个部位 手机设置放偷看的方法? 凝结水回收器生产厂家? 个人账户养老金预测公式:现有5万元,缴费20年,能领多少钱? 临沂比较有名的男装品牌 乳汁牛奶奶油蛋糕好吃吗?有什么特殊之处? 什么是风险合规 合规风险评估与预防:首席合规官的风险管理方法 我的电脑屏幕保护怎么没有五彩球 为什么我的360桌面加速球不见了啊? 为什么我的电脑360没有悬浮窗呢? 电脑屏幕上 开始上的游戏 三维铅球 怎么弄出来 以前买电脑的时候有 怎... 长春砍手门事件简介 是选装修公司好还是自己去找人装好一点? 四川省想要采购乌木可以去哪里? 我昨天做梦梦到别人的头掉了。全是在我面前、就感觉自己看得到却没有... 太阳微Oracle公司收购Sun sun软件是什么 开饿了么会员有啥用? 空调清洗后有难闻的味道怎么回事呢? 空调清洗后有难闻的味道?? 去泰国旅游下载什么翻译软件 您好,我是黑龙江的户口,在河北上的大专,08年毕业,学的是法律专业,我现... 我想在河北这地区找份工作,我住在石家庄,考虑去石家庄或者唐山工作... 怎么设置手机微信的照片权限? 大数据的数据处理流程 使用Canal分布式管理MySQLcanal与mysql 实木地板环保标准是什么 橡木实木地板价格多少,一般一平米多少钱 地板砖和木地板有哪些区别应该怎么选 五一期间,长沙臭豆腐在哪里可以品尝到最正宗的味道? 长沙有哪些吃臭豆腐的好去处? 屋脊上装饰的脊兽有哪些 故宫太和殿屋脊兽的顺序对吗一篇说清楚 将臣为什么咬徐福 怎么查团员证的入团时间? 诺基亚X1-01上市报价怎么样 入门级双卡双待仅售百元 发展对象面试答辩(44个可能出现的问题) 诺基亚有百元价位的手机么? 发展对象转预备党员答辩 微信公众号如何实现付费阅读? 信息网络安全风险评估的方法有哪些? - 知乎 哪个不属于网络安全风险评估的步骤 风火牙疼要吃什么药,已经疼了一个多月 感冒了 ,上火牙痛,怎么办