Cluster environment:

IP              hostname    Spec
192.168.23.167  kafka01     4-core CPU / 8 GB RAM / 50 GB disk
192.168.23.168  kafka02     4-core CPU / 8 GB RAM / 50 GB disk
192.168.23.169  kafka03     4-core CPU / 8 GB RAM / 50 GB disk

Cluster installation directory: /data/apps

Deployment procedure

Configure host mappings as the root user

vi /etc/hosts

192.168.23.167  kafka01
192.168.23.168 kafka02
192.168.23.169 kafka03

As root, create the kafka user (username kafka, password admin)

groupadd kafka
adduser kafka -g kafka
passwd kafka

As root, change the owner and group of everything under the apps directory to kafka

chown -R kafka:kafka /data/apps/

Switch to the kafka user (all subsequent steps run as the kafka user)

su kafka

Configure passwordless SSH login

ssh-keygen -t rsa
ssh-copy-id kafka@kafka01
ssh-copy-id kafka@kafka02
ssh-copy-id kafka@kafka03
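
A quick way to confirm that passwordless login works from kafka01 to every node (a simple check; each command should print the remote hostname without asking for a password):

for h in kafka01 kafka02 kafka03; do ssh kafka@$h hostname; done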

Upload all installation resources to the kafka user's home directory on the kafka01 node

# zookeeper-3.4.10.tar.gz (download from the official site)
# kafka_2.11-0.11.0.1.tar.gz (download from the official site)
# kafka-manager-1.3.3.22 (download from the official site)
# influxdb-1.7.5.x86_64.rpm (download from the official site)
# jmxtrans-270.rpm (download from the official site)
# grafana-6.0.2-1.x86_64.rpm (download from the official site)
# install-kafka.sh (see the appendix for the file contents)
# install-zookeeper.sh (see the appendix for the file contents)

Run the ZooKeeper cluster installation script on the kafka01 node

sh ~/install-zookeeper.sh

Run the Kafka cluster installation script on the kafka01 node

sh ~/install-kafka.sh

Configure environment variables

vim ~/.bash_profile
# add the following:
export ZOOKEEPER_HOME=/data/apps/zookeeper-3.4.10
export KAFKA_HOME=/data/apps/kafka_2.11-0.11.0.1
export PATH=$PATH:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin

# apply the environment variables
source ~/.bash_profile
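
A quick sanity check that the variables took effect (the ZooKeeper and Kafka scripts should now resolve on the PATH):

echo $KAFKA_HOME
which zkServer.sh kafka-topics.sh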

Start the ZooKeeper cluster by running the following on kafka01, kafka02, and kafka03

zkServer.sh start
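
Once all three nodes are started, each should report its role; one node shows leader and the other two show follower:

zkServer.sh status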

Start the Kafka cluster by running the following on kafka01, kafka02, and kafka03

JMX_PORT=9999 kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
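
To confirm the brokers are up and registered, one can check the Java process on each node and the broker ids recorded in ZooKeeper (a quick sketch; the ids are whatever broker.id values were entered during installation, e.g. 0, 1 and 2):

jps
zkCli.sh -server kafka01:2181 ls /brokers/ids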

Common ZooKeeper commands

List the children of a znode

ls /

Create a regular node

create

Get a node's data

get

Create an ephemeral node

create -e

Create a sequential (numbered) node:

create -s

Delete a node

delete

Recursively delete a node

rmr

Update a node's data

set

Watch a node

get /test watch

Modify it from another node

set /test 555

The watching node then receives: WatchedEvent state:SyncConnected type:NodeDataChanged path:/test

Common Kafka commands

Stop Kafka (never shut down ZooKeeper before Kafka has been stopped)

bin/kafka-server-stop.sh

Create a topic

bin/kafka-topics.sh --create --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --replication-factor 3 --partitions 3 --topic test

List topics

bin/kafka-topics.sh --list --zookeeper kafka01:2181,kafka02:2181,kafka03:2181

Describe a topic:

bin/kafka-topics.sh --describe --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --topic test

Start a console producer:

bin/kafka-console-producer.sh --broker-list kafka01:9092,kafka02:9092,kafka03:9092 --topic test

Start a console consumer:

bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092 --topic test --from-beginning

Delete a topic (requires delete.topic.enable=true in server.properties; otherwise the topic is only marked for deletion until the brokers are restarted)

bin/kafka-topics.sh --delete --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --topic test

Managing the Kafka cluster with kafka-manager

Note: the following steps are performed as the kafka user; kafka-manager is installed only on the kafka01 node

Move the kafka-manager package into the /data/apps directory

mv ~/kafka-manager-1.3.3.22 /data/apps

Configure kafka-manager

cd /data/apps/kafka-manager-1.3.3.22/conf
vim application.conf
# set kafka-manager.zkhosts="kafka01:2181,kafka02:2181,kafka03:2181"

Start kafka-manager

cd /data/apps/kafka-manager-1.3.3.22
nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080 &
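
A simple check that kafka-manager is up and listening on the chosen port (8080 as configured above):

netstat -anop | grep 8080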

kafka-manager web UI

http://kafka01:8080

Monitoring JMX metrics with jmxtrans + InfluxDB + Grafana

Note: the following steps are performed as the root user; apart from jmxtrans, the components are installed only on the kafka01 node

Enable the Kafka JMX port

cd /data/apps/kafka_2.11-0.11.0.1/bin
vim kafka-run-class.sh

Add JMX_PORT=9999 at the top of the script (right after the shebang line).
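
One way to apply the same edit on every node without opening an editor (a sketch assuming GNU sed and the install path used above; the line is appended right after line 1, i.e. after the shebang):

sed -i '1a JMX_PORT=9999' /data/apps/kafka_2.11-0.11.0.1/bin/kafka-run-class.sh
head -n 3 /data/apps/kafka_2.11-0.11.0.1/bin/kafka-run-class.sh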

After the change, restart Kafka, then check the Kafka process and the JMX port:

ps -ef | grep kafka
netstat -anop | grep 9999

Install InfluxDB

Download the InfluxDB rpm package

wget https://dl.influxdata.com/influxdb/releases/influxdb-1.7.5.x86_64.rpm

Install the rpm package

rpm -ivh influxdb-1.7.5.x86_64.rpm

Start InfluxDB

service influxdb start    # or: systemctl start influxdb

Check the InfluxDB status

ps -ef | grep influxdb
service influxdb status    # or: systemctl status influxdb

Use the InfluxDB command-line client

influx

Create a user and a database

CREATE USER "admin" WITH PASSWORD 'admin' WITH ALL PRIVILEGES;
create database "jmxDB";

The user and database created above are enough for now; a few other basic commands that will be needed later are listed below.

# Create a database
create database "db_name"

# List all databases
show databases

# Drop a database
drop database "db_name"

# Switch to a database
use db_name

# List all measurements (tables) in the database
show measurements

# Create a measurement simply by naming it when inserting data
insert test,host=127.0.0.1,monitor_name=test count=1

# Drop a measurement
drop measurement "measurement_name"

# Quit the client
quit
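
Once jmxtrans (set up below) starts writing data, the same client can be used to spot-check what has been collected; a minimal sketch, assuming the jmxDB database and the BytesInPerSec measurement configured later in this guide:

influx -database 'jmxDB' -execute 'SHOW MEASUREMENTS'
influx -database 'jmxDB' -execute 'SELECT * FROM "BytesInPerSec" ORDER BY time DESC LIMIT 5'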

Install jmxtrans (on all Kafka nodes)

Download the jmxtrans rpm package

wget http://central.maven.org/maven2/org/jmxtrans/jmxtrans/270/jmxtrans-270.rpm

Install the rpm package

rpm -ivh jmxtrans-270.rpm

jmxtrans paths

jmxtrans installation directory: /usr/share/jmxtrans
Default directory for json config files: /var/lib/jmxtrans/
Log file: /var/log/jmxtrans/jmxtrans.log

Configure the json files; the jmxtrans GitHub repository provides a sample configuration:

{
"servers" : [ {
"port" : "9999",
"host" : "127.0.0.1",
"queries" : [ {
"obj" : "java.lang:type=Memory",
"attr" : [ "HeapMemoryUsage", "NonHeapMemoryUsage" ],
"resultAlias":"jvmMemory",
"outputWriters" : [ {
"@class" : "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url" : "http://127.0.0.1:8086/",
"username" : "admin",
"password" : "admin",
"database" : "jmxDB",
"tags" : {"application" : "kafka"}
} ]
} ]
} ]
}
  • host: the server to monitor
  • port: the JMX port
  • obj: the JMX ObjectName, i.e. the metric to monitor
  • attr: the attributes of that ObjectName, i.e. the values of the metric we want
  • resultAlias: the metric name, which becomes the MEASUREMENT name in InfluxDB
  • tags: InfluxDB tags, used to tell apart different metrics stored in the same MEASUREMENT; they are used later when drawing Grafana charts, so it is recommended to tag every metric

Two example json configuration files are attached below (the complete set is placed under /var/lib/jmxtrans/ on all three nodes).

base_10.164.204.248.json

{
"servers": [{
"port": "9999",
"host": "10.164.204.248",
"queries": [{
"obj": "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec",
"attr": ["Count", "OneMinuteRate"],
"resultAlias": "BytesInPerSec",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "BytesInPerSec"
}
}]
},
{
"obj": "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec",
"attr": ["Count", "OneMinuteRate"],
"resultAlias": "BytesOutPerSec",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "BytesOutPerSec"
}
}]
},
{
"obj": "kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec",
"attr": ["Count", "OneMinuteRate"],
"resultAlias": "BytesRejectedPerSec",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "BytesRejectedPerSec"
}
}]
},
{
"obj": "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec",
"attr": ["Count", "OneMinuteRate"],
"resultAlias": "MessagesInPerSec",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "MessagesInPerSec"
}
}]
},
{
"obj": "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer",
"attr": ["Count"],
"resultAlias": "RequestsPerSec",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"request": "FetchConsumer"
}
}]
},
{
"obj": "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower",
"attr": ["Count"],
"resultAlias": "RequestsPerSec",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"request": "FetchFollower"
}
}]
},
{
"obj": "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce",
"attr": ["Count"],
"resultAlias": "RequestsPerSec",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"request": "Produce"
}
}]
},
{
"obj": "java.lang:type=Memory",
"attr": ["HeapMemoryUsage", "NonHeapMemoryUsage"],
"resultAlias": "MemoryUsage",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "MemoryUsage"
}
}]
},
{
"obj": "java.lang:type=GarbageCollector,name=*",
"attr": ["CollectionCount", "CollectionTime"],
"resultAlias": "GC",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "GC"
}
}]
},
{
"obj": "java.lang:type=Threading",
"attr": ["PeakThreadCount", "ThreadCount"],
"resultAlias": "Thread",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "Thread"
}
}]
},
{
"obj": "kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica",
"attr": ["Value"],
"resultAlias": "ReplicaFetcherManager",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "MaxLag"
}
}]
},
{
"obj": "kafka.server:type=ReplicaManager,name=PartitionCount",
"attr": ["Value"],
"resultAlias": "ReplicaManager",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "PartitionCount"
}
}]
},
{
"obj": "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions",
"attr": ["Value"],
"resultAlias": "ReplicaManager",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "UnderReplicatedPartitions"
}
}]
},
{
"obj": "kafka.server:type=ReplicaManager,name=LeaderCount",
"attr": ["Value"],
"resultAlias": "ReplicaManager",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "LeaderCount"
}
}]
},
{
"obj": "kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer",
"attr": ["Count", "Max"],
"resultAlias": "TotalTimeMs",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "FetchConsumer"
}
}]
},
{
"obj": "kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower",
"attr": ["Count", "Max"],
"resultAlias": "TotalTimeMs",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "FetchConsumer"
}
}]
},
{
"obj": "kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce",
"attr": ["Count", "Max"],
"resultAlias": "TotalTimeMs",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "Produce"
}
}]
},
{
"obj": "kafka.server:type=ReplicaManager,name=IsrShrinksPerSec",
"attr": ["Count"],
"resultAlias": "ReplicaManager",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "IsrShrinksPerSec"
}
}]
}
]
}]
}

flink_sf_lx_248.json

{
"servers": [{
"port": "9999",
"host": "10.164.204.248",
"queries": [{
"obj": "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=FLINK_SF_LX",
"attr": ["Count"],
"resultAlias": "FLINK_SF_LX",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "BytesInPerSec"
}
}]
},
{
"obj": "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=FLINK_SF_LX",
"attr": ["Count"],
"resultAlias": "FLINK_SF_LX",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "BytesOutPerSec"
}
}]
},
{
"obj": "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=FLINK_SF_LX",
"attr": ["Count"],
"resultAlias": "FLINK_SF_LX",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "MessagesInPerSec"
}
}]
},
{
"obj": "kafka.log:type=Log,name=LogEndOffset,topic=FLINK_SF_LX,partition=*",
"attr": ["Value"],
"resultAlias": "FLINK_SF_LX",
"outputWriters": [{
"@class": "com.googlecode.jmxtrans.model.output.InfluxDbWriterFactory",
"url": "http://10.164.204.248:8086/",
"username": "admin",
"password": "admin",
"database": "jmxDB",
"tags": {
"application": "LogEndOffset"
}
}]
}
]
}]
}

Configuration notes:

1. Global metrics

Bytes in per second
"obj" : "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec"
"attr" : [ "Count" ]
"resultAlias":"BytesInPerSec"
"tags" : {"application" : "BytesInPerSec"}

Bytes out per second
"obj" : "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec"
"attr" : [ "Count" ]
"resultAlias":"BytesOutPerSec"
"tags" : {"application" : "BytesOutPerSec"}

Bytes rejected per second
"obj" : "kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec"
"attr" : [ "Count" ]
"resultAlias":"BytesRejectedPerSec"
"tags" : {"application" : "BytesRejectedPerSec"}

Total messages written per second

"obj" : "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec"
"attr" : [ "Count" ]
"resultAlias":"MessagesInPerSec"
"tags" : {"application" : "MessagesInPerSec"}

FetchFollower requests per second

"obj" : "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower"
"attr" : [ "Count" ]
"resultAlias":"RequestsPerSec"
"tags" : {"request" : "FetchFollower"}

FetchConsumer requests per second

"obj" : "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer"
"attr" : [ "Count" ]
"resultAlias":"RequestsPerSec"
"tags" : {"request" : "FetchConsumer"}

Produce requests per second

"obj" : "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce"
"attr" : [ "Count" ]
"resultAlias":"RequestsPerSec"
"tags" : {"request" : "Produce"}

Memory usage
"obj" : "java.lang:type=Memory"
"attr" : [ "HeapMemoryUsage", "NonHeapMemoryUsage" ]
"resultAlias":"MemoryUsage"
"tags" : {"application" : "MemoryUsage"}

GC time and count
"obj" : "java.lang:type=GarbageCollector,name=*"
"attr" : [ "CollectionCount","CollectionTime" ]
"resultAlias":"GC"
"tags" : {"application" : "GC"}

Thread usage
"obj" : "java.lang:type=Threading"
"attr" : [ "PeakThreadCount","ThreadCount" ]
"resultAlias":"Thread"
"tags" : {"application" : "Thread"}

Maximum number of messages a replica lags behind the leader
"obj" : "kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica"
"attr" : [ "Value" ]
"resultAlias":"ReplicaFetcherManager"
"tags" : {"application" : "MaxLag"}

Number of partitions on this broker
"obj" : "kafka.server:type=ReplicaManager,name=PartitionCount"
"attr" : [ "Value" ]
"resultAlias":"ReplicaManager"
"tags" : {"application" : "PartitionCount"}

Number of under-replicated partitions
"obj" : "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions"
"attr" : [ "Value" ]
"resultAlias":"ReplicaManager"
"tags" : {"application" : "UnderReplicatedPartitions"}

Number of partitions for which this broker is the leader
"obj" : "kafka.server:type=ReplicaManager,name=LeaderCount"
"attr" : [ "Value" ]
"resultAlias":"ReplicaManager"
"tags" : {"application" : "LeaderCount"}

Total time spent on a FetchConsumer request
"obj" : "kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer"
"attr" : [ "Count","Max" ]
"resultAlias":"TotalTimeMs"
"tags" : {"application" : "FetchConsumer"}

Total time spent on a FetchFollower request
"obj" : "kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower"
"attr" : [ "Count","Max" ]
"resultAlias":"TotalTimeMs"
"tags" : {"application" : "FetchFollower"}

Total time spent on a Produce request
"obj" : "kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce"
"attr" : [ "Count","Max" ]
"resultAlias":"TotalTimeMs"
"tags" : {"application" : "Produce"}


2. Per-topic metrics

Bytes in per second for falcon_monitor_us
"kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=falcon_monitor_us"
"attr" : [ "Count" ]
"resultAlias":"falcon_monitor_us"
"tags" : {"application" : "BytesInPerSec"}

Bytes out per second for falcon_monitor_us
"kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=falcon_monitor_us"
"attr" : [ "Count" ]
"resultAlias":"falcon_monitor_us"
"tags" : {"application" : "BytesOutPerSec"}

Messages written per second for falcon_monitor_us
"obj" : "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=falcon_monitor_us"
"attr" : [ "Count" ]
"resultAlias":"falcon_monitor_us"
"tags" : {"application" : "MessagesInPerSec"}

Log end offset of each partition of falcon_monitor_us
"obj" : "kafka.log:type=Log,name=LogEndOffset,topic=falcon_monitor_us,partition=*"
"attr" : [ "Value" ]
"resultAlias":"falcon_monitor_us"
"tags" : {"application" : "LogEndOffset"}

PS:
1. Parameter descriptions
"obj" is the JMX ObjectName, i.e. the metric to monitor
"attr" lists the attributes of that ObjectName, i.e. the values of the metric we want
"resultAlias" is the metric name, which becomes the MEASUREMENT name in InfluxDB
"tags" maps to InfluxDB tags, used to tell apart different metrics stored in the same MEASUREMENT; they are used later when drawing Grafana charts, so it is recommended to tag every metric

2. For the global metrics, each metric gets its own MEASUREMENT, and all Kafka nodes write that metric into the same MEASUREMENT. For the per-topic metrics, all Kafka nodes write a given topic's metrics into a single MEASUREMENT named after the topic.
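
As an illustration of this naming scheme (assuming jmxtrans has already collected some data), the per-topic measurement for FLINK_SF_LX can be inspected with the influx CLI:

influx -database 'jmxDB' -execute 'SHOW MEASUREMENTS'
influx -database 'jmxDB' -execute "SELECT * FROM \"FLINK_SF_LX\" WHERE \"application\" = 'LogEndOffset' ORDER BY time DESC LIMIT 5"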

Start jmxtrans

service jmxtrans start    # or: systemctl start jmxtrans

If the log shows no errors, the startup succeeded:

tail /var/log/jmxtrans/jmxtrans.log

Install Grafana

Download the Grafana rpm package

wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-6.0.2-1.x86_64.rpm

Install the rpm package (if dependencies are missing, download and install them first)

rpm -ivh grafana-6.0.2-1.x86_64.rpm

yum install --downloadonly --downloaddir=./ fontconfig

yum localinstall fontconfig-2.13.0-4.3.el7.x86_64.rpm

yum install --downloadonly --downloaddir=./ urw-fonts

yum localinstall urw-fonts-2.4-11.el6.noarch.rpm

rpm -ivh grafana-6.0.2-1.x86_64.rpm

Start Grafana

service grafana-server start    # or: systemctl start grafana-server

Open a browser

http://kafka01:3000

Log in with the default username and password admin/admin

Click Add data source and select InfluxDB.

The Url, Database, User, and Password must match the values in the jmxtrans collection config files; then click Save & Test, and a success message means the data source is working.
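
If Save & Test fails, it can help to confirm from the command line that InfluxDB's HTTP API answers with the same credentials (a quick sketch; the host and credentials follow the earlier setup and may differ in your environment):

curl -G 'http://10.164.204.248:8086/query' -u admin:admin --data-urlencode "q=SHOW DATABASES"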

After it passes, click Back to return.

The + icon on the left creates or imports dashboards; create a dashboard and then configure a panel for each metric.

Main configuration items:

Item                 Description
DataSource           Select a data source already configured in Grafana
FROM-Default         The default schema; leave it unchanged
FROM-measurement     The corresponding InfluxDB measurement name
WHERE                WHERE conditions; choose according to your needs
SELECT-Field         The selected field(s); add or remove as needed
SELECT-mean()        The InfluxDB function applied to the selected field
GROUP BY-time()      Group by time interval
GROUP BY-fill()      Fill missing data points with null by default

Key points:

1. For metrics whose value is a Count, Grafana has to do a calculation to get the figure we actually want. BytesInPerSec, for example, is a cumulative counter; to get per-second throughput we compute (current sample - previous sample) / 60, since jmxtrans collects once per minute. The concrete panel configuration is described below:

Because data is collected once a minute, set both group by and derivative to 1m; because we want per-second throughput, divide by 60 in the math field.
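
The query such a panel generates looks roughly like the sketch below (measurement and field names follow the earlier jmxtrans config; the time range and tag filters are placeholders to adjust):

influx -database 'jmxDB' -execute "SELECT derivative(mean(\"Count\"), 1m) / 60 FROM \"BytesInPerSec\" WHERE time > now() - 1h GROUP BY time(1m) fill(null)"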

2. Axis unit selection, e.g. a unit for traffic, a unit for time, or no unit for message counts per second; one example of each follows.

To set a traffic unit: click the panel, choose "Edit", switch to the Axes tab, then Unit -> data (Metric) -> bytes

To set a time unit: click the panel, choose "Edit", switch to the Axes tab, then Unit -> time -> milliseconds (ms)

To show raw values with no unit: click the panel, choose "Edit", switch to the Axes tab, then Unit -> none -> none

Appendix (Kafka configuration notes)

server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Switch to enable topic deletion or not, default value is false
delete.topic.enable=false

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://kafka01:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://kafka01:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/data/apps/kafkaapp/logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=6
default.replication.factor=3

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

producer.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.producer.ProducerConfig for more details

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=kafka01:9092,kafka02:9092,kafka03:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
# Whether to compress: by default 0 means no compression, 1 means gzip, 2 means snappy. Compressed messages carry a header indicating the compression type, so decompression on the consumer side is transparent and needs no configuration.
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
# Specifies the partitioner class. The default kafka.producer.DefaultPartitioner hashes the key to pick a partition.
partitioner.class=kafka.producer.DefaultPartitioner

# the maximum amount of time the client will wait for the response of a request
# The maximum time the broker may wait before sending an ack to the producer. On timeout the broker returns an error ack, meaning the previous message failed for some reason (for example, a follower failed to sync).
request.timeout.ms=10000

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

consumer.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.consumer.ConsumerConfig for more details

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group

#consumer timeout
#consumer.timeout.ms=500

install-kafka.sh

#!/bin/bash
# Author gaojintao999@163.com
echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
echo "~~~ 运行后续操作前请仔细阅读以下内容! Author gaojintao999@163.com ~~~"
echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
echo "(1)已配置主机映射!"
echo "(2)已永久关闭防火墙!"
echo "(3)已配置免密登录!"
echo "(4)当前执行脚本和相关的安装包资源在同一路径下!"
read -p "上述条件是否都满足?(y or n)" yesorno

if [[ $yesorno = "y" || $yesorno = "Y" ]]; then

# Configure the Kafka installation directory
currentTime=$(date '+%Y-%m-%d %H:%M:%S')
echo -e "Enter the Kafka installation directory (created automatically if it does not exist; do not add a trailing /), e.g. /data/apps"
read kafkainstallpath

# Create the Kafka installation directory
if [ ! -d $kafkainstallpath ]; then
mkdir -p $kafkainstallpath
fi
if [ ! -d $kafkainstallpath ]; then
echo "创建目录$kafkainstallpath失败!请检查目录是否有权限"
exit
fi

# Extract the tarball
currentdir=$(cd $(dirname $0); pwd)
ls | grep 'kafka.*[gz]$'
if [ $? -ne 0 ]; then
# No Kafka tarball in the current directory
echo "No kafka*.tar.gz found under $currentdir, please upload it first!"
exit
else
# Extract
tar -zxvf $currentdir/$(ls | grep 'kafka.*[gz]$') -C $kafkainstallpath
fi

# Full Kafka version (directory name)
kafkaversion=`ls $kafkainstallpath| grep 'kafka_2.*'`

# Path of the Kafka configuration files
confpath=$kafkainstallpath/$kafkaversion/config

# Modify the configuration file
echo -e "Enter this node's broker.id (must be unique), e.g. 0"
read brokerid
sed -i "s/^broker.id=0/broker.id=${brokerid}/g" $confpath/server.properties

echo -e "请输入当前kafka节点的hostname: 例如kafka01"
read hostname
sed -i "s/^#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/$hostname:9092/g" $confpath/server.properties

echo -e "请输入kafka消息存储目录:例如 /data/apps/kafkaapp/log"
read kafkalogspath

# Create the Kafka message storage directory
if [ ! -d $kafkalogspath ]; then
mkdir -p $kafkalogspath
fi
if [ ! -d $kafkalogspath ]; then
echo "创建目录$kafkalogspath失败!请检查目录是否有权限"
exit
fi

bak_dir='log.dirs=/tmp/kafka-logs'
new_dir='log.dirs='$kafkalogspath
sed -i "s!${bak_dir}!${new_dir}!g" $confpath/server.properties

echo -e 'Enter all nodes of the zookeeper cluster (strictly in this format), e.g. kafka01:2181,kafka02:2181,kafka03:2181'
read allhosts
sed -i "s/^zookeeper.connect=localhost:2181/zookeeper.connect=$allhosts/g" $confpath/server.properties

# Disable topic deletion
sed -i 's/^#delete.topic.enable=true/delete.topic.enable=false/g' $confpath/server.properties
# Set the default number of partitions per topic to 6
sed -i 's/^num.partitions=1/num.partitions=6/g' $confpath/server.properties
# Enable the size-based log retention policy
sed -i 's/^#log.retention.bytes=1073741824/log.retention.bytes=1073741824/g' $confpath/server.properties
# Number of messages to accept before forcing a flush to disk
sed -i 's/^#log.flush.interval.messages=10000/log.flush.interval.messages=10000/g' $confpath/server.properties
# Maximum time a message can sit in the log before a forced flush
sed -i 's/^#log.flush.interval.ms=1000/log.flush.interval.ms=1000/g' $confpath/server.properties
# Set the replication factor of the offsets topic to 3
sed -i 's/^offsets.topic.replication.factor=1/offsets.topic.replication.factor=3/g' $confpath/server.properties
# Set the replication factor of the transaction state topic to 3
sed -i 's/^transaction.state.log.replication.factor=1/transaction.state.log.replication.factor=3/g' $confpath/server.properties
# Set the default replication factor to 3
echo "">>$confpath/server.properties
echo "default.replication.factor=3" >>$confpath/server.properties

# echo "">>$confpath/server.properties
# echo "log.cleanup.policy=delete" >>$confpath/server.properties
# Kafka parameter tuning
# sed -i 's/^log.retention.hours=16/log.retention.hours=72/g' $confpath/server.properties
# param=`cat /proc/cpuinfo | grep "cpu cores"| uniq`
# bak_count="num.network.threads=3"
# new_count="num.network.threads="$((${param:0-1:1}+1))
# sed -i "s!${bak_count}!${new_count}!g" $confpath/server.properties
# bak_io="num.network.threads=3"
# new_io="num.network.threads="$((${param:0-1:1}+${param:0-1:1}))
# sed -i "s!${bak_io}!${new_io}!g" $confpath/server.properties

# PATH setup
# append at the end of the file
#echo "">>~/.bash_profile
#echo "#KAFKA $currentTime">>~/.bash_profile
#echo "export KAFKA_HOME=$kafkainstallpath/$kafkaversion">>~/.bash_profile
#echo 'export PATH=$PATH:$KAFKA_HOME/bin'>>~/.bash_profile
#source ~/.bash_profile

echo -e "是否远程复制 请输入y/n"
read flag
if [[ $flag == "y" ]]; then

# Modify and distribute the installation files
kafkapath=$kafkainstallpath/$kafkaversion
kafkapathtemp=$kafkainstallpath/$kafkaversion-temp
cp -r $kafkapath $kafkapathtemp

echo "以下输入的节点必须做免密登录"
echo -e '请输入除当前节点之外的节点(当前节点${hostname}),严格符合以下格式hostname:brokerid,空格隔开, 如kafka02:1 kafka03:2'
read allnodes
user=`whoami`
array2=(${allnodes// / })
for allnode in ${array2[@]}
do
array3=(${allnode//:/ })
kafkahostname=${array3[0]}
kafkabrokerid=${array3[1]}
echo ======= $kafkahostname =======

# Prepare the remote directories
ssh $kafkahostname "rm -rf $kafkapath $kafkalogspath"
ssh $kafkahostname "mkdir -p $kafkapath $kafkalogspath"

# Set broker.id
old_brokerid="broker.id=$brokerid"
new_brokerid="broker.id=$kafkabrokerid"
sed -i "s!${old_brokerid}!${new_brokerid}!g" $kafkapathtemp/config/server.properties
# Set listeners
old_listeners="listeners=PLAINTEXT:\/\/${hostname}:9092"
new_listeners="listeners=PLAINTEXT:\/\/${kafkahostname}:9092"
sed -i "s!${old_listeners}!${new_listeners}!g" $kafkapathtemp/config/server.properties

scp -r $kafkapathtemp/* ${user}@$kafkahostname:$kafkapath/

#ssh $kafkahostname "echo ''>>~/.bash_profile"
#ssh $kafkahostname "echo '#KAFKA $currentTime'>>~/.bash_profile"
#ssh $kafkahostname "echo 'export KAFKA_HOME=$kafkainstallpath/$kafkaversion'>>~/.bash_profile"
#ssh $kafkahostname 'echo "export PATH=\$PATH:\$KAFKA_HOME/bin">>~/.bash_profile'
#ssh $kafkahostname "source ~/.bash_profile"

# Change the values back to avoid carrying the edits into the next iteration
new_brokerid="broker.id=$brokerid"
old_brokerid="broker.id=$kafkabrokerid"
sed -i "s!${old_brokerid}!${new_brokerid}!g" $kafkapathtemp/config/server.properties
new_listeners="listeners=PLAINTEXT:\/\/$hostname:9092"
old_listeners="listeners=PLAINTEXT:\/\/$kafkahostname:9092"
sed -i "s!${old_listeners}!${new_listeners}!g" $kafkapathtemp/config/server.properties

echo ======= $kafkahostname remote copy complete =======
done

# Remove the temporary files
rm -rf $kafkapathtemp

fi

else
echo "退出当前程序!"
exit
fi

install-zookeeper.sh

#!/bin/bash
# Author gaojintao999@163.com
echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
echo "~~~ 运行后续操作前请仔细阅读以下内容! Author gaojintao999@163.com ~~~"
echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
echo "(1)已配置主机映射!"
echo "(2)已永久关闭防火墙!"
echo "(3)已配置免密登录!"
echo "(4)当前执行脚本和相关的安装包资源在同一路径下!"
read -p "上述条件是否都满足?(y or n)" yesorno

if [[ $yesorno = "y" || $yesorno = "Y" ]]; then

# Configure the ZooKeeper installation directory
currentTime=$(date '+%Y-%m-%d %H:%M:%S')
echo -e "Enter the ZooKeeper installation directory (created automatically if it does not exist; do not add a trailing /), e.g. /data/apps"
read zkinstallpath

# Create the ZooKeeper installation directory
if [ ! -d $zkinstallpath ]; then
mkdir -p $zkinstallpath
fi
if [ ! -d $zkinstallpath ]; then
echo "创建目录$zkinstallpath失败!请检查目录是否有权限"
exit:
fi

# Extract the tarball
currentdir=$(cd $(dirname $0); pwd)
ls | grep 'zookeeper-.*[gz]$'
if [ $? -ne 0 ]; then
# No ZooKeeper tarball in the current directory
echo "No zookeeper gz archive found under $currentdir, please upload it first!"
exit
else
# Extract
tar -zxvf $currentdir/$(ls | grep 'zookeeper-.*[gz]$') -C $zkinstallpath
fi

zkversion=`ls $zkinstallpath| grep 'zookeeper-.*'`

confpath=$zkinstallpath/$zkversion/conf

cp $confpath/zoo_sample.cfg $confpath/zoo.cfg

echo -e "请输入zk数据存储目录:例如 /data/apps/zookeeperapp"
read zkdatapath
# Create the ZooKeeper data directory
if [ ! -d $zkdatapath ]; then
mkdir -p $zkdatapath
fi
if [ ! -d $zkdatapath ]; then
echo "创建目录$zkdatapath失败!请检查目录是否有权限"
exit
fi

bak_dir='dataDir=/tmp/zookeeper'
new_dir='dataDir='$zkdatapath
sed -i "s!${bak_dir}!${new_dir}!g" $confpath/zoo.cfg


echo "请输入所有的zk集群节点:(按照空格分割) 例如 zk01 zk02 zk03"
read zkNodes
array=(`echo $zkNodes | tr ' ' ' '` )

# append at the end of the file
echo "">>$confpath/zoo.cfg
for i in `seq 0 $((${#array[@]}-1))`
do
echo "server.$((${i}+1))=${array[${i}]}:2888:3888" >>$confpath/zoo.cfg
done

echo "请输入zk的myid,不能重复,唯一值 例如 1"
read myid
echo $myid > $zkdatapath/myid

binpath=$zkinstallpath/$zkversion/bin

sed -i 's/ZOO_LOG_DIR=\".\"/ZOO_LOG_DIR=\"${ZOOKEEPER_PREFIX}\/logs\"/g' $binpath/zkEnv.sh

echo "ZOO_LOG_DIR修改成功"

sed -i 's/ZOO_LOG4J_PROP=\"INFO,CONSOLE\"/ZOO_LOG4J_PROP=\"INFO,ROLLINGFILE\"/g' $binpath/zkEnv.sh
echo "ZOO_LOG4J_PROP修改成功"

sed -i 's/_ZOO_DAEMON_OUT=\"$ZOO_LOG_DIR\/zookeeper.out\"/_ZOO_DAEMON_OUT=\"$ZOO_LOG_DIR\/zookeeper.log\"/g' $binpath/zkServer.sh
echo "_ZOO_DAEMON_OUT修改成功"

sed -i 's/zookeeper.root.logger=INFO, CONSOLE/zookeeper.root.logger=INFO, ROLLINGFILE/g' $confpath/log4j.properties
echo "zookeeper.root.logger修改成功"

# PATH setup
# append at the end of the file
#echo "">>~/.bash_profile
#echo "#zookeeper $currentTime">>~/.bash_profile
#echo "export ZK_HOME=$zkinstallpath/$zkversion">>~/.bash_profile
#echo 'export PATH=$PATH:$ZK_HOME/bin'>>~/.bash_profile
#source ~/.bash_profile

echo -e "是否远程复制 请输入y/n"
read flag
if [[ $flag == "y" ]]; then

# Modify and distribute the installation files
zkpath=$zkinstallpath/$zkversion
zkpathtemp=$zkinstallpath/$zkversion-temp
cp -r $zkpath $zkpathtemp

echo "以下输入的节点必须做免密登录"
currentnode=`hostname`
echo -e '请输入除当前节点之外的节点(当前节点$currentnode),严格符合以下格式hostname:zkID,空格隔开, 如zk02:2 zk03:3 zk04:4 zk05:5 zk06:6'
read allnodes
user=`whoami`
array2=(${allnodes// / })
for allnode in ${array2[@]}
do
array3=(${allnode//:/ })
hostname=${array3[0]}
zkid=${array3[1]}
echo ======= $hostname =======

# Prepare the remote directories
ssh $hostname "mkdir -p $zkpath"
ssh $hostname "mkdir -p $zkdatapath"

# Set the unique zk myid on the remote node
ssh $hostname "echo $zkid > $zkdatapath/myid"

scp -r $zkpathtemp/* ${user}@$hostname:$zkpath/

#ssh $hostname "echo ''>>~/.bash_profile"
#ssh $hostname "echo '#zk $currentTime'>>~/.bash_profile"
#ssh $hostname "echo 'export ZK_HOME=$zkinstallpath/$zkversion'>>~/.bash_profile"
#ssh $hostname 'echo "export PATH=\$PATH:\$ZK_HOME/bin">>~/.bash_profile'
#ssh $hostname "source ~/.bash_profile"

echo ======= $hostname remote copy complete =======
done

# Remove the temporary files
rm -rf $zkpathtemp

fi

else
echo "退出当前程序!"
exit
fi