基于canal的实时数据同步

适用场景

使用canal做数据备份而不用mysql自带的主从备份的场景主要为:

  1. 跨数据库的数据备份,例如mysql => oracle
  2. 数据异构,即对同一份数据做不同的分库分表查询。例如卖家和买家各自分库索引

maven

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.2</version>
</dependency>

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.commons.lang.StringUtils;

public class SimpleCanalClient {

public static void main(String[] args) throws Exception {
String destination = "example";
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), destination, "", "");

connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();

int batchSize = 5 * 1024;

while (true) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// }
} else {
synchronizedData(message.getEntries());
}

connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
}

/**
* 同步数据
* @param entries
* @throws Exception
*/
private static void synchronizedData(List<Entry> entries) throws Exception {
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) {
continue;
}

RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
for (RowData rowData : rowChange.getRowDatasList()) {
String sql = getSql(rowChange.getEventType(),tableName,rowData);
System.out.println(sql);
// TODO 执行sql语句
}
}
}

/**
* 获取增删改的sql
* @param eventType
* @param tableName
* @param rowData
* @return
*/
private static String getSql(CanalEntry.EventType eventType,String tableName,RowData rowData){
String sql = null;
switch (eventType) {
case INSERT:
sql = getInsertSql(tableName,rowData.getAfterColumnsList());
break;
case UPDATE:
sql = getUpdateSql(tableName,rowData.getAfterColumnsList());
break;
case DELETE:
sql = getDeleteSql(tableName,rowData.getBeforeColumnsList());
break;
default:
break;
}
return sql;
}

private static String getInsertSql(String tableName,List<Column> columns){
if(columns.size() == 0 || StringUtils.isBlank(tableName)){
return null;
}
String keys = "";
String values = "";
for(int i=0;i<columns.size();i++){
if(i != 0) {
keys += ",";
values += ",";
}
keys += columns.get(i).getName();
values += getValue(columns.get(i));
}
String format = "INSERT INTO %s (%s) VALUES (%s)";
return String.format(format,tableName,keys,values);
}

private static String getUpdateSql(String tableName,List<Column> columns){
if(columns.size() == 0 || StringUtils.isBlank(tableName)){
return null;
}
String sets = "";
String where = "";
for(Column column : columns){
if(column.getIsKey()){
where = column.getName() + "=" + getValue(column);
continue;
}
if(!StringUtils.isBlank(sets)) {
sets += ",";
}
sets += column.getName() + "=" + getValue(column);
}
String format = "UPDATE %s SET %s WHERE %s";
return String.format(format,tableName,sets,where);
}

private static String getDeleteSql(String tableName,List<Column> columns){
if(columns.size() == 0 || StringUtils.isBlank(tableName)){
return null;
}
String where = "";
for(Column column : columns){
if(column.getIsKey()){
where = column.getName() + "=" + getValue(column);
continue;
}
}
String format = "DELETE FROM %s WHERE %s";
return String.format(format,tableName,where);
}

private static String getValue(Column column){
if(column.getIsNull()){
return "null";
}
return String.format("'%s'",column.getValue());
}

}

数据一致性

单机单点消费mysql的log-bin后直接更新到备份数据库中,数据一致性没有问题。但是如果变成分布式环境以及消费mysql的log-bin后将更新数据推到MQ中由多节点消费更新到多个备份数据库中,则会出现数据更新时序和数据一致性的问题。

而以上代码在update sql中除了获取值变化了的字段,也反查数据库获取了未变化的字段。因此每次update的sql实际上是该条记录的全量数据。

通过在表中加上时间戳字段作为记录的版本号,用逻辑删除取代物理删除delete,修改以上代码的sql拼接,insert操作时忽略主键冲突、update操作时仅更新版本号(时间戳)旧的记录,可以极大避免数据不一致的现象,也解决了MQ重复消费的问题。

1
`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP

再通过定时任务,每天一次增量数据更新,每周一次全量数据更新,保证数据的最终一致性。

>