步骤三:生产消费消息
本小节主要介绍如何在命令行模式下连接 Kafka,并通过 Kafka 生产消费消息。
您也可以在业务代码中连接 Kafka,具体操作请参见 SDK 文档。
前提条件
-
已创建 Kafka 集群,且集群状态为
活跃
。 -
已创建 Topic,并获取 Topic 名称。
登录 Kafka 客户端
-
登录管理控制台。
-
选择产品与服务 > 消息队列与中间件 > Kafka 服务,进入 Kafka 集群列表页面。
-
选择目标集群,点击目标集群 ID,进入集群详情页面。
-
选择节点页签,点击节点名称右侧的 Web 终端,并登录客户端节点。
说明 -
Kafka 2.3.1 - v2.0.1(包含)之前版本:客户端节点用户名为
ubuntu
,初始密码为kafka
。 -
Kafka 2.3.1 - v2.0.1 之后版本:客户端节点用户名为
client
,初始密码为client
。
-
发送消息
-
在 Kafka 客户端,执行以下命令,进入命令行工具所在目录。
cd /opt/kafka/current/bin
-
执行以下命令,向 Topic 发送消息。
-
Kafka 未开启 SASL
./kafka-console-producer.sh --bootstrap-server <Kafka 连接地址> --topic <topic_name>
-
Kafka 已开启 SASL
./kafka-console-producer.sh --bootstrap-server <Kafka 连接地址> --topic <topic_name> --producer.config /ssl/kafka.config
参数说明:
-
Kafka 连接地址:格式为
host_ip1:port,host_ip2:port,host_ip3:port
,详细说明请参见 Kafka 连接地址说明。 -
topic_name:已创建的 Topic 名称。
示例:
如下以 Kafka 未开启 SASL、未开启公网访问为例,Kafka 连接地址为 “192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092”。
输入需要发送的消息内容,按 Enter 发送消息,每一行的内容都将作为一条消息发送到 Topic。
$ cd /opt/kafka/current/bin $ ./kafka-console-producer.sh --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092 --topic test >hi >hello world >how are you
如需停止发送消息使用 Ctrl+C 命令退出。
-
消费消息
-
在 Kafka 客户端,执行以下命令,进入命令行工具所在目录。
cd /opt/kafka/current/bin
-
执行以下命令,消费 Topic 消息。
-
Kafka 未开启 SASL
./kafka-console-consumer.sh --bootstrap-server <Kafka 连接地址> --topic <topic_name> --group <group_name> --from-beginning
-
Kafka 已开启 SASL
./kafka-console-consumer.sh --bootstrap-server <Kafka 连接地址> --topic <topic_name> --group <group_name> --from-beginning --consumer.config /ssl/kafka.config
参数说明:
-
Kafka 连接地址:格式为
host_ip1:port,host_ip2:port,host_ip3:port
,详细说明请参见 Kafka 连接地址说明。 -
topic_name:已创建的 Topic 名称。
-
group_name:根据实际业务需求,设定消费组名称。
-
from-beginning 表示按顺序开始消费,若没有使用 --from-beginning,则从最新的开始消费。
示例:
$ cd /opt/kafka/current/bin $ ./kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092 --topic test --from-beginning --group test hi hello world how are you
如需停止消费消息使用 Ctrl+C 命令退出。
-