千家信息网

Maxwell读取MySQL binlog日志到Kafka

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,启动MySQL创建maxwell的数据库和用户在MySQL中创建一个测试数据库和表前面三个步骤详见 Maxwell读取MySQL binlog日志通过stdout展示启动Zookeeper[hadoo
千家信息网最后更新 2025年02月01日Maxwell读取MySQL binlog日志到Kafka

启动MySQL

创建maxwell的数据库和用户

在MySQL中创建一个测试数据库和表

前面三个步骤详见 Maxwell读取MySQL binlog日志通过stdout展示

启动Zookeeper

[hadoop@hadoop001 ~]$ cd $ZK_HOME/bin
[hadoop@hadoop001 bin]$ ./zkServer.sh start

启动kafka,并创建主题为maxwell的topic

[hadoop@hadoop001 bin]$ cd $KAFKA_HOME
//查看kafka版本,防止maxwell不支持
[hadoop@hadoop001 kafka]$ find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
kafka_2.11-0.10.0.1-sources.jar
//启动kafka-server服务
[hadoop@hadoop001 kafka]$ nohup bin/kafka-server-start.sh config/server.properties &
[hadoop@hadoop001 kafka]$ jps
13460 QuorumPeerMain
14952 Jps
13518 Kafka
[hadoop@hadoop001 kafka]$ bin/kafka-topics.sh --create --zookeeper 192.168.137.2:2181/kafka --replication-factor 1 --partitions 3 --topic maxwell
Created topic "maxwell".
[hadoop@hadoop001 kafka]$ bin/kafka-topics.sh --list --zookeeper 192.168.137.2:2181/kafka
__consumer_offsets
maxwell

启动kafka的消费者,检查数据是否到位

[hadoop@hadoop001 kafka]$ bin/kafka-console-consumer.sh --zookeeper 192.168.137.2:2181/kafka --topic maxwell --from-beginning

启动maxwell进程

//先检查maxwell是否支持kafka-0.10.0.1[root@hadoop000 ~]# cd /root/app/maxwell-1.17.1/lib/kafka-clients[root@hadoop001 kafka-clients]# lltotal 5556-rw-r--r-- 1 yarn games  746207 Jul  3  2018 kafka-clients-0.10.0.1.jar-rw-r--r-- 1 yarn games  951041 Jul  3  2018 kafka-clients-0.10.2.1.jar-rw-r--r-- 1 yarn games 1419544 Jul  3  2018 kafka-clients-0.11.0.1.jar-rw-r--r-- 1 yarn games  324016 Jul  3  2018 kafka-clients-0.8.2.2.jar-rw-r--r-- 1 yarn games  641408 Jul  3  2018 kafka-clients-0.9.0.1.jar-rw-r--r-- 1 yarn games 1591338 Jul  3  2018 kafka-clients-1.0.0.jar//发现支持kafka-0.10.0.1版本,假如没有生产上正在用的kafka版本的jar包,可以直接把这个版本的client jar包copy进来//启动maxwell[root@hadoop001 maxwell-1.17.1]# bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka_version=0.10.0.1 --kafka.bootstrap.servers=192.168.137.2:9092 --kafka_topic=maxwellUsing kafka version: 0.10.0.110:16:52,979 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.10:16:53,451 INFO  ProducerConfig - ProducerConfig values:         metric.reporters = []        metadata.max.age.ms = 300000        reconnect.backoff.ms = 50        sasl.kerberos.ticket.renew.window.factor = 0.8        bootstrap.servers = [192.168.137.2:9092]        ssl.keystore.type = JKS        sasl.mechanism = GSSAPI        max.block.ms = 60000        interceptor.classes = null        ssl.truststore.password = null        client.id =         ssl.endpoint.identification.algorithm = null        request.timeout.ms = 30000        acks = 1        receive.buffer.bytes = 32768        ssl.truststore.type = JKS        retries = 0        ssl.truststore.location = null        ssl.keystore.password = null        send.buffer.bytes = 131072        compression.type = none        metadata.fetch.timeout.ms = 60000        retry.backoff.ms = 100        sasl.kerberos.kinit.cmd = /usr/bin/kinit        buffer.memory = 33554432        timeout.ms = 30000        key.serializer = class 
org.apache.kafka.common.serialization.StringSerializer        sasl.kerberos.service.name = null        sasl.kerberos.ticket.renew.jitter = 0.05        ssl.trustmanager.algorithm = PKIX        block.on.buffer.full = false        ssl.key.password = null        sasl.kerberos.min.time.before.relogin = 60000        connections.max.idle.ms = 540000        max.in.flight.requests.per.connection = 5        metrics.num.samples = 2        ssl.protocol = TLS        ssl.provider = null        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]        batch.size = 16384        ssl.keystore.location = null        ssl.cipher.suites = null        security.protocol = PLAINTEXT        max.request.size = 1048576        value.serializer = class org.apache.kafka.common.serialization.StringSerializer        ssl.keymanager.algorithm = SunX509        metrics.sample.window.ms = 30000        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner        linger.ms = 010:16:53,512 INFO  ProducerConfig - ProducerConfig values:         metric.reporters = []        metadata.max.age.ms = 300000        reconnect.backoff.ms = 50        sasl.kerberos.ticket.renew.window.factor = 0.8        bootstrap.servers = [192.168.137.2:9092]        ssl.keystore.type = JKS        sasl.mechanism = GSSAPI        max.block.ms = 60000        interceptor.classes = null        ssl.truststore.password = null        client.id = producer-1        ssl.endpoint.identification.algorithm = null        request.timeout.ms = 30000        acks = 1        receive.buffer.bytes = 32768        ssl.truststore.type = JKS        retries = 0        ssl.truststore.location = null        ssl.keystore.password = null        send.buffer.bytes = 131072        compression.type = none        metadata.fetch.timeout.ms = 60000        retry.backoff.ms = 100        sasl.kerberos.kinit.cmd = /usr/bin/kinit        buffer.memory = 33554432        timeout.ms = 30000        key.serializer = class 
org.apache.kafka.common.serialization.StringSerializer        sasl.kerberos.service.name = null        sasl.kerberos.ticket.renew.jitter = 0.05        ssl.trustmanager.algorithm = PKIX        block.on.buffer.full = false        ssl.key.password = null        sasl.kerberos.min.time.before.relogin = 60000        connections.max.idle.ms = 540000        max.in.flight.requests.per.connection = 5        metrics.num.samples = 2        ssl.protocol = TLS        ssl.provider = null        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]        batch.size = 16384        ssl.keystore.location = null        ssl.cipher.suites = null        security.protocol = PLAINTEXT        max.request.size = 1048576        value.serializer = class org.apache.kafka.common.serialization.StringSerializer        ssl.keymanager.algorithm = SunX509        metrics.sample.window.ms = 30000        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner        linger.ms = 010:16:53,516 INFO  AppInfoParser - Kafka version : 0.10.0.110:16:53,516 INFO  AppInfoParser - Kafka commitId : a7a17cdec9eaa6c510:16:53,550 INFO  Maxwell - Maxwell v1.17.1 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[mysql-bin.000016:116360], lastHeartbeat=1552092988288]10:16:53,730 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000014:5999], lastHeartbeat=0])10:16:53,846 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000016:11636010:16:53,951 INFO  BinaryLogClient - Connected to 127.0.0.1:3306 at mysql-bin.000016/116360 (sid:6379, cid:4)10:16:53,951 INFO  BinlogConnectorLifecycleListener - Binlog connected.

