curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz
or
curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz
tar xvf confluent-community-6.1.0.tar.gz
cd confluent-6.1.0
실행
./bin/connect-distributed ./etc/kafka/connect-distributed.properties
* confluent-6.1.0 폴더 내에서 (tarball 이름은 confluent-community-6.1.0 이지만 압축을 풀면 confluent-6.1.0 폴더가 생성됨)
code ./etc/kafka/connect-distributed.properties
mvn 으로 빌드된 jar들은 ~/.m2/repository 에 다 있음.
cd /Users/lee/.m2/repository/mysql/mysql-connector-java/8.0.26
cp ./mysql-connector-java-8.0.26.jar /Users/lee/development/kafka/confluent-6.1.0/share/java/kafka
POST http://localhost:8083/connectors
Content-Type: application/json
data
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"1234",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
GET http://localhost:8083/connectors
GET http://localhost:8083/connectors/my-source-connect/status
insert into users(user_id, pwd, name) values('user', '1234', 'name');
insert into users(user_id, pwd, name) values('user2', '1234', 'name2');
POST http://localhost:8083/connectors
Content-Type: application/json
data
{
"name":"my-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"1234",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_users"
}
}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"user_id"},{"type":"string","optional":true,"field":"pwd"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"}],"optional":false,"name":"users"},"payload":{"id":7,"user_id":"user5","pwd":"7777","name":"name7","created_at":1633890976000}}
delete.topic.enable=true
./bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic my_topic_users
- GET /connectors - return a list of active connectors
- POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
- GET /connectors/{name} - get information about a specific connector
- GET /connectors/{name}/config - get the configuration parameters for a specific connector
- PUT /connectors/{name}/config - update the configuration parameters for a specific connector
- GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
- GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
- GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
- PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
- PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
- POST /connectors/{name}/restart - restart a connector (typically because it has failed)
- POST /connectors/{name}/tasks/{taskId}/restart - restart an individual task (typically because it has failed)
- DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration
- GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued
- PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector