1. Test Environment

Physical machines: cluster of 3 masters, 2 infra nodes, and 3 compute nodes
Management: 1 manager host (runs the load balancer, external image registry, and DNS)
Physical machine specs: 40 cores, 378 GB RAM
OpenShift: v3.11.0+1a90c5c-83
Kubernetes: v1.11.0+d4cacc0
Kafka Operator: amq-streams-1.1.0
Kafka: 2.1.1

2. Test Requirements and Scope

Based on the test requirements, the specific items covered by this round of testing are as follows:

1. Kafka Operator deployment
   - Kafka Operator deployment
   - Kafka cluster creation
   - Kafka monitoring deployment
2. Kafka functional tests
   - Kafka topic creation
   - Kafka producer and consumer test inside the OpenShift cluster
   - Kafka producer and consumer test outside the OpenShift cluster (NodePort)
   - Kafka producer and consumer test outside the OpenShift cluster (Router)
3. Kafka performance tests
   - Message produce stress test [inside the cluster]
   - Message consume stress test [inside the cluster]
   - Message produce stress test [outside the cluster, NodePort]
   - Message consume stress test [outside the cluster, NodePort]
   - Message produce stress test [outside the cluster, Router]
   - Message consume stress test [outside the cluster, Router]
4. High availability tests
   - Broker failure and recovery time
   - Disk failure and data impact

3. Test Methods and Results

3.1 Kafka Operator Deployment

3.1.1 Kafka Operator Deployment

3.1.1.1 Test method and steps

Preparation

Step 1: Pull the required images and push them to the internal image registry (the full image list is in section 3.4.1; a sketch follows below)
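
A minimal sketch of this step, assuming docker is available on a host that is logged in to both registries; repeat for the remaining images in section 3.4.1:

$ for img in amq-streams-cluster-operator:1.1.0 \
             amq-streams-kafka:1.1.0-kafka-2.1.1 \
             amq-streams-zookeeper:1.1.0-kafka-2.1.1; do
    # pull from the Red Hat registry, retag for the internal Harbor registry, and push
    docker pull registry.access.redhat.com/amq7/$img
    docker tag registry.access.redhat.com/amq7/$img harbor.apps.it.mbcloud.com/amq7/$img
    docker push harbor.apps.it.mbcloud.com/amq7/$img
done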

Step 2: Download the deployment archive and unpack it on the server

$ wget http://sha.stor.mbcloud.com/openshift-deploy-material/amq-streams-1.1.0-ocp-install-examples.zip
$ unzip amq-streams-1.1.0-ocp-install-examples.zip

Step 3: Change the external registry address in the deployment files to the internal registry address.
In install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml, change registry.access.redhat.com to harbor.apps.it.mbcloud.com, for example as sketched below.
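
For example, with sed (a sketch; any editor works equally well):

$ sed -i 's/registry.access.redhat.com/harbor.apps.it.mbcloud.com/g' \
    install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml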

Test procedure

Step 1: Create the project myproject

$ oc new-project myproject

Step 2: Install and deploy the Operator

$ oc apply -f install/cluster-operator
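
As an optional sanity check (not part of the original steps), the operator rollout can be confirmed before continuing:

$ oc rollout status deployment/strimzi-cluster-operator -n myproject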

3.1.1.2 Test result

The Kafka Operator was deployed successfully.

[Figure: Kafka Operator deployment]

3.1.2 Kafka Cluster Creation

3.1.2.1 Test method and steps

Step 1: Switch to the myproject project

$ oc project myproject

Step 2: Create the Kafka cluster my-cluster with 3 replicas and ephemeral (non-persistent) storage

$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.1.1
    replicas: 3
    listeners:
      plain: {}
      tls: {}
      external:
        type: route
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.1"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF

Step 3: Check the deployment result

$ oc get pod
NAME                                              READY     STATUS    RESTARTS   AGE
my-cluster-entity-operator-78dc68cc86-9ghmw       3/3       Running   0          6d
my-cluster-kafka-0                                2/2       Running   0          6d
my-cluster-kafka-1                                2/2       Running   0          6d
my-cluster-kafka-2                                2/2       Running   0          6d
my-cluster-zookeeper-0                            2/2       Running   0          6d
my-cluster-zookeeper-1                            2/2       Running   0          6d
my-cluster-zookeeper-2                            2/2       Running   0          6d
strimzi-cluster-operator-7f46bcb9c6-s8x5r         1/1       Running   0          6d

3.1.2.2 Test result

The Kafka cluster my-cluster was deployed: 3 ZooKeeper nodes and 3 Kafka brokers.

[Figure: Kafka cluster deployment]

3.1.3 Kafka Monitoring Deployment

3.1.3.1 Test method and steps

Step 1: Add a metrics section to the configuration of the Kafka cluster my-cluster, following the metrics attributes in examples/metrics/kafka-metrics.yaml (a minimal sketch is shown below)
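
A minimal sketch of the change (an empty or minimal metrics map is enough to enable the Prometheus JMX exporter; the full rule set should be copied from the example file):

spec:
  kafka:
    metrics:
      # paste lowercaseOutputName and the rules list from examples/metrics/kafka-metrics.yaml here
      lowercaseOutputName: true
  zookeeper:
    metrics:
      lowercaseOutputName: true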

Step 2: Grant Prometheus view access to the myproject project

$ oc policy add-role-to-user view system:serviceaccount:openshift-monitoring:prometheus-k8s -n myproject

Step 3: Create a ServiceMonitor in the openshift-monitoring namespace

$ cat << EOF | oc create -f -
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  labels:
    k8s-app: prometheus
  name: my-cluster-kafka
  namespace: openshift-monitoring
spec:
  endpoints:
  - port: metrics
  namespaceSelector:
    matchNames:
    - myproject
  selector:
    matchLabels:
      strimzi.io/name: my-cluster-kafka-bootstrap
EOF

Step 4: Add the Kafka monitoring dashboard to Grafana

The dashboard definition can be taken from examples/metrics/grafana-dashboards/strimzi-kafka.json.
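
How the JSON is imported depends on the Grafana instance in use; assuming one that allows dashboard imports, the file can be uploaded through the Import dialog in the UI. If Grafana is exposed through a route in openshift-monitoring, its URL can be found with:

$ oc get route grafana -n openshift-monitoring --template='{{.spec.host}}'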

3.1.3.2 Test result

The Kafka monitoring charts can be viewed in Grafana.

[Figure: grafana kafka监控.png]

3.2 Kafka Functional Tests

3.2.1 Kafka Topic Creation

3.2.1.1 Test method and steps

Step 1: Switch to the myproject project

$ oc project myproject

Step 2: Create the topic

$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
EOF
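
Since topics are managed by the Topic Operator, the KafkaTopic resource itself can also be checked directly before looking inside the broker:

$ oc get kafkatopic my-topic -o yaml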

Step 3: Verify the topic status

$ oc exec -c zookeeper  my-cluster-zookeeper-0 --  bin/kafka-topics.sh --zookeeper 127.0.0.1:21810 --topic my-topic --describe
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic:my-topic PartitionCount:3 ReplicationFactor:3 Configs:message.format.version=2.1-IV2
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: my-topic Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 0,1,2
Topic: my-topic Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2

3.2.1.2 Test result

The topic my-topic was created successfully, with 3 partitions and 3 replicas.

3.2.2 Kafka Producer and Consumer Test Inside the OpenShift Cluster

3.2.2.1 Test method and steps

Step 1: Start the consumer

$ oc run kafka-consumer -ti --image=amq7/amq-streams-kafka:1.1.0-kafka-2.1.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic

Step 2: Start the producer

$ oc run kafka-producer -ti --image=amq7/amq-streams-kafka:1.1.0-kafka-2.1.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic

Step 3: Send messages from the producer; the consumer displays the messages that were sent

3.2.2.2 Test result

Messages produced to the topic my-topic by the producer were consumed by the consumer.

[Figure: 生产压测.png]

3.2.3 Kafka Producer and Consumer Test Outside the OpenShift Cluster (NodePort)

3.2.3.1 Test method and steps

With TLS disabled

Step 1: Add an external listener of type nodeport to the Kafka configuration and pin the bootstrap nodePort to 32678

listeners:
      external:
        tls: false
        type: nodeport
        overrides:
          bootstrap:
            nodePort: 32678
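
The pinned port can be confirmed on the external bootstrap service (a quick check, assuming the default Strimzi service naming):

$ oc get svc my-cluster-kafka-external-bootstrap
# the PORT(S) column should show the pinned nodePort 32678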

Step 2: From outside the cluster, create a Kafka producer (using the kafka-python client) and connect to Kafka

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['99.248.81.70:32678'])
producer.send('my-topic', 'hello world')
producer.close()

Step 3: From outside the cluster, create a Kafka consumer and consume the data in the topic my-topic

from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic', bootstrap_servers=['99.248.81.70:32678'])
for msg in consumer:
    print(msg)

With TLS enabled

Step 1: Add an external listener of type nodeport to the Kafka configuration (TLS is enabled by default) and pin the bootstrap nodePort to 32678

listeners:
      external:
        type: nodeport
        overrides:
          bootstrap:
            nodePort: 32678

Step 2: Extract ca.crt from secret/my-cluster-cluster-ca-cert and ca.key from secret/my-cluster-clients-ca

$ oc extract secret/my-cluster-cluster-ca-cert --keys=ca.crt --to=- > ca.crt
$ oc extract secret/my-cluster-clients-ca --keys=ca.key --to=- > ca.key
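
The extracted files can be inspected locally before use (plain openssl, nothing Strimzi-specific; the second command assumes the clients CA key is an RSA key):

$ openssl x509 -in ca.crt -noout -subject -dates
$ openssl rsa -in ca.key -noout -check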

Step 3: From outside the cluster, create a Kafka producer and connect to Kafka over TLS

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['99.248.81.70:32678'], ssl_check_hostname=False, ssl_cafile='ca.crt', ssl_certfile='ca.key', security_protocol='SSL')
producer.send('my-topic', 'hello world')
producer.close()

Step 4: From outside the cluster, create a Kafka consumer and consume the data in the topic my-topic

from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic', bootstrap_servers=['99.248.81.70:32678'], ssl_check_hostname=False, ssl_cafile='ca.crt', ssl_certfile='ca.key', security_protocol='SSL')
for msg in consumer:
    print(msg)

3.2.3.2 Test result

The behaviour is the same whether TLS is disabled or enabled.

Connecting to the Kafka cluster through the nodePort, the producer sends a "hello world" message to the topic my-topic, and the consumer prints the message to its console:

C:\Python27\python.exe D:/workspace/mb_autopf/utils/kafka-test/consumer.py
ConsumerRecord(topic=u'my-topic', partition=0, offset=333338, timestamp=1558511698388L, timestamp_type=0, key=None, value='hello world', checksum=-1265845277, serialized_key_size=-1, serialized_value_size=11)

3.2.4 Kafka Producer and Consumer Test Outside the OpenShift Cluster (Router)

3.2.4.1 Test method and steps

Step 1: Set the external listener type to route in the Kafka configuration

listeners:
      external:
        type: route

Step 2: Extract ca.crt from secret/my-cluster-cluster-ca-cert and ca.key from secret/my-cluster-clients-ca

$ oc extract secret/my-cluster-cluster-ca-cert --keys=ca.crt --to=- > ca.crt
$ oc extract secret/my-cluster-clients-ca --keys=ca.key --to=- > ca.key

Step 3: Obtain the external route hostname

$ oc get route my-cluster-kafka-bootstrap --template='{{range .status.ingress}}{{.host}}{{println}}{{end}}'
my-cluster-kafka-bootstrap-myproject.apps.paas1.mbcloud.com
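
Before running the Python client, TLS connectivity to the route can be checked directly (assuming the route is reachable on port 443; the broker certificate is signed by the cluster CA extracted above):

$ openssl s_client -connect my-cluster-kafka-bootstrap-myproject.apps.paas1.mbcloud.com:443 \
    -servername my-cluster-kafka-bootstrap-myproject.apps.paas1.mbcloud.com -CAfile ca.crt < /dev/null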

Step 4: From outside the cluster, create a Kafka producer and connect to Kafka through the route

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['my-cluster-kafka-bootstrap-myproject.apps.paas1.mbcloud.com:443'], ssl_check_hostname=False, ssl_cafile='ca.crt', ssl_certfile='ca.key', security_protocol='SSL')
producer.send('my-topic', 'hello world')
producer.close()

Step 5: From outside the cluster, create a Kafka consumer and consume the data in the topic my-topic

from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic', bootstrap_servers=['my-cluster-kafka-bootstrap-myproject.apps.paas1.mbcloud.com:443'], ssl_check_hostname=False, ssl_cafile='ca.crt', ssl_certfile='ca.key', security_protocol='SSL')
for msg in consumer:
    print(msg)

3.2.4.2 Test result

Connecting to the Kafka cluster through the route hostname, the producer sends a "hello world" message to the topic my-topic, and the consumer prints the message to its console:

C:\Python27\python.exe D:/workspace/mb_autopf/utils/kafka-test/consumer.py
ConsumerRecord(topic=u'my-topic', partition=0, offset=333340, timestamp=1558514729435L, timestamp_type=0, key=None, value='hello world', checksum=-459554926, serialized_key_size=-1, serialized_value_size=11)

3.3 Kafka Performance Tests

3.3.1 Test tool: Kafka's built-in benchmark scripts

bin/kafka-producer-perf-test.sh
bin/kafka-consumer-perf-test.sh

3.3.1.1 Parameter descriptions

bin/kafka-producer-perf-test.sh
--num-records: total number of messages to send
--record-size: size of each message in bytes
--throughput: upper limit on message throughput
--producer-props: Kafka producer properties, for example:
  acks: 1 (default) sends the next record once the leader replica has acknowledged the data; 0 is least reliable and waits for no acknowledgement; -1 is most reliable and waits until all follower replicas have acknowledged the data
  retries: 2
  linger.ms: time-based batching threshold, e.g. 1
  batch.size: size-based batching threshold, e.g. 100000
  compression.type: compression codec; ordered by speed, lz4 = snappy < gzip

A batch is considered complete when any one of the following conditions is met:

1. batch.size is reached
2. linger.ms elapses
3. other batches for the same broker have completed
4. flush() or close() is called

bin/kafka-consumer-perf-test.sh
--zookeeper: ZooKeeper connection string
--broker-list: Kafka broker connection string
--topic: topic name
--fetch-size: amount of data fetched per request; 1048576 is 1 MB
--messages: total number of messages to consume

3.3.1.2 Test commands

bin/kafka-producer-perf-test.sh --topic my-topic --num-records 100000000 --record-size 100 --throughput 5000000 --producer-props bootstrap.servers=my-cluster-kafka-bootstrap.myproject.svc:9092 retries=2 batch.size=100000
bin/kafka-consumer-perf-test.sh --broker-list my-cluster-kafka-bootstrap.myproject.svc:9092 --topic my-topic --messages 100000000

3.3.2 Kafka Message Produce Stress Test [Inside the Cluster]

3.3.2.1 Test method and steps

Step 1: Create four topic variants: 1 replica / 1 partition, 1 replica / 3 partitions, 3 replicas / 1 partition, and 3 replicas / 3 partitions (a sketch follows below)
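
A sketch of how the four variants could be created as KafkaTopic resources (the topic names here are illustrative; the benchmark commands are run against whichever topic is under test):

$ for spec in 1:1 1:3 3:1 3:3; do
replicas=${spec%%:*}; partitions=${spec##*:}
cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
  name: perf-${replicas}rep-${partitions}part
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: ${partitions}
  replicas: ${replicas}
EOF
done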

Step 2: Run the produce benchmark

bin/kafka-producer-perf-test.sh --topic my-topic --num-records 10000000 --record-size 100 --throughput 5000000 --producer-props bootstrap.servers=my-cluster-kafka-bootstrap.myproject.svc:9092 linger.ms=1

3.3.2.2 Test results

Kafka message produce performance measured with kafka-producer-perf-test.sh:

Replicas  Partitions  Threads  Messages/s  MB/s    99th percentile latency (ms)
1         1           1        1514692     144.45  2
1         3           1        1462009     139.43  7
3         1           1        987069      94.13   9
3         3           1        918020      87.55   23

3.3.3 Kafka Message Consume Stress Test [Inside the Cluster]

3.3.3.1 Test method and steps

Step 1: Consume the messages produced in section 3.3.2

Step 2: Run the consume benchmark

bin/kafka-consumer-perf-test.sh --broker-list my-cluster-kafka-bootstrap.myproject.svc:9092 --topic my-topic --fetch-size 1048576 --messages 100000000 --threads 1

3.3.3.2 Test results

Kafka message consume performance measured with kafka-consumer-perf-test.sh:

Replicas  Partitions  Threads  Messages/s    MB/s
1         1           1        800262.4861   76.3190
1         3           1        1000100.0100  95.3770
3         1           1        3037667.0717  289.6945
3         3           1        796854.8513   75.9940
1         1           3        800409.8098   76.3330
1         3           3        974896.4173   92.9734
3         1           3        3033060.3579  289.2552
3         3           3        795820.1480   75.8953

3.3.4 Kafka Message Produce Stress Test [Outside the Cluster, NodePort]

3.3.4.1 Test method and steps

Step 1: Create four topic variants: 1 replica / 1 partition, 1 replica / 3 partitions, 3 replicas / 1 partition, and 3 replicas / 3 partitions

Step 2: Run the produce benchmark

bin/kafka-producer-perf-test.sh --topic my-topic --num-records 10000000 --record-size 100 --throughput 5000000 --producer-props bootstrap.servers=99.248.82.11:32678 linger.ms=1

3.3.4.2 Test results

Kafka message produce performance measured with kafka-producer-perf-test.sh:

Replicas  Partitions  Threads  Messages/s  MB/s   99th percentile latency (ms)
1         1           1        884486      84.35  34
1         3           1        880669      83.99  73
3         1           1        822842      78.47  258
3         3           1        607053      57.89  193

3.3.5 Kafka Message Consume Stress Test [Outside the Cluster, NodePort]

3.3.5.1 Test method and steps

Step 1: Consume the messages produced in section 3.3.2

Step 2: Run the consume benchmark

bin/kafka-consumer-perf-test.sh --broker-list 99.248.82.11:32678 --topic my-topic --fetch-size 1048576 --messages 100000000 --threads 1

3.3.5.2 Test results

Replicas  Partitions  Threads  Messages/s    MB/s
1         1           1        387672.0295   36.9713
1         3           1        389226.2183   37.1195
3         1           1        564939.8339   53.8769
3         3           1        432991.3929   41.2932
1         1           3        386488.3667   36.8584
1         3           3        389468.7646   37.1426
3         1           3        563920.3744   53.7796
3         3           3        433067.1543   41.3004

3.4 Appendix

3.4.1 Image list

harbor.apps.it.mbcloud.com/amq7/amq-streams-cluster-operator:1.1.0
harbor.apps.it.mbcloud.com/amq7/amq-streams-zookeeper:1.1.0-kafka-2.1.1
harbor.apps.it.mbcloud.com/amq7/amq-streams-kafka:1.1.0-kafka-2.1.1
harbor.apps.it.mbcloud.com/amq7/amq-streams-kafka-connect:1.1.0-kafka-2.1.1
harbor.apps.it.mbcloud.com/amq7/amq-streams-kafka-connect-s2i:1.1.0-kafka-2.1.1
harbor.apps.it.mbcloud.com/amq7/amq-streams-kafka-mirror-maker:1.1.0-kafka-2.1.1
harbor.apps.it.mbcloud.com/amq7/amq-streams-topic-operator:1.1.0
harbor.apps.it.mbcloud.com/amq7/amq-streams-user-operator:1.1.0
harbor.apps.it.mbcloud.com/amq7/amq-streams-kafka-init:1.1.0
harbor.apps.it.mbcloud.com/amq7/amq-streams-zookeeper-stunnel:1.1.0
harbor.apps.it.mbcloud.com/amq7/amq-streams-kafka-stunnel:1.1.0
harbor.apps.it.mbcloud.com/amq7/amq-streams-entity-operator-stunnel:1.1.0

3.4.2 Deployment scripts

strimzi-kafka-operator

Baidu Netdisk: /招银云创/openshift/redhat-kafka/

while true; do a=$(date); echo $a; echo $a > ~/a.txt; ./kafka-console-producer.sh --broker-list 99.248.82.11:31761,99.248.82.12:31304,99.248.82.21:31271 --topic my_topic < ~/a.txt; done

./kafka-console-consumer.sh --bootstrap-server 99.248.82.11:31761,99.248.82.12:31304,99.248.82.21:31271 --topic my_topic