1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
| import java.net.InetSocketAddress; import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.alibaba.otter.canal.protocol.Message; import org.apache.commons.lang.StringUtils;
public class SimpleCanalClient {
public static void main(String[] args) throws Exception { String destination = "example"; CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), destination, "", "");
connector.connect(); connector.subscribe(".*\\..*"); connector.rollback();
int batchSize = 5 * 1024;
while (true) { Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { } else { synchronizedData(message.getEntries()); }
connector.ack(batchId); } }
private static void synchronizedData(List<Entry> entries) throws Exception { for (Entry entry : entries) { if (entry.getEntryType() != EntryType.ROWDATA) { continue; }
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); String tableName = entry.getHeader().getTableName(); for (RowData rowData : rowChange.getRowDatasList()) { String sql = getSql(rowChange.getEventType(),tableName,rowData); System.out.println(sql); } } }
private static String getSql(CanalEntry.EventType eventType,String tableName,RowData rowData){ String sql = null; switch (eventType) { case INSERT: sql = getInsertSql(tableName,rowData.getAfterColumnsList()); break; case UPDATE: sql = getUpdateSql(tableName,rowData.getAfterColumnsList()); break; case DELETE: sql = getDeleteSql(tableName,rowData.getBeforeColumnsList()); break; default: break; } return sql; }
private static String getInsertSql(String tableName,List<Column> columns){ if(columns.size() == 0 || StringUtils.isBlank(tableName)){ return null; } String keys = ""; String values = ""; for(int i=0;i<columns.size();i++){ if(i != 0) { keys += ","; values += ","; } keys += columns.get(i).getName(); values += getValue(columns.get(i)); } String format = "INSERT INTO %s (%s) VALUES (%s)"; return String.format(format,tableName,keys,values); }
private static String getUpdateSql(String tableName,List<Column> columns){ if(columns.size() == 0 || StringUtils.isBlank(tableName)){ return null; } String sets = ""; String where = ""; for(Column column : columns){ if(column.getIsKey()){ where = column.getName() + "=" + getValue(column); continue; } if(!StringUtils.isBlank(sets)) { sets += ","; } sets += column.getName() + "=" + getValue(column); } String format = "UPDATE %s SET %s WHERE %s"; return String.format(format,tableName,sets,where); }
private static String getDeleteSql(String tableName,List<Column> columns){ if(columns.size() == 0 || StringUtils.isBlank(tableName)){ return null; } String where = ""; for(Column column : columns){ if(column.getIsKey()){ where = column.getName() + "=" + getValue(column); continue; } } String format = "DELETE FROM %s WHERE %s"; return String.format(format,tableName,where); }
private static String getValue(Column column){ if(column.getIsNull()){ return "null"; } return String.format("'%s'",column.getValue()); }
}
|