Kafka Connect: Syncing a MySQL Table to Kafka

Core Workflow

Kafka Connect is a framework for streaming data into Kafka and then out to another store, decoupling the data source from the target storage.

Since the data is going into Kafka, a Kafka instance needs to be prepared in advance.

Prerequisites

  • Kafka (version used in this article: 2.2.1-cdh6.3.1)

1. Start Kafka Connect in distributed mode

Kafka Connect ships with the Kafka distribution, so download the corresponding release package; this article uses the 2.2.1 Kafka package.

Kafka Connect supports both standalone and distributed deployment; distributed mode is demonstrated here.

After downloading and extracting, first edit the configuration to point at the Kafka instance.

# cd /data/bigdata/kafka/kafka_2.12-2.2.1/
# vim config/connect-distributed.properties
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=10.0.0.29:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster
The remaining storage topic parameters (config.storage.topic, offset.storage.topic, status.storage.topic) can be left at their defaults.
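For reference, these are the defaults as shipped in the Kafka 2.2.1 sample file (a replication factor of 1 is fine for a demo; raise it for production):

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1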

Start it:

bin/connect-distributed.sh config/connect-distributed.properties
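To keep the process running after the shell exits, Kafka's launch scripts also accept a -daemon flag (alternatively, use nohup):

bin/connect-distributed.sh -daemon config/connect-distributed.properties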

Check the cluster information.

Port 8083 is the default listening port of the REST API.

# curl localhost:8083
{"version":"2.2.1","commit":"55783d3133a5a49a","kafka_cluster_id":"thmuyps7QaeG9JbOXxnJsw"}

2. Install connector plugins

With the default configuration, only these two plugins are visible:

# curl -s localhost:8083/connector-plugins | python -m json.tool
[
    {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "2.2.1"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "type": "source",
        "version": "2.2.1"
    }
]

2.1 Download the JDBC Connector (Source and Sink)

Download the installation package.

Extract it into Kafka's plugins directory, as sketched below.
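A minimal sketch of this step, assuming the 10.1.1 release downloaded from Confluent Hub (the archive name is an assumption; adjust paths to your environment):

# hypothetical archive name; use the file you actually downloaded
mkdir -p /data/bigdata/kafka/kafka_2.12-2.2.1/plugins
unzip confluentinc-kafka-connect-jdbc-10.1.1.zip -d /data/bigdata/kafka/kafka_2.12-2.2.1/plugins/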

2.2 Download mysql-connector-java

Connector/J, the JDBC driver for MySQL.

Place the jar in Kafka's libs directory, for example:
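# hypothetical jar name; use whichever Connector/J version matches your MySQL server
cp mysql-connector-java-8.0.23.jar /data/bigdata/kafka/kafka_2.12-2.2.1/libs/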

Then update the plugin path in config/connect-distributed.properties:

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/data/bigdata/kafka/kafka_2.12-2.2.1/libs,/data/bigdata/kafka/kafka_2.12-2.2.1/plugins

After restarting Connect, these connector plugins are now visible:

# curl -s localhost:8083/connector-plugins | python -m json.tool
[
    {
        "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirAvroSourceConnector",
        "type": "source",
        "version": "0.0.0.0"
    },
    {
        "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirBinaryFileSourceConnector",
        "type": "source",
        "version": "0.0.0.0"
    },
    {
        "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "type": "source",
        "version": "0.0.0.0"
    },
    {
        "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector",
        "type": "source",
        "version": "0.0.0.0"
    },
    {
        "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector",
        "type": "source",
        "version": "0.0.0.0"
    },
    {
        "class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector",
        "type": "source",
        "version": "0.0.0.0"
    },
    {
        "class": "com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector",
        "type": "source",
        "version": "0.0.0.0"
    },
    {
        "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "type": "sink",
        "version": "10.1.1"
    },
    {
        "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "type": "source",
        "version": "10.1.1"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "2.2.1"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "type": "source",
        "version": "2.2.1"
    }
]

3. Add a MySQL source connector

Following connect-mysql-source.properties, assemble the following JSON and register the connector:

echo '{
    "name":"mysql_source_hue_queryhistory",
    "config":{
        "connector.class":"JdbcSourceConnector",
        "connection.url":"jdbc:mysql://10.0.0.15:3306/hue?user=hue&password=xxxxx",
        "mode":"incrementing",
        "table.whitelist":"desktop_document2",
        "incrementing.column.name":"id",
        "topic.prefix":"hue-"
    }
}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type: application/json"

"connection.url":"jdbc:mysql://10.0.0.15:3306/hue?user=hue&password=xxxxx", MySQL的连接串,以 Hue 的查询记录为例 "mode":"incrementing" , 增量拉取MySQL表的数据 "incrementing.column.name":"id", 增量字段 "table.whitelist":"desktop_document2", 只拉某张表 "topic.prefix":"hue-" , 存储到 Kafka 中 topic 的命名规则

List the current connectors and view a connector's configuration:

# curl localhost:8083/connectors
["mysql_source_hue_queryhistory"]

# curl -s localhost:8083/connectors/mysql_source_hue_queryhistory/config | python -m json.tool
{
    "connection.url": "jdbc:mysql://10.0.0.15:3306/hue?user=hue&password=xxxxx",
    "connector.class": "JdbcSourceConnector",
    "incrementing.column.name": "id",
    "mode": "incrementing",
    "name": "mysql_source_hue_queryhistory",
    "table.whitelist": "desktop_document2",
    "topic.prefix": "hue-"
}
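
The standard Connect REST API also exposes a /status endpoint, useful for confirming that the connector and its tasks are in the RUNNING state:

# curl -s localhost:8083/connectors/mysql_source_hue_queryhistory/status | python -m json.tool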

Verify the data in the corresponding Kafka topic:

# bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.29:9092 --topic hue-desktop_document2
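With the defaults in connect-distributed.properties (JsonConverter with schemas.enable=true), each record is a JSON envelope carrying both schema and payload; an illustrative, abbreviated example (actual fields depend on the table):

{"schema":{"type":"struct","name":"desktop_document2","fields":[...]},"payload":{"id":1,...}}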

The production activity of the corresponding topic is also visible in Kafka's monitoring.


4. Going further

Move the data from Kafka on into Hive.
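One common route is Confluent's HDFS sink connector with Hive integration enabled, registered through the same REST API. A minimal sketch, assuming that connector plugin is installed; the HDFS NameNode and Hive metastore addresses below are placeholders for your own:

echo '{
    "name":"hdfs_sink_hue_queryhistory",
    "config":{
        "connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector",
        "topics":"hue-desktop_document2",
        "hdfs.url":"hdfs://10.0.0.29:8020",
        "flush.size":"100",
        "hive.integration":"true",
        "hive.metastore.uris":"thrift://10.0.0.29:9083",
        "hive.database":"default",
        "schema.compatibility":"BACKWARD"
    }
}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type: application/json"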


Query it in Hue.

