使用 Debezium 采集 MongoDB 数据到 Kafka

MongoDB连接器使用MongoDB的 oplog来捕获更改,因此该连接器仅适用于MongoDB副本集或分片集群。
下载mongo对应版本的debezium连接器
https://debezium.io/releases/1.7/

wget  https://repo1.maven.org/maven2/io/debezium/debezium-connector-mongodb/1.7.2.Final/debezium-connector-mongodb-1.7.2.Final-plugin.tar.gz
#新建存放连接器目录
mkdir -p /opt/module/debezium/connector
#解压
tar -zxvf debezium-connector-mongodb-1.7.2.Final-plugin.tar.gz -C /opt/module/debezium/connector

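重启kafka连接器(重启前需确认 connect-distributed.properties 中的 plugin.path 已包含 /opt/module/debezium/connector,否则 Kafka Connect 无法加载该连接器插件)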

/usr/local/kafka3/bin/connect-distributed.sh  -daemon /usr/local/kafka3/config/connect-distributed.properties 

注册mongodb连接

#注册信息
{
  "name": "zcr-mongodb-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "myrs/127.0.0.1:23001",
    "mongodb.name": "test",
    "collection.include.list": ".*"
  }
}

注册信息解析

  • name:连接器的名称。
  • connector.class:指定要使用的连接器类,此处为 MongoDB 连接器。
  • mongodb.hosts:MongoDB 副本集成员的地址,格式为 "副本集名称/主机:端口"(多个主机用逗号分隔),此处副本集名称为 myrs。
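  • mongodb.name:连接器所监控的 MongoDB 副本集/集群的逻辑名称(并非数据库名),此处为 "test"。该名称会作为所有 Kafka topic 的前缀,topic 命名格式为 "逻辑名称.数据库名.集合名",例如 test.test.test_table。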
  • collection.include.list:要监视的集合列表,为逗号分隔的正则表达式,匹配集合的完整名称 "数据库名.集合名";此处为 ".*",表示所有集合。

注册mongodb连接器

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "zcr-mongodb-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "myrs/127.0.0.1:23001",
    "mongodb.name": "test",
    "collection.include.list": ".*"
  }
}'
#输出
HTTP/1.1 201 Created
Date: Wed, 24 Apr 2024 08:17:13 GMT
Location: http://localhost:8083/connectors/zcr-mongodb-connector
Content-Type: application/json
Content-Length: 260
Server: Jetty(9.4.48.v20220622)

#查看当前已注册的连接器
curl -H "Accept:application/json" localhost:8083/connectors/
["zcr-mysql-connector","zcr-mongodb-connector"]

测试mongo插入一条数据

db.test_table.insert({"name": "test","age" :30})

消费该数据

/usr/local/kafka3/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic  test.test.test_table --from-beginning 
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"filter"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"int64","optional":true,"field":"h"},{"type":"int64","optional":true,"field":"tord"},{"type":"string","optional":true,"field":"stxnid"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"test.test.test_table.Envelope"},"payload":{"after":"{\"_id\": {\"$oid\": \"6628c13ba762afd74f035fbb\"},\"name\": \"test\",\"age\": 30.0}","patch":null,"filter":null,"source":{"version":"1.7.2.Final","connector":"mongodb","name":"test","ts_ms":1713946939000,"snapshot":"false","db":"test","sequence":null,"rs":"myrs","collection":"test_table","ord":2,"h":4449335239969700278,"tord":null,"stxnid":null},"op":"c","ts_ms":1713946939135,"transaction":null}}