在MySQL中更新一条数据

mysql> update emp set sal=502 where empno=6001;
mysql> update emp set sal=603 where empno=6001;
mysql> create table emp1 select * from emp;

查看kafka的消费者

//对应第一条update语句
{"database":"hlwtest","table":"emp","type":"update","ts":1552097863,"xid":89,"commit":true,"data":{"empno":6001,"ename":"SIWA","job":"DESIGNER","mgr":7001,"hiredate":"2019-03-08 00:00:00","sal":502.00,"comm":6000.00,"deptno":40},"old":{"sal":501.00}}
//对应第二条update语句
{"database":"hlwtest","table":"emp","type":"update","ts":1552097951,"xid":123,"commit":true,"data":{"empno":6001,"ename":"SIWA","job":"DESIGNER","mgr":7001,"hiredate":"2019-03-08 00:00:00","sal":603.00,"comm":6000.00,"deptno":40},"old":{"sal":502.00}}
//对应建表,相当于在新表emp1里插入了emp表里的所有数据
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":0,"data":{"empno":6001,"ename":"SIWA","job":"DESIGNER","mgr":7001,"hiredate":"2019-03-08 00:00:00","sal":603.00,"comm":6000.00,"deptno":40}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":1,"data":{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":"1980-12-17 00:00:00","sal":800.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":2,"data":{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-20 00:00:00","sal":1600.00,"comm":300.00,"deptno":30}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":3,"data":{"empno":7521,"ename":"WARD","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-22 00:00:00","sal":1250.00,"comm":500.00,"deptno":30}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":4,"data":{"empno":7566,"ename":"JONES","job":"MANAGER","mgr":7839,"hiredate":"1981-04-02 00:00:00","sal":2975.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":5,"data":{"empno":7654,"ename":"MARTIN","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-28 00:00:00","sal":1250.00,"comm":1400.00,"deptno":30}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":6,"data":{"empno":7698,"ename":"BLAKE","job":"MANAGER","mgr":7839,"hiredate":"1981-05-01 00:00:00","sal":2850.00,"comm":0.00,"deptno":30}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":7,"data":{"empno":7782,"ename":"CLARK","job":"MANAGER","mgr":7839,"hiredate":"1981-06-09 00:00:00","sal":2450.00,"comm":0.00,"deptno":10}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":8,"data":{"empno":7788,"ename":"SCOTT","job":"ANALYST","mgr":7566,"hiredate":"1987-04-19 00:00:00","sal":3000.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":9,"data":{"empno":7839,"ename":"KING","job":"PRESIDENT","mgr":0,"hiredate":"1981-11-17 00:00:00","sal":5000.00,"comm":0.00,"deptno":10}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":10,"data":{"empno":7844,"ename":"TURNER","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-08 00:00:00","sal":1500.00,"comm":0.00,"deptno":30}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":11,"data":{"empno":7876,"ename":"ADAMS","job":"CLERK","mgr":7788,"hiredate":"1987-05-23 00:00:00","sal":1100.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":12,"data":{"empno":7900,"ename":"JAMES","job":"CLERK","mgr":7698,"hiredate":"1981-12-03 00:00:00","sal":950.00,"comm":0.00,"deptno":30}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":13,"data":{"empno":7902,"ename":"FORD","job":"ANALYST","mgr":7566,"hiredate":"1981-12-03 00:00:00","sal":3000.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"xoffset":14,"data":{"empno":7934,"ename":"MILLER","job":"CLERK","mgr":7782,"hiredate":"1982-01-23 00:00:00","sal":1300.00,"comm":0.00,"deptno":10}}
{"database":"hlwtest","table":"emp1","type":"insert","ts":1552098958,"xid":435,"commit":true,"data":{"empno":8888,"ename":"HIVE","job":"PROGRAM","mgr":7839,"hiredate":"1988-01-23 00:00:00","sal":10300.00,"comm":0.00,"deptno":null}}
数据已经正常同步到kafka中

Maxwell的过滤功能

参考过滤配置:

[root@hadoop001 maxwell-1.17.1]# bin/maxwell --user='maxwell' --password='maxwell' \
--host='127.0.0.1' --filter 'exclude: *.*, include:hlwtest.emp1' \
--producer=kafka --kafka_version=0.10.0.1 --kafka.bootstrap.servers=192.168.137.2:9092 --kafka_topic=maxwell
//--filter 'exclude: *.*, include:hlwtest.emp1' 的意思是只监控hlwtest.emp1表的变化,其他的都不监控
//MySQL中update数据
mysql> update emp set sal=730 where empno=6001;
mysql> update emp1 set sal=330 where empno=6001;
mysql> update emp set sal=730 where empno=6001;
mysql> update emp1 set sal=331 where empno=6001;
//Kafka消费者接收到的数据
{"database":"hlwtest","table":"emp1","type":"update","ts":1552099858,"xid":916,"commit":true,"data":{"empno":6001,"ename":"SIWA","job":"DESIGNER","mgr":7001,"hiredate":"2019-03-08 00:00:00","sal":330.00,"comm":6000.00,"deptno":40},"old":{"sal":321.00}}
{"database":"hlwtest","table":"emp1","type":"update","ts":1552099858,"xid":922,"commit":true,"data":{"empno":6001,"ename":"SIWA","job":"DESIGNER","mgr":7001,"hiredate":"2019-03-08 00:00:00","sal":331.00,"comm":6000.00,"deptno":40},"old":{"sal":330.00}}
确实只消费到了emp1表的update语句,而没有接收到emp表的更新

Maxwell 的bootstrap

mysql> insert into maxwell.bootstrap (database_name, table_name) values ("hlwtest", "emp");
mysql> select * from maxwell.bootstrap;
+----+---------------+------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+------------------+-----------------+-----------+
| id | database_name | table_name | where_clause | is_complete | inserted_rows | total_rows | created_at | started_at          | completed_at        | binlog_file      | binlog_position | client_id |
+----+---------------+------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+------------------+-----------------+-----------+
|  1 | hlwtest       | emp        | NULL         |           1 |            16 |          0 | NULL       | 2019-03-09 11:33:11 | 2019-03-09 11:33:11 | mysql-bin.000018 |          225498 | maxwell   |
+----+---------------+------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+------------------+-----------------+-----------+

kafka的消费窗口

{"database":"maxwell","table":"bootstrap","type":"insert","ts":1552102248,"xid":1555,"commit":true,"data":{"id":1,"database_name":"hlwtest","table_name":"emp","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":0,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell"}}
{"database":"hlwtest","table":"emp","type":"bootstrap-start","ts":1552102391,"data":{}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":6001,"ename":"SIWA","job":"DESIGNER","mgr":7001,"hiredate":"2019-03-08 00:00:00","sal":730.00,"comm":6000.00,"deptno":40}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":"1980-12-17 00:00:00","sal":800.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-20 00:00:00","sal":1600.00,"comm":300.00,"deptno":30}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7521,"ename":"WARD","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-22 00:00:00","sal":1250.00,"comm":500.00,"deptno":30}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7566,"ename":"JONES","job":"MANAGER","mgr":7839,"hiredate":"1981-04-02 00:00:00","sal":2975.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7654,"ename":"MARTIN","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-28 00:00:00","sal":1250.00,"comm":1400.00,"deptno":30}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7698,"ename":"BLAKE","job":"MANAGER","mgr":7839,"hiredate":"1981-05-01 00:00:00","sal":2850.00,"comm":0.00,"deptno":30}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7782,"ename":"CLARK","job":"MANAGER","mgr":7839,"hiredate":"1981-06-09 00:00:00","sal":2450.00,"comm":0.00,"deptno":10}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7788,"ename":"SCOTT","job":"ANALYST","mgr":7566,"hiredate":"1987-04-19 00:00:00","sal":3000.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7839,"ename":"KING","job":"PRESIDENT","mgr":0,"hiredate":"1981-11-17 00:00:00","sal":5000.00,"comm":0.00,"deptno":10}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7844,"ename":"TURNER","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-08 00:00:00","sal":1500.00,"comm":0.00,"deptno":30}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7876,"ename":"ADAMS","job":"CLERK","mgr":7788,"hiredate":"1987-05-23 00:00:00","sal":1100.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7900,"ename":"JAMES","job":"CLERK","mgr":7698,"hiredate":"1981-12-03 00:00:00","sal":950.00,"comm":0.00,"deptno":30}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7902,"ename":"FORD","job":"ANALYST","mgr":7566,"hiredate":"1981-12-03 00:00:00","sal":3000.00,"comm":0.00,"deptno":20}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":7934,"ename":"MILLER","job":"CLERK","mgr":7782,"hiredate":"1982-01-23 00:00:00","sal":1300.00,"comm":0.00,"deptno":10}}
{"database":"hlwtest","table":"emp","type":"bootstrap-insert","ts":1552102391,"data":{"empno":8888,"ename":"HIVE","job":"PROGRAM","mgr":7839,"hiredate":"1988-01-23 00:00:00","sal":10300.00,"comm":0.00,"deptno":null}}
{"database":"maxwell","table":"bootstrap","type":"update","ts":1552102391,"xid":1617,"commit":true,"data":{"id":1,"database_name":"hlwtest","table_name":"emp","where_clause":null,"is_complete":1,"inserted_rows":16,"total_rows":0,"created_at":null,"started_at":"2019-03-09 11:33:11","completed_at":"2019-03-09 11:33:11","binlog_file":"mysql-bin.000018","binlog_position":225498,"client_id":"maxwell"},"old":{"is_complete":0,"inserted_rows":1,"completed_at":null}}
{"database":"hlwtest","table":"emp","type":"bootstrap-complete","ts":1552102391,"data":{}}
数据 消费 版本 消费者 语句 支持 数据库 更新 检查 监控 日志 三个 功能 意思 正在 步骤 用户 表里 进程 中创 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 unix服务器配置 变电站网络安全检测稿件 数据库查询一个字段多个条件 网络安全个人工作事迹 上海程序软件开发哪家实惠 网络安全服务必须要驻场吗 成都平台软件开发 三星手机备份的数据在哪个服务器 上海媒体软件开发服务报价表 逻辑与计算机网络技术 Web数据库表单开发 数据库sql语句的书写 事业编网络技术考试题库和答案 嘉定区信息网络技术服务价格 数据库安全性创建新用户 为什么服务器会封号 网络安全ppt 免费 数据库表时间自动填写 智慧工会软件开发哪家好 杨浦区一站式软件开发收费套餐 超微服务器更新签名失败 陕西夕秀软件开发有限公司 车企做软件开发 感动工行集体软件开发 仙居定制软件开发生产过程 cad怎么解除连接服务器 网络技术学测模拟测试 计算机软件开发笔试及答案 服务器文件系统错误 宜兴银联软件开发代理品牌
0