Jelajahi Sumber

数据治理,监听多数据源binlog

zhangshuling 1 tahun lalu
induk
melakukan
1340f1f43c

+ 1 - 0
xzl-admin/src/main/java/com/xzl/web/mapper/DataGovernanceMapper.java

@@ -42,4 +42,5 @@ public interface DataGovernanceMapper {
 
     DatabaseConfig getDatabaseConfig(String id);
 
+    List<DatabaseConfig> getAllDataBaseConfigs();
 }

+ 8 - 5
xzl-admin/src/main/java/com/xzl/web/model/dataGovernance/entity/DatabaseConfig.java

@@ -4,9 +4,12 @@ import lombok.Data;
 
 @Data
 public class DatabaseConfig {
-    private String connectionName;
-    private String databaseType;
-    private String url;
-    private String username;
-    private String password;
+  private String id;
+  private String connectionName;
+  private String databaseType;
+  private String url;
+  private String username;
+  private String password;
+
+  private String tables;
 }

+ 54 - 64
xzl-admin/src/main/java/com/xzl/web/mysql/MysqlBinLogClient.java

@@ -10,20 +10,26 @@ package com.xzl.web.mysql;
  */
 
 import com.github.shyiko.mysql.binlog.BinaryLogClient;
-import com.github.shyiko.mysql.binlog.event.*;
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.EventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
 import com.xzl.system.domain.SysDataLog;
 import com.xzl.system.service.ISysDataLogService;
