已开启 SASL
开启 SASL 后,数据被加密传输,安全性更高。
本小节主要介绍如何在命令行模式下使用 SASL 证书连接 Kafka,并通过 Kafka 生产消费消息。
您也可以在业务代码中连接 Kafka,具体操作请参见 SDK 文档。
前提条件
-
已创建 Kafka 集群,且集群状态为
活跃
。 -
已创建 Topic,并获取 Topic 名称。
-
已获取 Kafka 连接地址。
-
已开启 SASL。
发送消息
-
登录 Kafka 客户端。
-
执行以下命令,进入命令行工具所在目录。
cd /opt/kafka/current/bin
-
执行以下命令,向 Topic 发送消息。
./kafka-console-producer.sh --bootstrap-server <Kafka 连接地址> --topic <topic_name> --producer.config /ssl/kafka.config
参数说明:
-
Kafka 连接地址:所连接的 Kafka 集群的地址。
-
topic_name:已创建的 Topic 名称。
示例:
如下以私网访问为例,Kafka 连接地址为 “192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092”。
输入需要发送的消息内容,按 Enter 发送消息,每一行的内容都将作为一条消息发送到 Topic。
$ cd /opt/kafka/current/bin $ ./kafka-console-producer.sh --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092 --topic test --producer.config /ssl/kafka.config >hi >hello world >how are you
如需停止发送消息使用 Ctrl+C 命令退出。
-
消费消息
-
登录 Kafka 客户端。
-
执行以下命令,进入命令行工具所在目录。
cd /opt/kafka/current/bin
-
执行以下命令,消费 Topic 消息。
./kafka-console-consumer.sh --bootstrap-server <Kafka 连接地址> --topic <topic_name> --group <group_name> --from-beginning --consumer.config /ssl/kafka.config
参数说明:
-
Kafka 连接地址:所连接的 Kafka 集群的地址。
-
topic_name:已创建的 Topic 名称。
-
group_name:根据实际业务需求,设定消费组名称。
-
from-beginning 表示按顺序开始消费,若没有使用 --from-beginning,则从最新的开始消费。
示例:
$ cd /opt/kafka/current/bin $ ./kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092 --topic test --from-beginning --group test --consumer.config /ssl/kafka.config hi hello world how are you
如需停止消费消息使用 Ctrl+C 命令退出。
-
查看 Topic 消息分布情况
如下示例表示查看 test 消息分布情况。
$ cd /opt/kafka/current/bin
$ ./kafka-topics.sh --describe --zookeeper 192.168.0.6:2181,192.168.0.8:2181,192.168.0.7:2181/kafka/cl-zom1un35 --topic test --command-config /ssl/kafka.config
Topic:test PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: -1 Replicas: 1 Isr: 1
Topic: test Partition: 1 Leader: -1 Replicas: 2 Isr: 2
Topic: test Partition: 2 Leader: 3 Replicas: 3 Isr: 3
查看消费者消费情况
kafka 0.9.0.0 以后的版本使用 kafka-consumer-groups.sh 查看消费者消费情况。
$ cd /opt/kafka/current/bin
$ ./kafka-consumer-groups.sh --bootstrap-server 192.168.0.3:9092,192.168.0.4:9092,192.168.0.9:9092 --describe --group my-group --command-config /ssl/kafka.config
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 10 10 0 consumer-1-0000f0c2-eee7-432b-833b-c882334c8f71 /192.168.100.26 consumer-1
test 1 7 7 0 consumer-1-0000f0c2-eee7-432b-833b-c882334c8f71 /192.168.100.26 consumer-1
kafka 0.9.0.0 以前的版本使用 kafka-consumer-offset-checker.sh 查看消费者消费情况。
$ cd /opt/kafka/current/bin
$ ./kafka-consumer-offset-checker.sh --zookeeper 192.168.0.6:2181,192.168.0.8:2181,192.168.0.7:2181/kafka/cl-zom1un35 --topic test --group my-group --command-config /ssl/kafka.onfig