导图社区 Kafka监控mysql的binlog
监控mysql的binlog,实时发送到kafka、java-client监听topic,执行sql。
编辑于2022-12-28 16:08:49 广东Canal+Kafka监控mysql的binlog (window版)
一、流程图
二、实现方案
Canal监控mysql的binlog,实时发送到kafka
canal-client监听topic,执行sql
三、步骤
1.开启 Binlog 写入功能
配置my.cnf
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
2.创建mysql用户帐号
CREATE USER canal IDENTIFIED BY ‘canal’; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’; – GRANT ALL PRIVILEGES ON . TO ‘canal’@’%’ ; FLUSH PRIVILEGES;
3.下载canal.deployer-1.1.5
https://github.com/alibaba/canal/releases
4.安装&配置
注:安装路径不能包含中文
conf\example:instance.properties文件修改
# 监控数据库配置 canal.instance.master.address=127.0.0.1:6033 #数据用户名 canal.instance.dbUsername=canal #数据库密码 canal.instance.dbPassword=canal # mq config canal.mq.topic=test #默认监控数据库
conf\:canal.properties文件修改
如果开启PLAIN安全认证添加配置- ps:同事帮助解决的。 kafka.sasl.mechanism=PLAIN kafka.security.protocol=SASL_PLAINTEXT kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='xx' password='xxx';
# tcp, kafka, rocketMQ, rabbitMQ canal.serverMode = kafka kafka.bootstrap.servers = xxx:9092
canal内置jdk配置
修改\bin\startup.bat
修改前
java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.deployer.CanalLauncher
修改后
..\java\jre1.8.0_181\bin\java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.deployer.CanalLauncher
5.启动脚本
start cmd : java -Xms128m -Xmx512m -XX:PermSize=128m -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dapplication.codeset=UTF-8 -Dfile.encoding=UTF-8 -server -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=9099,server=y,suspend=n -DappName=otter-canal -Dlogback.configurationFile="D:\programFiles\canal.deployer-1.1.5\bin\\..\conf\logback.xml" -Dcanal.conf="D:\programFiles\canal.deployer-1.1.5\bin\\..\conf\canal.properties" -classpath "D:\programFiles\canal.deployer-1.1.5\bin\\..\conf\..\lib\*;D:\programFiles\canal.deployer-1.1.5\bin\\..\conf" java -Xms128m -Xmx512m -XX:PermSize=128m -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dapplication.codeset=UTF-8 -Dfile.encoding=UTF-8 -server -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=9099,server=y,suspend=n -DappName=otter-canal -Dlogback.configurationFile="D:\programFiles\canal.deployer-1.1.5\bin\\..\conf\logback.xml" -Dcanal.conf="D:\programFiles\canal.deployer-1.1.5\bin\\..\conf\canal.properties" -classpath "D:\programFiles\canal.deployer-1.1.5\bin\\..\conf\..\lib\*;D:\programFiles\canal.deployer-1.1.5\bin\\..\conf" com.alibaba.otter.canal.deployer.CanalLauncher Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0 Listening for transport dt_socket at address: 9099
6.Java canal-client客户端代码编写
配置文件
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.10</version> </dependency> <!--mysql--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!--fastjson--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.45</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.7</version> </dependency>
application.yml
# 服务端口 server: port: 9908 # 服务名 spring: application: name: canal-client # 环境设置:dev、test、prod profiles: active: dev
application-dev.yml
spring: # mysql数据库连接 datasource: druid: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/xxxx_empty?serverTimezone=GMT%2B8&allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false username: root password: xxxxx #初始化时建立物理连接的个数 initial-size: 5 #最大连接池数量 max-active: 10 #最小连接池数量 min-idle: 5 #获取连接时最大等待时间,单位毫秒 max-wait: 60000 #超过时间限制是否回收 removeAbandoned: true #当连接超过3分钟后会强制进行回收 removeAbandonedTimeout: 180 #打开PSCache,并且指定每个连接上PSCache的大小 pool-prepared-statements: true max-pool-prepared-statement-per-connection-size: 20 #间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 time-between-eviction-runs-millis: 60000 min-evictable-idle-time-millis: 300000 max-evictable-idle-time-millis: 60000 #用来检测连接是否有效的sql 必须是一个查询语句。mysql中为 select 'x', oracle中为 select 1 from dual validation-query: select 'x' # validation-query-timeout: 5000 #申请连接时会执行validationQuery检测连接是否有效,开启会降低性能,默认为true test-on-borrow: false #归还连接时会执行validationQuery检测连接是否有效,开启会降低性能,默认为true test-on-return: false test-while-idle: true #通过connectProperties属性来打开mergeSql功能,慢SQL记录 connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 #filters: #配置多个英文逗号分隔(统计,sql注入) filters: stat,wall #配置stat-view-servlet stat-view-servlet: #允许开启监控 enabled: true #监控面板路径 url-pattern: /druid/* kafka: bootstrap-servers: xxxxxx:9092 # 指定listener 容器中的线程数,用于提高并发量 listener: ack-mode: manual # 消费者的配置 consumer: # 指定默认消费者group id group-id: test # 是否开启自动提交 enable-auto-commit: false # key,value的解码方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #kafka加密配置 properties: sasl.mechanism: PLAIN security.protocol: SASL_PLAINTEXT sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='xxx' password='xxx';
代码
kafka消费者
package com.hzjt.consumer; import com.alibaba.fastjson.JSONObject; import com.hzjt.constant.CanalConstants; import com.hzjt.protocol.CanalEntry; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Map; /** * kafka消费者 * @author zhangxf * @create 2021/9/2 16:46 * @desc **/ @Slf4j @Component public class KafkaConsumer { @Resource(name = "dataOneTemplate") private JdbcTemplate jdbcTemplate; @KafkaListener(topics = "test") public void listen (ConsumerRecord<?, ?> record, Acknowledgment ack){ try { System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value()); this.dataHandle(record.value()); }catch (Exception e){ log.error("Kafka监听异常"+e.getMessage(),e); }finally { ack.acknowledge();//手动提交偏移量 } } /** * 数据处理 * @param value */ private void dataHandle(Object value){ CanalEntry entry = JSONObject.parseObject(value.toString(), CanalEntry.class); if (entry != null) { if(StringUtils.isNotBlank(entry.getSql()) && CanalConstants.SqlType.getInfoByCode(entry.getType()) != null){ this.execute(entry.getSql()); } else if (CanalConstants.SqlType.INSERT.getCode().equals(entry.getType())) { saveInsertSql(entry); } else if (CanalConstants.SqlType.DELETE.getCode().equals(entry.getType())) { saveDeleteSql(entry); } else if (CanalConstants.SqlType.UPDATE.getCode().equals(entry.getType())) { saveUpdateSql(entry); } } } /** * 保存插入语句 * * @param entry */ private void saveInsertSql(CanalEntry entry) { StringBuffer sql = new StringBuffer("insert into " + entry.getTable() + " ("); for (Map.Entry<String,Object> en : entry.getData().get(0).entrySet()) { sql.append(en.getKey()); sql.append(","); } sql.deleteCharAt(sql.length() - 1); sql.append(") VALUES ("); for (Map.Entry<String,Object> en : entry.getData().get(0).entrySet()) { sql.append("'" + en.getValue() + "',"); } sql.deleteCharAt(sql.length() - 1); sql.append(")"); this.execute(sql.toString()); } /** * 保存删除语句 * * @param entry */ private void saveDeleteSql(CanalEntry entry) { StringBuffer sql = new StringBuffer("delete from " + entry.getTable() + " where "); this.publicSplicing(entry,sql); this.execute(sql.toString()); } /** * 保存更新语句 * * @param entry */ private void saveUpdateSql(CanalEntry entry) { StringBuffer sql = new StringBuffer("update " + entry.getTable() + " set "); for (Map.Entry<String,Object> en : entry.getData().get(0).entrySet()) { sql.append(" " + en.getKey() + " = '" + en.getValue() + "',"); } sql.deleteCharAt(sql.length() - 1); sql.append(" where "); this.publicSplicing(entry,sql); this.execute(sql.toString()); } /** * 共用拼接 * @param entry * @param sql */ private void publicSplicing(CanalEntry entry, StringBuffer sql){ for (Map.Entry<String,Object> en : entry.getData().get(0).entrySet()) { if (entry.getPkNames().get(0).equals(en.getKey())) { //暂时只支持单一主键 sql.append(en.getKey() + "=" + en.getValue()); break; } } } /** * 入库 * @param sql */ public void execute(String sql) { log.info(sql); jdbcTemplate.execute(sql); } }
CanalEntry
package com.hzjt.protocol; import lombok.Data; import java.util.List; import java.util.Map; /** * @author zhangxf * @create 2021/9/2 16:47 * @desc **/ @Data public class CanalEntry { private List<Map<String, Object>> data; private String database; private Long es; private Integer id; private Boolean isDdl; private Map<String, String> mysqlType; private Map<String, Object> old; private List<String> pkNames; private String sql; private Map<String, Object> sqlType; private String table; private Long ts; private String type; }
配置 Druid
package com.hzjt.config; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Configuration; import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.support.http.StatViewServlet; import com.alibaba.druid.support.http.WebStatFilter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.boot.web.servlet.ServletRegistrationBean; import org.springframework.context.annotation.Bean; import org.springframework.jdbc.core.JdbcTemplate; @Slf4j @Configuration public class DruidConfig { @Value("${spring.datasource.druid.url}") private String dbUrl; @Value("${spring.datasource.druid.username}") private String username; @Value("${spring.datasource.druid.password}") private String password; @Value("${spring.datasource.druid.driver-class-name}") private String driverClassName; @Value("${spring.datasource.druid.initial-size}") private int initialSize; @Value("${spring.datasource.druid.max-active}") private int maxActive; @Value("${spring.datasource.druid.min-idle}") private int minIdle; @Value("${spring.datasource.druid.max-wait}") private int maxWait; @Value("${spring.datasource.druid.pool-prepared-statements}") private boolean poolPreparedStatements; @Value("${spring.datasource.druid.max-pool-prepared-statement-per-connection-size}") private int maxPoolPreparedStatementPerConnectionSize; @Value("${spring.datasource.druid.time-between-eviction-runs-millis}") private int timeBetweenEvictionRunsMillis; @Value("${spring.datasource.druid.min-evictable-idle-time-millis}") private int minEvictableIdleTimeMillis; @Value("${spring.datasource.druid.max-evictable-idle-time-millis}") private int maxEvictableIdleTimeMillis; @Value("${spring.datasource.druid.validation-query}") private String validationQuery; @Value("${spring.datasource.druid.test-while-idle}") private boolean testWhileIdle; @Value("${spring.datasource.druid.test-on-borrow}") private boolean testOnBorrow; @Value("${spring.datasource.druid.test-on-return}") private boolean testOnReturn; @Value("${spring.datasource.druid.filters}") private String filters; @Value("{spring.datasource.druid.connection-properties}") private String connectionProperties; /** * Druid连接池配置 * @return */ //声明其为Bean实例 @Bean public DruidDataSource dataSource() { DruidDataSource datasource = new DruidDataSource(); datasource.setUrl(dbUrl); datasource.setUsername(username); datasource.setPassword(password); datasource.setDriverClassName(driverClassName); datasource.setInitialSize(initialSize); datasource.setMinIdle(minIdle); datasource.setMaxActive(maxActive); datasource.setMaxWait(maxWait); datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); datasource.setMaxEvictableIdleTimeMillis(minEvictableIdleTimeMillis); datasource.setValidationQuery(validationQuery); datasource.setTestWhileIdle(testWhileIdle); datasource.setTestOnBorrow(testOnBorrow); datasource.setTestOnReturn(testOnReturn); datasource.setPoolPreparedStatements(poolPreparedStatements); datasource.setMaxPoolPreparedStatementPerConnectionSize(maxPoolPreparedStatementPerConnectionSize); try { datasource.setFilters(filters); } catch (Exception e) { log.error("druid configuration initialization filter", e); } datasource.setConnectionProperties(connectionProperties); return datasource; } /** * JDBC操作配置 * @param dataSource * @return */ @Bean(name = "dataOneTemplate") public JdbcTemplate jdbcTemplate (@Autowired DruidDataSource dataSource){ return new JdbcTemplate(dataSource) ; } /** * 配置 Druid 监控界面 */ @Bean public ServletRegistrationBean statViewServlet(){ ServletRegistrationBean srb = new ServletRegistrationBean(new StatViewServlet(),"/druid/*"); //设置控制台管理用户 srb.addInitParameter("loginUsername","root"); srb.addInitParameter("loginPassword","root"); //是否可以重置数据。禁用HTML页面上的“Reset All”功能 srb.addInitParameter("resetEnable","false"); return srb; } @Bean public FilterRegistrationBean statFilter(){ //创建过滤器 FilterRegistrationBean frb = new FilterRegistrationBean(new WebStatFilter()); //设置过滤器过滤路径 frb.addUrlPatterns("/*"); //忽略过滤的形式 frb.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"); return frb; } }
CanalConstants
package com.hzjt.constant; /** * @Description //TODO 枚举类 * @Author zhangxifeng * @Date 14:10 2020-7-22 * @Version: 1.0 **/ public class CanalConstants { /** * sql操作类型 */ public enum SqlType { INSERT ("INSERT", "新增"), DELETE ("DELETE", "删除"), UPDATE ("UPDATE", "更新"); private String code; private String info; SqlType(String code, String info) { this.code = code; this.info = info; } public String getCode() { return code; } public void setCode(String code) { this.code = code; } public String getInfo() { return info; } public void setInfo(String info) { this.info = info; } public static String getInfoByCode(String code){ SqlType[] values = SqlType.values (); for (SqlType value : values) { String c = value.getCode (); if (c.equals (code)){ return value.getInfo (); } } return null; } } }