步骤二:创建 Topic
功能概述
Kafka 通过 Topic 进行生产消息和消费消息,生产者往 Topic 中写消息,消费者从 Topic 中读消息。
操作步骤
通过 Kafka Manager 创建 Topic
用户可以通过 Kafka Manager 创建 Topic,详情可参考使用 Kafka Manager 管理 Topic。
通过 Kafbat UI 创建 Topic
-
在本地浏览器里输入 Kafbat UI 地址
http://client_ip:port。-
client_ip 为 客户端节点的 IP 地址。
-
内网连接:IP 地址为客户端节点内网 IP。
-
公网连接:IP 地址为客户端节点绑定的公网 IP。
-
-
port 为 Kafbat UI 的访问端口,可通过集群配置参数 kafka-manager.port 进行设置,默认为
9000。
-
-
如果集群配置参数
kafka-manager.basicAuthentication.enabled设置为true,则表示需要登录,请使用配置的帐号登录。该参数默认为false,即不需要登录。集群参数配置可参考修改配置参数。 -
在 Kafbat UI 页面,点击左侧 Topics,进入 Topic 列表页面。
-
在 Topic 列表页面,点击右上角的 + Add Topic,弹出 Topic 创建对话框。
-
用户根据页面信息,填写相关参数后,点击Create Topic,完成 Topic 创建操作。
Topic Name为必填项,其余参数若不填写,系统会使用集群级别默认参数。
使用 Kafka 命令行工具创建 Topic
-
为了方便用户通过本地访问 Kafka 集群,用户可以为客户端节点开启公网访问,或通过 VPN 的方式打通网络,以确保本地服务器可以访问 Kafka 集群网络。
-
在本地服务器终端,执行以下命令行,创建 Topic。
cd /opt/kafka/current/bin # 未开启 SASL ./kafka-topics.sh --create --topic <topic_name> --bootstrap-server <Kafka 连接地址> --partitions <partition_num> --replication-factor <replication_num> # 已开启 SASL ./kafka-topics.sh --create --topic <topic_name> --bootstrap-server <Kafka 连接地址> --partitions <partition_num> --replication-factor <replication_num> --command-config /ssl/kafka.config命令行参数说明
参数 说明 topic_name
Topic 名称。
Kafka 连接地址
可在 Kafka 集群详情页面,获取连接地址。
partition_num
Partition 数。每个 Topic 被拆分成多个 Partition,每个 Partition 由一系列有序的消息组成。
replication_num
副本数量。
-
在本地服务器终端,执行以下命令行,查看刚创建的 Topic。
cd /opt/kafka/current/bin # 未开启 SASL ./kafka-topics.sh --list --bootstrap-server <Kafka 连接地址> # 已开启 SASL ./kafka-topics.sh --list --bootstrap-server <Kafka 连接地址> --command-config /ssl/kafka.config -
在本地服务器终端,执行以下命令行,平衡 Topic 分区 Leader。
cd /opt/kafka/current/bin # 未开启 SASL ./kafka-preferred-replica-election.sh --bootstrap-server <Kafka 连接地址> # 已开启 SASL ./kafka-preferred-replica-election.sh --bootstrap-server <Kafka 连接地址> --command-config /ssl/kafka.config -
在本地服务器终端,执行以下命令行,更改 Topic 配置参数。用户也可以在创建 Topic 的时候指定,格式为
--config a=b --config x=y。cd /opt/kafka/current/bin # 未开启 SASL ./kafka-configs.sh --bootstrap-server <Kafka 连接地址> --entity-type topics --entity-name <topic_name> --alter --add-config <para_name>=<para_value> # 已开启 SASL ./kafka-configs.sh --bootstrap-server <Kafka 连接地址> --entity-type topics --entity-name <topic_name> --alter --add-config <para_name>=<para_value> --command-config /ssl/kafka.config -
在本地服务器终端,执行以下命令行,修改 Topic 分区。
cd /opt/kafka/current/bin # 未开启 SASL ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --alter --topic <topic_name> --partitions <partition_num> # 已开启 SASL ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --alter --topic <topic_name> --partitions <partition_num> --command-config /ssl/kafka.config -
在本地服务器终端,执行以下命令行,删除 Topic。
cd /opt/kafka/current/bin # 未开启 SASL ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --delete --topic <topic_name> # 已开启 SASL ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --delete --topic <topic_name> --command-config /ssl/kafka.config