|
@@ -11,20 +11,25 @@ package com.xzl.web.mysql;
|
|
|
|
|
|
import com.github.shyiko.mysql.binlog.BinaryLogClient;
|
|
import com.github.shyiko.mysql.binlog.BinaryLogClient;
|
|
import com.github.shyiko.mysql.binlog.event.*;
|
|
import com.github.shyiko.mysql.binlog.event.*;
|
|
|
|
+import com.xzl.system.domain.SysDataLog;
|
|
|
|
+import com.xzl.system.service.ISysDataLogService;
|
|
import org.apache.commons.collections.MapUtils;
|
|
import org.apache.commons.collections.MapUtils;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.boot.ApplicationArguments;
|
|
import org.springframework.boot.ApplicationArguments;
|
|
import org.springframework.boot.ApplicationRunner;
|
|
import org.springframework.boot.ApplicationRunner;
|
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
+import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
|
|
//此类可以监控MySQL库数据的增删改
|
|
//此类可以监控MySQL库数据的增删改
|
|
-//@Component
|
|
|
|
|
|
+@Component
|
|
//在SpringBoot中,提供了一个接口:ApplicationRunner。
|
|
//在SpringBoot中,提供了一个接口:ApplicationRunner。
|
|
//该接口中,只有一个run方法,他执行的时机是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。
|
|
//该接口中,只有一个run方法,他执行的时机是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。
|
|
public class MysqlBinLogClient implements ApplicationRunner {
|
|
public class MysqlBinLogClient implements ApplicationRunner {
|
|
@@ -38,6 +43,16 @@ public class MysqlBinLogClient implements ApplicationRunner {
|
|
@Value("${spring.datasource.druid.master.password}")
|
|
@Value("${spring.datasource.druid.master.password}")
|
|
String password;
|
|
String password;
|
|
|
|
|
|
|
|
+ @Value("#{'${mysql.binlog.tables}'.split(',')}")
|
|
|
|
+ List<String> tables;
|
|
|
|
+
|
|
|
|
+ @Autowired
|
|
|
|
+ ISysDataLogService sysDataLogService;
|
|
|
|
+
|
|
|
|
+ @Autowired
|
|
|
|
+ RedisTemplate redisTemplate;
|
|
|
|
+
|
|
|
|
+ String redisKeyMysqlTable = "mysql:table:";
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void run(ApplicationArguments args) throws Exception {
|
|
public void run(ApplicationArguments args) throws Exception {
|
|
@@ -64,39 +79,56 @@ public class MysqlBinLogClient implements ApplicationRunner {
|
|
|
|
|
|
client.setServerId(100); //和自己之前设置的server-id保持一致,但是我不知道为什么不一致也能成功
|
|
client.setServerId(100); //和自己之前设置的server-id保持一致,但是我不知道为什么不一致也能成功
|
|
|
|
|
|
-//下面直接照抄就行
|
|
|
|
client.registerEventListener(event -> {
|
|
client.registerEventListener(event -> {
|
|
EventData data = event.getData();
|
|
EventData data = event.getData();
|
|
|
|
+ if (data instanceof TableMapEventData) {
|
|
|
|
+ TableMapEventData tableMapEventData = (TableMapEventData) data;
|
|
|
|
+ String tableName = tableMapEventData.getDatabase() + "." + tableMapEventData.getTable();
|
|
|
|
+ long tableId = tableMapEventData.getTableId();
|
|
|
|
+ // 存放到redis中(没有就增加)
|
|
|
|
+ redisTemplate.opsForValue().setIfAbsent(redisKeyMysqlTable + tableId, tableName);
|
|
|
|
+ }
|
|
|
|
+ if (data instanceof UpdateRowsEventData) { //表数据发生修改时触发
|
|
|
|
+ UpdateRowsEventData eventData = (UpdateRowsEventData) data;
|
|
|
|
+ saveData(eventData.getTableId(), data);
|
|
|
|
+ } else if (data instanceof WriteRowsEventData) { //表数据发生插入时触发
|
|
|
|
+ WriteRowsEventData eventData = (WriteRowsEventData) data;
|
|
|
|
+ saveData(eventData.getTableId(), data);
|
|
|
|
+ } else if (data instanceof DeleteRowsEventData) {//表数据发生删除后触发
|
|
|
|
+ DeleteRowsEventData eventData = (DeleteRowsEventData) data;
|
|
|
|
+ saveData(eventData.getTableId(), data);
|
|
|
|
+ }
|
|
|
|
+ /*
|
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
if (data instanceof TableMapEventData) {
|
|
if (data instanceof TableMapEventData) {
|
|
//只要连接的MySQL发生的增删改的操作,则都会进入这里,无论哪个数据库
|
|
//只要连接的MySQL发生的增删改的操作,则都会进入这里,无论哪个数据库
|
|
|
|
|
|
TableMapEventData tableMapEventData = (TableMapEventData) data;
|
|
TableMapEventData tableMapEventData = (TableMapEventData) data;
|
|
|
|
|
|
//可以通过转成TableMapEventData类实例的tableMapEventData来获取当前发生变更的数据库
|
|
//可以通过转成TableMapEventData类实例的tableMapEventData来获取当前发生变更的数据库
|
|
- System.err.println("发生变更的数据库:" + tableMapEventData.getDatabase());
|
|
|
|
|
|
+ System.err.println("[" + threadName + "]" + "发生变更的数据库:" + tableMapEventData.getDatabase());
|
|
|
|
|
|
- System.err.print("TableID:");
|
|
|
|
|
|
+ System.err.print("[" + threadName + "]" + "TableID:");
|
|
//表ID
|
|
//表ID
|
|
System.err.println(tableMapEventData.getTableId());
|
|
System.err.println(tableMapEventData.getTableId());
|
|
- System.err.print("TableName:");
|
|
|
|
|
|
+ System.err.print("[" + threadName + "]" + "TableName:");
|
|
//表名字
|
|
//表名字
|
|
System.err.println(tableMapEventData.getTable());
|
|
System.err.println(tableMapEventData.getTable());
|
|
}
|
|
}
|
|
//表数据发生修改时触发
|
|
//表数据发生修改时触发
|
|
if (data instanceof UpdateRowsEventData) {
|
|
if (data instanceof UpdateRowsEventData) {
|
|
- System.err.println("Update:");
|
|
|
|
|
|
+ System.err.print("[" + threadName + "]" + "Update:");
|
|
System.err.println(data.toString());
|
|
System.err.println(data.toString());
|
|
//表数据发生插入时触发
|
|
//表数据发生插入时触发
|
|
} else if (data instanceof WriteRowsEventData) {
|
|
} else if (data instanceof WriteRowsEventData) {
|
|
- System.err.println("Insert:");
|
|
|
|
|
|
+ System.err.print("[" + threadName + "]" + "Insert:");
|
|
System.err.println(data.toString());
|
|
System.err.println(data.toString());
|
|
//表数据发生删除后触发
|
|
//表数据发生删除后触发
|
|
} else if (data instanceof DeleteRowsEventData) {
|
|
} else if (data instanceof DeleteRowsEventData) {
|
|
- System.err.println("Delete:");
|
|
|
|
|
|
+ System.err.print("[" + threadName + "]" + "Delete:");
|
|
System.err.println(data.toString());
|
|
System.err.println(data.toString());
|
|
- }
|
|
|
|
|
|
+ }*/
|
|
});
|
|
});
|
|
-
|
|
|
|
try {
|
|
try {
|
|
client.connect();
|
|
client.connect();
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -116,4 +148,33 @@ public class MysqlBinLogClient implements ApplicationRunner {
|
|
rs.put("port", port);
|
|
rs.put("port", port);
|
|
return rs;
|
|
return rs;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private void saveData(long tableId, EventData data) {
|
|
|
|
+ String tableName = (String) redisTemplate.opsForValue().get(redisKeyMysqlTable + tableId);
|
|
|
|
+ if (tableName == null) {
|
|
|
|
+ logger.info("没取到表名 ...");
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ if (tables.contains(tableName)) {
|
|
|
|
+ String[] db = tableName.split("\\.");
|
|
|
|
+ SysDataLog sysDataLog = new SysDataLog();
|
|
|
|
+ sysDataLog.setDatabaseName(db[0]);
|
|
|
|
+ sysDataLog.setTableName(db[1]);
|
|
|
|
+ if (data instanceof WriteRowsEventData) { //表数据发生插入时触发
|
|
|
|
+ WriteRowsEventData eventData = (WriteRowsEventData) data;
|
|
|
|
+ sysDataLog.setUpdateCount(Long.valueOf(eventData.getRows().size()));
|
|
|
|
+ sysDataLog.setActionType(0);
|
|
|
|
+ } else if (data instanceof UpdateRowsEventData) { //表数据发生修改时触发
|
|
|
|
+ UpdateRowsEventData eventData = (UpdateRowsEventData) data;
|
|
|
|
+ sysDataLog.setUpdateCount(Long.valueOf(eventData.getRows().size()));
|
|
|
|
+ sysDataLog.setActionType(1);
|
|
|
|
+ } else if (data instanceof DeleteRowsEventData) {//表数据发生删除后触发
|
|
|
|
+ DeleteRowsEventData eventData = (DeleteRowsEventData) data;
|
|
|
|
+ sysDataLog.setUpdateCount(Long.valueOf(eventData.getRows().size()));
|
|
|
|
+ sysDataLog.setActionType(2);
|
|
|
|
+ }
|
|
|
|
+ sysDataLog.setDataContent(data.toString());
|
|
|
|
+ sysDataLogService.insertDataLog(sysDataLog);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|