下载mysql对应版本的debezium连接器
https://debezium.io/releases/1.7/
下载
# Download the Debezium MySQL connector plugin archive (version 1.7.2.Final) from Maven Central
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.7.2.Final/debezium-connector-mysql-1.7.2.Final-plugin.tar.gz
# Create the directory that will hold the connector plugins
mkdir -p /opt/module/debezium/connector
# Extract the archive into the plugin directory
tar -zxvf debezium-connector-mysql-1.7.2.Final-plugin.tar.gz -C /opt/module/debezium/connector
MySQL需要开启binlog
编辑my.cnf文件,在[mysqld]选项中添加以下配置;binlog_format指定mysql的binlog日志格式,必须是row格式(修改后需要重启MySQL才能生效)
# Explicit server id: MySQL 5.7 refuses to start with binary logging enabled unless server_id is set.
# It must differ from the connector's database.server.id.
server_id=1
# Enable the binary log; binlog files use the base name mysql_bin
log_bin=mysql_bin
# Debezium requires row-based binary logging
binlog_format=row
创建测试数据库表
-- Create the test schema and make it the default database, so that the
-- table below is created inside `test` (the database the connector monitors).
CREATE DATABASE IF NOT EXISTS test;
USE test;

-- Sample table whose changes are captured by the connector.
-- utf8mb4 is set explicitly so that non-ASCII names can be stored even when
-- the server default character set is latin1.
CREATE TABLE IF NOT EXISTS `cap_table` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `name` varchar(10) NOT NULL,
    `status` char(1) DEFAULT '1',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Seed one row; `status` is char(1), so pass a string literal instead of
-- relying on implicit numeric-to-string conversion.
INSERT INTO `cap_table` (`name`, `status`) VALUES ('张三', '1');
配置kafka插件连接器
vim /usr/local/kafka3/config/connect-distributed.properties
# Kafka cluster address(es) the Connect worker bootstraps from
# NOTE(review): with the stock config/kraft/server.properties, port 9093 is the CONTROLLER listener rather than a broker listener — confirm that localhost:9093 is really a broker in this setup
bootstrap.servers=localhost:9092,localhost:9093
# Directory that Kafka Connect scans for connector plugins
plugin.path=/opt/module/debezium/connector
kafka 启动连接器
# Start the Kafka broker in the background using the KRaft server configuration
/usr/local/kafka3/bin/kafka-server-start.sh -daemon /usr/local/kafka3/config/kraft/server.properties
# Start the Kafka Connect worker in distributed mode, also in the background
/usr/local/kafka3/bin/connect-distributed.sh -daemon /usr/local/kafka3/config/connect-distributed.properties
查看连接器启动日志
cat /usr/local/kafka3/logs/connectDistributed.out
检测Kafka Connect的服务状态,Kafka Connect REST接口的默认HTTP端口为8083
curl -H "Accept:application/json" localhost:8083/
#正常输出
{"version":"3.3.1","commit":"e23c59d00e687ff5","kafka_cluster_id":"4guwVNnTTeK_C3ILOsy49g"}
检查向Kafka Connect 注册的连接器列表
curl -H "Accept:application/json" localhost:8083/connectors/
#正常输出,因为还没注册
[]
kafka注册mysql连接器
#注册信息(database.password请替换为MySQL的实际密码;此处示例为new_password,而下面curl请求中为root,两处需保持一致)
{
"name": "zcr-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": 3306,
"database.user": "root",
"database.password": "new_password",
"database.server.id": "2024",
"database.server.name": "bigdata",
"database.include.list": "test",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}
说明:
- name:连接器名字
- database.include.list:要监控的数据库列表
- database.server.name :服务器名,会成为topic 的前缀
发送请求向kafka注册mysql连接器
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "zcr-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": 3306,
"database.user": "root",
"database.password": "root",
"database.server.id": "2024",
"database.server.name": "bigdata",
"database.include.list": "test",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}'
#成功返回
HTTP/1.1 201 Created
Date: Wed, 24 Apr 2024 07:51:18 GMT
Location: http://localhost:8083/connectors/zcr-mysql-connector
Content-Type: application/json
Content-Length: 483
Server: Jetty(9.4.48.v20220622)
#查看现有连接器情况
curl -H "Accept:application/json" localhost:8083/connectors/
["zcr-mysql-connector"]
查看当前topic
/usr/local/kafka3/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
连接器注册成功并完成初始快照后,列表中还应出现名为bigdata.test.cap_table的topic(上面的示例输出未完整列出)
消费该topic
/usr/local/kafka3/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bigdata.test.cap_table
往mysql插入数据
-- Insert a row to trigger a change event; the table is qualified with its
-- database so the statement works regardless of the session's default schema,
-- and `status` (char(1)) receives a string literal.
INSERT INTO `test`.`cap_table` (`name`, `status`) VALUES ('test', '1');
上面的消费能看到具体的消费情况
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"default":"1","field":"status"}],"optional":true,"name":"bigdata.test.cap_table.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"default":"1","field":"status"}],"optional":true,"name":"bigdata.test.cap_table.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"bigdata.test.cap_table.Envelope"},"payload":{"before":null,"after":{"id":5,"name":"test","status":"1"},"source":{"version":"1.7.2.Final","connector":"mysql","name":"bigdata","ts_ms":1713945547000,"snapshot":"false","db":"test","sequence":null,"table":"cap_table","server_id":1,"gtid":null,"file":"mysql_bin.000001","pos":2095,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1713945547830,"transaction":null}}