canal高可用的数据重复消费问题,自动切换时消费端数据重复消费,canal高可用的搭建,运用以及问题,以及元数据保存问题,保存zookeeper清除_遗梦孤魂的博客-CSDN博客_canal数据重复


本站和网页 https://blog.csdn.net/weixin_40126236/article/details/90474784 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

canal高可用的数据重复消费问题,自动切换时消费端数据重复消费,canal高可用的搭建,运用以及问题,以及元数据保存问题,保存zookeeper清除_遗梦孤魂的博客-CSDN博客_canal数据重复
canal高可用的数据重复消费问题,自动切换时消费端数据重复消费,canal高可用的搭建,运用以及问题,以及元数据保存问题,保存zookeeper清除
遗梦孤魂
于 2019-05-23 11:05:53 发布
4840
收藏
分类专栏:
error__problem
MySQL-System
文章标签:
canal重复消费
canal的HA
canal高可用搭建
canal的连接
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_40126236/article/details/90474784
版权
error__problem
同时被 2 个专栏收录
29 篇文章
0 订阅
订阅专栏
MySQL-System
18 篇文章
0 订阅
订阅专栏
一、问题描述
搭建的canal是高可用模式,在IDEA里面进行消费的,但是在服务端进行切换时,出现了数据重复被消费的问题。salve1:11111开启服务时,往数据库里面插入了一条数据,然后又删除了这条数据,这是Mysql的bin-log会产生两条日志,客户端也获取到了这个两条数据。当我把salve1的服务stop.sh关掉之后,salve2:11111开启了服务,但是在客户端又重新获取到了这两条数据,也就是说在我切换之后数据被重新消费了一遍。之后我又将salve1的服务开启,将salve2的服务关闭,这个两条数据又被我的客户端消费了一遍,也就这两条数据被消费了6次了,,,,,,总之每次切换服务端数据都会再次被客户端消费。
网上:Canal会导致消息重复吗?
    答:会,这从两个大的方面谈起。
        1)Canal instance初始化时,根据“消费者的Cursor”来确定binlog的起始位置,但是Cursor在ZK中的保存是滞后的(间歇性刷新),所以Canal instance获得的起始position一定不会大于消费者真实已见的position。
        2)Consumer端,因为某种原因的rollback,也可能导致一个batch内的所有消息重发,此时可能导致重复消费。
    我们建议,Consumer端需要保持幂等,对于重复数据可以进行校验或者replace。对于非幂等操作,比如累加、计费,需要慎重。
二、高可用搭建(HA)
可参考:https://github.com/alibaba/canal/wiki/AdminGuide#user-content-ha%E6%A8%A1%E5%BC%8F%E9%85%8D%E7%BD%AE
   1、配置文件修改(两台机器都要下载canal,配置都一样)
① 修改canal.properties,加上zookeeper配置,spring配置选择default-instance.xml
1 canal.zkServers=hadoop:2181 ##zookeeper的ip和端口号
2 canal.instance.global.spring.xml = classpath:spring/default-instance.xml ##开启这个(默认被注释了)
 ② 创建example目录,并修改instance.properties
1 canal.instance.mysql.slaveId = 1234 ##另外一台机器改成1235,保证slaveId不重复即可
2 canal.instance.master.address = hadoop:3306 ##数据库的ip和端口
3 canal.instance.dbUsername=canal ##Mysql的用户名 (这个可以没有)
4 canal.instance.dbPassword=Canal2019! ##Mysql的密码(这个可以没有)
   2、开启服务(两台都开启)
到安装目录下
./bin/startup.sh (开启服务)
./bin/stop.sh (关闭服务)
tail -F logs/canal/canal.log (查看日志)
有以下三句话表示启动成功
INFO com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[*.*.*.*:11111]
INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ...
   3、到zookeeper的客户端查看