+import com.xzl.web.mapper.DataGovernanceMapper;
+import com.xzl.web.model.dataGovernance.entity.DatabaseConfig;
 import org.apache.commons.collections.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,47 +42,57 @@ public class MysqlBinLogClient implements ApplicationRunner {
 
   private static final Logger logger = LoggerFactory.getLogger(MysqlBinLogClient.class);
 
-  @Value("${spring.datasource.druid.master.url}")
-  String url;
-  @Value("${spring.datasource.druid.master.username}")
-  String username;
-  @Value("${spring.datasource.druid.master.password}")
-  String password;
-
-  @Value("#{'${mysql.binlog.tables}'.split(',')}")
-  List<String> tables;
 
   @Autowired
   ISysDataLogService sysDataLogService;
 
+  @Autowired
+  DataGovernanceMapper dataGovernanceMapper;
+
   @Autowired
   RedisTemplate redisTemplate;
 
-  String redisKeyMysqlTable = "mysql:table:";
+  String redisKeyMysqlTable = "mysql-binlog:%s:table:";
+
+  List<String> tables = new ArrayList<>();
 
   @Override
   public void run(ApplicationArguments args) throws Exception {
-    //项目启动完成连接bin-log
-    new Thread(() -> {
-      connectMysqlBinLog();
-    }).start();
+    List<DatabaseConfig> databaseConfigList = dataGovernanceMapper.getAllDataBaseConfigs();
+    if (databaseConfigList == null || databaseConfigList.size() == 0) {
+      logger.info("未获取数据源配置 ...");
+      return;
+    }
 
+    for (DatabaseConfig databaseConfig : databaseConfigList) {
+      new Thread(() -> {
+        connectMysqlBinLog(databaseConfig);
+        logger.info("{}监控BinLog服务已启动", databaseConfig.getConnectionName());
+      }).start();
+    }
   }
 
   /**
    * 连接mysqlBinLog
    */
-  public void connectMysqlBinLog() {
-    logger.info("监控BinLog服务已启动");
-    //自己MySQL的信息。host,port,username,password
-    Map dbMap = parseDbUrl(url);
-    BinaryLogClient client = new BinaryLogClient(MapUtils.getString(dbMap, "host"), MapUtils.getIntValue(dbMap, "port"), username, password);
-    /**因为binlog不是以数据库为单位划分的,所以监控binglog不是监控的单个的数据库,而是整个当前所设置连接的MySQL,
-     *其中任何一个库发生数据增删改,这里都能检测到,
-     *所以不用设置所监控的数据库的名字(我也不知道怎么设置,没发现有包含这个形参的构造函数)
-     *如果需要只监控指定的数据库,可以看后面代码,可以获取到当前发生变更的数据库名称。可以根据名称来决定是否监控
-     **/
+  public void connectMysqlBinLog(DatabaseConfig databaseConfig) {
+    //MySQL的信息。host,port, db,username,password
+    Map dbMap = parseDbUrl(databaseConfig.getUrl());
+    String dbName = MapUtils.getString(dbMap, "db");
+    String dbTables = databaseConfig.getTables();
+    if (dbTables == null || dbTables.length() == 0) {
+      logger.info("未配置监听表, 忽略 {}", databaseConfig.getConnectionName());
+      return;
+    }
+    String[] tableNames = dbTables.split(",");
+    for (String tableName : tableNames) {
+      String tn = databaseConfig.getId() + "." + dbName + "." + tableName;
+      tables.add(tn);
+      logger.info("监听 {} ", tn);
+    }
+
 
+    BinaryLogClient client = new BinaryLogClient(MapUtils.getString(dbMap, "host"), MapUtils.getIntValue(dbMap, "port"), databaseConfig.getUsername(), databaseConfig.getPassword());
     client.setServerId(100); //和自己之前设置的server-id保持一致,但是我不知道为什么不一致也能成功
 
     client.registerEventListener(event -> {
@@ -86,48 +102,18 @@ public class MysqlBinLogClient implements ApplicationRunner {
         String tableName = tableMapEventData.getDatabase() + "." + tableMapEventData.getTable();
         long tableId = tableMapEventData.getTableId();
         // 存放到redis中(没有就增加)
-        redisTemplate.opsForValue().setIfAbsent(redisKeyMysqlTable + tableId, tableName);
+        redisTemplate.opsForValue().setIfAbsent((String.format(redisKeyMysqlTable, databaseConfig.getId()) + tableId), (databaseConfig.getId() + "." + tableName));
       }
       if (data instanceof UpdateRowsEventData) { //表数据发生修改时触发
         UpdateRowsEventData eventData = (UpdateRowsEventData) data;
-        saveData(eventData.getTableId(), data);
+        saveData(eventData.getTableId(), databaseConfig.getId(), data);
       } else if (data instanceof WriteRowsEventData) { //表数据发生插入时触发
         WriteRowsEventData eventData = (WriteRowsEventData) data;
-        saveData(eventData.getTableId(), data);
+        saveData(eventData.getTableId(), databaseConfig.getId(), data);
       } else if (data instanceof DeleteRowsEventData) {//表数据发生删除后触发
         DeleteRowsEventData eventData = (DeleteRowsEventData) data;
-        saveData(eventData.getTableId(), data);
+        saveData(eventData.getTableId(), databaseConfig.getId(), data);
       }
-      /*
-      String threadName = Thread.currentThread().getName();
-      if (data instanceof TableMapEventData) {
-        //只要连接的MySQL发生的增删改的操作,则都会进入这里,无论哪个数据库
-
-        TableMapEventData tableMapEventData = (TableMapEventData) data;
-
-        //可以通过转成TableMapEventData类实例的tableMapEventData来获取当前发生变更的数据库
-        System.err.println("[" + threadName + "]" + "发生变更的数据库:" + tableMapEventData.getDatabase());
-
-        System.err.print("[" + threadName + "]" + "TableID:");
-        //表ID
-        System.err.println(tableMapEventData.getTableId());
-        System.err.print("[" + threadName + "]" + "TableName:");
-        //表名字
-        System.err.println(tableMapEventData.getTable());
-      }
-      //表数据发生修改时触发
-      if (data instanceof UpdateRowsEventData) {
-        System.err.print("[" + threadName + "]" + "Update:");
-        System.err.println(data.toString());
-        //表数据发生插入时触发
-      } else if (data instanceof WriteRowsEventData) {
-        System.err.print("[" + threadName + "]" + "Insert:");
-        System.err.println(data.toString());
-        //表数据发生删除后触发
-      } else if (data instanceof DeleteRowsEventData) {
-        System.err.print("[" + threadName + "]" + "Delete:");
-        System.err.println(data.toString());
-      }*/
     });
     try {
       client.connect();
@@ -146,11 +132,14 @@ public class MysqlBinLogClient implements ApplicationRunner {
     Map rs = new HashMap();
     rs.put("host", host);
     rs.put("port", port);
+
+    String db[] = portSplit[1].split("\\?");
+    rs.put("db", db[0]);
     return rs;
   }
 
-  private void saveData(long tableId, EventData data) {
-    String tableName = (String) redisTemplate.opsForValue().get(redisKeyMysqlTable + tableId);
+  private void saveData(long tableId, String dbCfgId, EventData data) {
+    String tableName = (String) redisTemplate.opsForValue().get(String.format(redisKeyMysqlTable, dbCfgId) + tableId);
     if (tableName == null) {
       logger.info("没取到表名 ...");
       return;
@@ -158,8 +147,9 @@ public class MysqlBinLogClient implements ApplicationRunner {
     if (tables.contains(tableName)) {
       String[] db = tableName.split("\\.");
       SysDataLog sysDataLog = new SysDataLog();
-      sysDataLog.setDatabaseName(db[0]);
-      sysDataLog.setTableName(db[1]);
+      sysDataLog.setDbCfgId(dbCfgId);
+      sysDataLog.setDatabaseName(db[1]);
+      sysDataLog.setTableName(db[2]);
       if (data instanceof WriteRowsEventData) { //表数据发生插入时触发
         WriteRowsEventData eventData = (WriteRowsEventData) data;
         sysDataLog.setUpdateCount(Long.valueOf(eventData.getRows().size()));

+ 6 - 0
xzl-admin/src/main/resources/mapper/DataGovernanceMapper.xml

@@ -132,4 +132,10 @@
         INSERT INTO ${targetDatabaseName}.${targetTableName}
         SELECT * FROM ${resourceDatabaseName}.${resourceTableName}
     </insert>
+
+    <select id="getAllDataBaseConfigs" resultType="com.xzl.web.model.dataGovernance.entity.DatabaseConfig">
+         select id , connection_name as 'connectionName' , database_type as 'databaseType',
+         url, username, `password`, tables
+         from database_config
+    </select>
 </mapper>

+ 18 - 0
xzl-system/src/main/java/com/xzl/system/domain/SysDataLog.java

@@ -21,6 +21,8 @@ public class SysDataLog extends BaseEntity {
   private Long logId;
 
 
+  private String dbCfgId;
+  private String userName;
   private String databaseName;
   private String tableName;
   private Integer actionType;
@@ -35,6 +37,22 @@ public class SysDataLog extends BaseEntity {
     this.logId = logId;
   }
 
+  public String getDbCfgId() {
+    return dbCfgId;
+  }
+
+  public void setDbCfgId(String dbCfgId) {
+    this.dbCfgId = dbCfgId;
+  }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public void setUserName(String userName) {
+    this.userName = userName;
+  }
+
   public String getDatabaseName() {
     return databaseName;
   }

+ 5 - 3
xzl-system/src/main/resources/mapper/system/SysDataLogMapper.xml

@@ -5,6 +5,8 @@
 <mapper namespace="com.xzl.system.mapper.SysDataLogMapper">
     <resultMap type="SysDataLog" id="SysDataLogResult">
         <id property="logId" column="log_id"/>
+        <result property="dbCfgId" column="db_cfg_id"/>
+        <result property="userName" column="user_name"/>
         <result property="databaseName" column="database_name"/>
         <result property="tableName" column="table_name"/>
         <result property="actionType" column="action_type"/>
@@ -17,13 +19,13 @@
     </resultMap>
 
     <sql id="selectDataLogVo">
-        select log_id, database_name, table_name, action_type, update_count, data_content,create_by, create_time, update_by, update_time
+        select log_id, db_cfg_id, user_name, database_name, table_name, action_type, update_count, data_content,create_by, create_time, update_by, update_time
         from sys_data_log
     </sql>
 
     <insert id="insertDataLog" parameterType="SysDataLog">
-		insert into sys_data_log(database_name, table_name, action_type, update_count, data_content, create_time)
-        values (#{databaseName}, #{tableName}, #{actionType}, #{updateCount}, #{dataContent}, sysdate())
+		insert into sys_data_log(db_cfg_id, user_name, database_name, table_name, action_type, update_count, data_content, create_time)
+        values (#{dbCfgId}, #{userName}, #{databaseName}, #{tableName}, #{actionType}, #{updateCount}, #{dataContent}, sysdate())
 	</insert>