开启 SASL 后,数据被加密传输,安全性更高。

本小节主要介绍如何在命令行模式下使用 SASL 证书连接 Kafka,并通过 Kafka 生产消费消息。

您也可以在业务代码中连接 Kafka,具体操作请参见 SDK 文档

前提条件

  • 已创建 Kafka 集群,且集群状态为活跃

  • 已创建 Topic,并获取 Topic 名称。

  • 已获取 Kafka 连接地址。

  • 已开启 SASL。

发送消息

  1. 登录 Kafka 客户端。

  2. 执行以下命令,进入命令行工具所在目录。

    cd /opt/kafka/current/bin
  3. 执行以下命令,向 Topic 发送消息。

    ./kafka-console-producer.sh --bootstrap-server <Kafka 连接地址> --topic <topic_name> --producer.config /ssl/kafka.config

    参数说明

    • Kafka 连接地址:所连接的 Kafka 集群的地址。

    • topic_name:已创建的 Topic 名称。

    示例

    如下以私网访问为例,Kafka 连接地址为 “192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092”。

    输入需要发送的消息内容,按 Enter 发送消息,每一行的内容都将作为一条消息发送到 Topic。

    $ cd /opt/kafka/current/bin
    $ ./kafka-console-producer.sh --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092 --topic test --producer.config /ssl/kafka.config
    
    >hi
    >hello world
    >how are you

    如需停止发送消息使用 Ctrl+C 命令退出。

消费消息

  1. 登录 Kafka 客户端。

  2. 执行以下命令,进入命令行工具所在目录。

    cd /opt/kafka/current/bin
  3. 执行以下命令,消费 Topic 消息。

    ./kafka-console-consumer.sh --bootstrap-server <Kafka 连接地址> --topic <topic_name> --group <group_name> --from-beginning --consumer.config /ssl/kafka.config

    参数说明

    • Kafka 连接地址:所连接的 Kafka 集群的地址。

    • topic_name:已创建的 Topic 名称。

    • group_name:根据实际业务需求,设定消费组名称。

    • from-beginning 表示按顺序开始消费,若没有使用 --from-beginning,则从最新的开始消费。

    示例

    $ cd /opt/kafka/current/bin
    $ ./kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092 --topic test --from-beginning --group test --consumer.config /ssl/kafka.config
    hi
    hello world
    how are you

    如需停止消费消息使用 Ctrl+C 命令退出。

查看 Topic 消息分布情况

如下示例表示查看 test 消息分布情况。

$ cd /opt/kafka/current/bin
$ ./kafka-topics.sh --describe --zookeeper 192.168.0.6:2181,192.168.0.8:2181,192.168.0.7:2181/kafka/cl-zom1un35 --topic test --command-config /ssl/kafka.config
Topic:test	  PartitionCount:3	  ReplicationFactor:1	  Configs:
Topic: test	Partition: 0	Leader: -1	Replicas: 1	Isr: 1
Topic: test	Partition: 1	Leader: -1	Replicas: 2	Isr: 2
Topic: test	Partition: 2	Leader: 3	Replicas: 3	Isr: 3

查看消费者消费情况

kafka 0.9.0.0 以后的版本使用 kafka-consumer-groups.sh 查看消费者消费情况。

$ cd /opt/kafka/current/bin
$ ./kafka-consumer-groups.sh --bootstrap-server 192.168.0.3:9092,192.168.0.4:9092,192.168.0.9:9092 --describe --group my-group --command-config /ssl/kafka.config
Note: This will not show information about old Zookeeper-based consumers.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                          0          10              10              0          consumer-1-0000f0c2-eee7-432b-833b-c882334c8f71   /192.168.100.26                consumer-1
test                          1          7               7               0          consumer-1-0000f0c2-eee7-432b-833b-c882334c8f71   /192.168.100.26                consumer-1

kafka 0.9.0.0 以前的版本使用 kafka-consumer-offset-checker.sh 查看消费者消费情况。

$ cd /opt/kafka/current/bin
$ ./kafka-consumer-offset-checker.sh  --zookeeper 192.168.0.6:2181,192.168.0.8:2181,192.168.0.7:2181/kafka/cl-zom1un35 --topic test --group my-group --command-config /ssl/kafka.onfig