使用Debezium采集MySQL数据到Kafka

下载mysql对应版本的debezium连接器

https://debezium.io/releases/1.7/

下载

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.7.2.Final/debezium-connector-mysql-1.7.2.Final-plugin.tar.gz
#新建存放连接器目录
mkdir -p /opt/module/debezium/connector
#解压
tar -zxvf debezium-connector-mysql-1.7.2.Final-plugin.tar.gz  -C /opt/module/debezium/connector

MySQL需要开启binlog

编辑my.cnf文件,在[mysqld]选项中添加如下配置,其中binlog_format指定MySQL的binlog日志格式,必须是row格式(修改后需重启MySQL生效)

log_bin=mysql_bin
binlog_format=row

创建测试数据库和表(创建数据库后需先执行 USE test; 切换到test库,再建表和插入数据)

CREATE DATABASE test;

CREATE TABLE `cap_table` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(10) NOT NULL,
  `status` char(1) DEFAULT '1',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB ;

insert into cap_table (`name`,status)  value('张三', 1);

配置kafka插件连接器

vim /usr/local/kafka3/config/connect-distributed.properties
#kafka集群地址
bootstrap.servers=localhost:9092,localhost:9093
#配置监听插件的路径
plugin.path=/opt/module/debezium/connector

启动Kafka以及Kafka Connect(分布式模式)

/usr/local/kafka3/bin/kafka-server-start.sh -daemon  /usr/local/kafka3/config/kraft/server.properties
/usr/local/kafka3/bin/connect-distributed.sh  -daemon /usr/local/kafka3/config/connect-distributed.properties 

查看连接器启动日志

cat /usr/local/kafka3/logs/connectDistributed.out 

检测Kafka Connect的服务状态,Kafka Connect REST接口的默认HTTP端口为8083

curl -H "Accept:application/json" localhost:8083/
#正常输出
{"version":"3.3.1","commit":"e23c59d00e687ff5","kafka_cluster_id":"4guwVNnTTeK_C3ILOsy49g"}

检查向Kafka Connect 注册的连接器列表

curl -H "Accept:application/json" localhost:8083/connectors/
#正常输出为空列表,因为还没有注册任何连接器
[]

kafka注册mysql连接器

#注册信息(database.user、database.password请替换为实际的MySQL账号和密码)
{
  "name": "zcr-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": 3306,
    "database.user": "root",
    "database.password": "new_password",
    "database.server.id": "2024",
    "database.server.name": "bigdata",
    "database.include.list": "test",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}

说明:

  • name:连接器名字
  • database.include.list:要监控的数据库列表
  • database.server.name :服务器名,会成为topic 的前缀

发送请求向kafka注册mysql连接器

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "zcr-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": 3306,
    "database.user": "root",
    "database.password": "root",
    "database.server.id": "2024",
    "database.server.name": "bigdata",
    "database.include.list": "test",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'
#成功返回
HTTP/1.1 201 Created
Date: Wed, 24 Apr 2024 07:51:18 GMT
Location: http://localhost:8083/connectors/zcr-mysql-connector
Content-Type: application/json
Content-Length: 483
Server: Jetty(9.4.48.v20220622)

#查看现有连接器情况
curl -H "Accept:application/json" localhost:8083/connectors/
["zcr-mysql-connector"]

查看当前topic

/usr/local/kafka3/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets

在完整的topic列表中可以看到有个bigdata.test.cap_table的topic(上面的输出未完整列出)

消费该topic

/usr/local/kafka3/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bigdata.test.cap_table

往mysql插入数据

insert into cap_table (`name`,status)  value('test', 1);

上面启动的消费者会收到对应的变更事件,消息内容如下

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"default":"1","field":"status"}],"optional":true,"name":"bigdata.test.cap_table.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"default":"1","field":"status"}],"optional":true,"name":"bigdata.test.cap_table.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"bigdata.test.cap_table.Envelope"},"payload":{"before":null,"after":{"id":5,"name":"test","status":"1"},"source":{"version":"1.7.2.Final","connector":"mysql","name":"bigdata","ts_ms":1713945547000,"snapshot":"false","db":"test","sequence":null,"table":"cap_table","server_id":1,"gtid":null,"file":"mysql_bin.000001","pos":2095,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1713945547830,"transaction":null}}