查看在活跃的服务端
get /otter/canal/destinations/example/running
{"active":true,"address":"*.*.*.*:11111","cid":1}
查看在活跃的客户端
get /otter/canal/destinations/example/1001/running
{"active":true,"address":"*.*.*.*:55285","clientId":1001}
查看zookeeper同步的元数据
get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"hadoop","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000005","position":28477,"serverId":1,"timestamp":1558665144000}}
删除
rmr /otter/canal/destinations/example
在zookeeper保存 的数据目录,推荐工具:http://www.onlinedown.net/soft/1222234.htm
三、IDEA客户端消费
在idea端编写client消费数据,来回的切换server的时候数据就会不断的重新消费。需要的maven依赖直接来:
https://mvnrepository.com/ 搜索:canal
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
/**
* @ClassName: NetTest
* @Description: TODO
* @Author: *******
* @Data: 2019/5/15 10:16
* @Version: 1.0
**/
public class NetTest {
public static void main(String args[]) {
String destination = "example";
String username = "canal";
String password = "Canal2019!";
InetSocketAddress inetSocketAddress = new InetSocketAddress("hadoop", 11111);
// 创建链接
CanalConnector connector = CanalConnectors.newClusterConnector("hadoop:2181",destination, username, password);
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {//120*2秒还没有数据就断开
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
} else {
emptyCount = 0;
printEntry(message.getEntries());
connector.ack(batchId); // 提交确认
} finally {
connector.disconnect();
//输出数据
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s , ",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
四、问题解决
问题1、数据重复消费
上文提到数据重复消费,不断的切换server数据就不断地重新被消费,之所以被会这样就是zookeeper没有同步client的meta数据,两个服务器在zookeeper中保存的元数据不一样,所以在互相切换的时候,服务端不认为是一个客户端在消费(其实每个server有且只能有一个client)。问题的根源找到了,就是在配置高可用的时候,有配置问题。
原来是一个服务器,我直接将所有的文件拷贝过去,所有的配置都是一样的,后来拷贝的连接数据库连接不上,查看发现数据的配置canal.instance.master.address 是localhost拷贝过来的文件,后来将localhost改成了数据的真实的ip:*.*.*.*,改完之后高可用就可以用,在使用的过程中出现了数据重复。
排查找到了zookeeper元数据问题,核对发现get /otter/canal/destinations/example/1001/cursor ,一个服务端的元数据中"address":"hadoop",而另一个的address 是localhost,就是因为这两个切换,导致数据重复消费。
解决:
修改conf/example/instance.properties文件,将canal.instance.master.address的值都改成真实的ip,不要用主机映射的名字
例:canal.instance.master.address =192.168.123.45:3306
问题2、canal的服务开启了,但是连接不上
canal的服务开启了,查看日志文件,里面也确实是启动了,但是就是连接不上
解决:修改conf/canal.properties 文件,将canal.ip改成本机真实的ip,如果用主机的映射开启的服务就是:主机名:11111,而你在代码里面连接的时候输入的主机名在解析的时候会被解析成真实的ip,根据这个ip去找相关的服务。而服务端并非ip+端口
建议:canal里面的所有关于ip的配置最好不要用ip映射的名字(例如:127.0.0.1,localhost,映射名等等),最好都用真实的ip
可参考:https://shift-alt-ctrl.iteye.com/blog/2399603
遗梦孤魂
关注
关注
点赞
收藏
打赏
评论
canal高可用的数据重复消费问题,自动切换时消费端数据重复消费,canal高可用的搭建,运用以及问题,以及元数据保存问题,保存zookeeper清除
一、问题描述搭建的canal是高可用模式,在IDEA里面进行消费的,但是在服务端进行切换时,出现了数据重复被消费的问题。salve1:11111开启服务时,往数据库里面插入了一条数据,然后又删除了这条数据,这是Mysql的bin-log会产生两条日志,客户端也获取到了这个两条数据。当我把salve1的服务stop.sh关掉之后,salve2:11111开启了服务,但是在客户端又重新获取到了这两...
复制链接
扫一扫
专栏目录
阿里巴巴Canal常见问题:重复解析Filter失效消费落后
小熊的博客
08-28
352
阿里巴巴Canal常见问题:重复解析/Filter失效/消费落后
前言
Canal是阿里巴巴开源的数据库Binlog日志解析框架,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
在之前我写的文章阿里开源MySQL中间件Canal快速入门中,我已经介绍了Canal的基本原理和基础使用。
在部署到生产环境的过程中,自己作为一个菜鸟,又踩了一些坑,期间做了记录和总结,并再解决后分析了下原因,便有了此文。
本文重点内容
Canal常见的三大问题原因分析及解决方案
Binlog解析错误
实战,Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步
wdjnb的博客
12-27
1231
大家好,我是不才陈某~
数据同步一直是一个令人头疼的问题。在业务量小,场景不多,数据量不大的情况下我们可能会选择在项目中直接写一些定时任务手动处理数据,例如从多个表将数据查出来,再汇总处理,再插入到相应的地方。
但是随着业务量增大,数据量变多以及各种复杂场景下的分库分表的实现,使数据同步变得越来越困难。
今天这篇文章使用阿里开源的中间件Canal解决数据增量同步的痛点。
文章目录如下:
Canal是什么?
canal译意为水道/管道/沟渠,主要用途是基于MySQL数据库增量日志解析,提供..
评论 1
您还未登录,请先
登录
后发表或查看评论
canal搭建及消费日志过程中的遇到的问题《亲测版》
blogs_broadcast的博客
05-26
6550
一、部署canal:3.1开启MySQL的binlog功能,并配置binlog模式为row。通过set global * = *, 立即生效,重启后重新读取配置文件my.cnf ,想永久生效记得修改配置文件log-bin = mysql-binbinlog-format=Rowserver-id=1(和canal的slaveId不重复即可,canal默认1234,一般不用修改)3.2在mysq...
canal简介
guanfengliang的专栏
07-15
1万+
阿里开源canal简介
canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。
1.背景
阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger(触发器)的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪
canal常见问题答疑【转载】
u013764793的博客
02-08
1355
canal常见问题
【开源实战】Canal部署常见问题:重复解析/Filter失效/消费落后
蛮三刀酱
06-11
2563
前言
Canal是阿里巴巴开源的数据库Binlog日志解析框架,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
在之前我写的文章阿里开源MySQL中间件Canal快速入门中,我已经介绍了Canal的基本原理和基础使用。
在部署到生产环境的过程中,自己作为一个菜鸟,又踩了一些坑,期间做了记录和总结,并再解决后分析了下原因,便有了此文。
本文重点内容
Canal常见的三大问题原因分析及解决方案
Binlog解析错误:重复解析/DML解析为QUERY
Filter失效:设置过滤器无.
canal 投递 数据 只进kafka 0分区
weixin_43564627的博客
07-21
228
canal 投递 数据 只进kafka 0分区
首先要确保其余kafka 有多个分区
在 instance.properties 加入以下配置即可 二者缺一不可
下面一段摘至官网:
mq顺序性问题
1.canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
2.canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topi
使用canal同步数据,踩坑排雷全过程
boyames的博客
05-04
2476
1、mysql配置
(1)检查binlog功能是否有开启
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | OFF |
+---------------+-------+
1 row in set (0.00 sec)
如果log_bin 显示为OFF则说明服务没有开启,需要修改Linux中
canal 集群配置 总结
you_are_my_life的博客
04-20
331
Canal——高可用架构设计与应用
六、总结
1. 启动两个监听example1的canal client,启动两个监听example2的canal client:
在example1或example2对应的数据发生变化时,两个canal client只有一个消费消息。
当两个监听同一个队列的canal client有一个宕掉时,再有数据变化时,剩下的一个canal client就会开始消费数据。
这就验证了canal client的HA机制:为了保证有序性,一份instance同一时
named 客户端无法解析_【开源实战】Canal部署常见问题:重复解析/Filter失效/消费落后...
weixin_39942213的博客
12-03
155
前言Canal是阿里巴巴开源的数据库Binlog日志解析框架,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。在之前我写的文章阿里开源MySQL中间件Canal快速入门中,我已经介绍了Canal的基本原理和基础使用。在部署到生产环境的过程中,自己作为一个菜鸟,又踩了一些坑,期间做了记录和总结,并再解决后分析了下原因,便有了此文。本文重点内容 Canal常见的三大问题原因分析...
数据科学复习第一章
weixin_43699840的博客
10-25
417
第一章
1.数据的定义
(1)统计学:为了找出问题背后的规律而需要的,与问题 相关的变量的观测值,是对客观现象进行计量的结果。
(2) 计算机科学:所有能输入到计算机,并被计算机程序处理 的符号总称,是用于输入电子计算机进行处理,具有一定 意义的数字、字母、符号和模拟量等的通称。
(3)数据科学:在一定背景下有意义的对于现实世界中的事物定性或定量的记录。
2.数据的类型
3.DIKW模型
4.大数据的定义
5.大数据的5V模型
6.大数据的5R模型
7.大数据的4P医疗模型
8.HACE定理
..
canal 停机后继续从上次位置拉取binlog的原理
siyuan494的博客
12-29
6135
CanalMetaManager 和 CanalLogPositionManager
1)canalInstance->CanalMetaManager 1对1
2)canalInstance->CanalLogPositionManager 1对多 当groupDbAddresses的groupSize()>1时 为1对多
因为:
canalInstance->CanalEventPar
canal 1.1.4 小记
qq_42496461的博客
01-10
2396
Canal 1.1.4 小记
一 安装前准备
链接 CSDN canal同步mysql数据到es、oracle、mq、redis和mysql中
链接 博客园 canal的使用记录 (工作方式及链接方式)
Canal github
链接 Canal 的参数
链接 阿里 canal 的理解
链接 Canal数据库同步组件
链接 canal 概述
deployer : 相当于服务端
adapater...
canal消耗内存_canal探究
weixin_39646688的博客
12-20
352
前面的文章使用canal订阅mysql数据变动进而同步数据,这里研究canal的内部特性,进而更好地使用canal,大部分内容来自官网,还有一部分来自我的理解。canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。应用场景:异构数据同步数据库实时备份业务cache刷新原理canal模拟成mysql slave向master发送dump请求,收到binlog数据进行解析s...
Canal重启策略
weixin_38168703的博客
03-24
691
Canal重启策略
线上环境Canal Instance重启,为了防止数据丢失,需要指定position重启
具体步骤如下:
停止Instance
根据%canal_deployer%/conf/Instance_name/mete.dat元数据文件修改Instance配置文件。注意这里一定要修改journal.name和position两个配置。
删除%canal_deployer%/conf/Instance_name/mete.dat元文件。
重启Instance。
...
超详细的Canal入门,看这篇就够了!
热门推荐
yehongzhi1994的博客
08-09
19万+
Canal入门看这篇就够了!
MySql 8.0主从复制及常见问题解决方法
66进的博客
08-25
213
主库配置:
[mysqld]
#设置服务器id,为1表示主服务器,实例唯一ID,不能和canal的slaveId重复
server_id=1
#启动MySQ二进制日志系统
log-bin=mysql-bin
#选择row模式
binlog-format=ROW
#需要同步的数据库名,如果有多个数据库,可重复此参数,每个数据库一行
binlog-do-db=test
#不同步mysql系统数据库
binlog-ignore-db=mysql
从库配置
[mysqld]中..
Canal实时采集mysql TCP,kafka 连接
最新发布
A3213383291的博客
05-12
507
Canal实时采集数据库信息 TCP)Canal简介MySQL 的 BinlogBinlog 的分类连接TcpTCP测试canal连接kafka
Canal简介
Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。
目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。
MySQL 的 Binlog
MySQL 的二进制日
canal(基于mysql数据库binlog的增量订阅&消费)
mnasd的博客
12-05
998
一、背景
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
ps. 目前内部版本已经支持mysql和oracle部分版本的日志解析,当前的c...
tcp实时传输kafka数据_canal实时同步mysql表数据到Kafka
weixin_39921689的博客
12-20
73
canal实时同步mysql表数据到Kafka准备对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下[mysqld]log-bin=mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式server_id=1 # 配置 MySQL replaction 需要定义,不...
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:书香水墨
设计师:CSDN官方博客
返回首页
遗梦孤魂
CSDN认证博客专家
CSDN认证企业博客
码龄5年
暂无认证
77
原创
4万+
周排名
9674
总排名
18万+
访问
等级
2322
积分
20
粉丝
42
获赞
59
评论
189
收藏
私信
关注
热门文章
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result
11914
java集合之List线程安全性比较总结
10057
top命令下CPU%一直超过百分之100甚至百分之200的原因
9623
catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'timesta
9532
canal-admin的高可用使用,单机使用,HA使用,阿里的canal的UI界面,管理canal的实例,以及问题
9321
分类专栏
MySQL-System
18篇
private
JDK源码
5篇
ES-System
1篇
Canal-System
1篇
monitor
1篇
Hadoop-System
2篇
Spark-System
5篇
error__problem
29篇
tools
2篇
konwledge
6篇
software
1篇
project
2篇
Linux-System
2篇
webUI
4篇
最新评论
prometheus的远程存储到clickhouse里面,prometheus store clickhouse
柠檬味的鱼°:
编译有问题的,我已经把编译好的文件放在了我的主页文章,欢迎小伙伴自取!
flink cdc 的 问题
小三你妹:
第二个配置重启这是什么版本的MySQL-cdc,我看2.2.0不支持时间和特殊位置
flink cdc 的 问题
augie_ly:
楼主好,请问第三个问题您是怎么解决的?谢谢
flink cdc 的 问题
遗梦孤魂:
这个是一个整体性的问题。。你的程序出问题了,就会抛出这个异常
java.lang.NoClassDefFoundError: com/esotericsoftware/kryo/serializers/ClosureSerializer$Closure
qq_29711413:
我也出现了这个问题,是scala和java混合在一起,使用了sedona的读取文件出现的,你这边有解决问题吗?
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
centos8 离线安装redis的艰苦历程(gcc、make、redis)
idea maven unable to find valid certification path to requested target...
No compiler is provided in this environment
2022年14篇
2021年8篇
2020年12篇
2019年38篇
2018年7篇
目录
目录
分类专栏
MySQL-System
18篇
private
JDK源码
5篇
ES-System
1篇
Canal-System
1篇
monitor
1篇
Hadoop-System
2篇
Spark-System
5篇
error__problem
29篇
tools
2篇
konwledge
6篇
software
1篇
project
2篇
Linux-System
2篇
webUI
4篇
目录
评论 1
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
打赏作者
遗梦孤魂
你的鼓励将是我创作的最大动力
¥2
¥4
¥6
¥10
¥20
输入1-500的整数
余额支付
(余额:-- )
扫码支付
扫码支付:¥2
获取中
扫码支付
您的余额不足,请更换扫码支付或充值
打赏作者
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值