操作步骤

  1. 通过 VNC 登录 Kafka 客户端节点。详情参考通过 VNC 登录 Kafka

  2. 在客户端节点执行以下命令行,向 Topic 发送消息。

    • 命令行

      cd /opt/kafka/current/bin
      ./kafka-console-producer.sh --broker-list <连接地址> --topic <Topic 名称>
    • 参数说明

      参数 说明

      连接地址

      可在 Kafka 集群详情页面,查看连接地址。

      Topic 名称

      目标 Topic 名称,需根据实际情况进行填写。

    • 命令行示例

      $ cd /opt/kafka/current/bin
      ./kafka-console-producer.sh --broker-list 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092 --topic test
  3. 输入需要发送的消息内容后,按 Enter 键发送消息,每一行的内容都将作为一条消息发送至 Kafka。

    >hi
    >hello world
    >how are you

附录

如何增大生产者发送消息的大小?

Kafka 默认生产者发送的消息不能超过 1M,如需增大生产者发送消息的大小,用户需要在生产端和服务端修改相关参数。

  • 生产端

    • 若生产者的消息大于 1M、小于等于 32M,增大 max.request.size 参数即可。

    • 若生产者的消息大于 32M,需要同时增大 max.request.sizebuffer.memory 参数,且 buffer.memory 参数值必须大于 max.request.size 参数值。

  • 客户端

    针对消费者,如果生产者的消息较大,可以修改参数 fetch.message.max.bytes,增加 fetch 的最大字节数,加大每次 fetch 的消息大小。

  • 服务端

    生产端增大了 max.request.size 参数值,用户需增大服务端端 message.max.bytes 参数,否则服务端无法接收这么大的消息。详情可参考修改集群配置参数

    说明

    message.max.bytes 参数值必须大于生产端设置的 max.request.size 参数值。

Kafka 增减分区对生产者有什么影响?

  • 减少分区

    若此时生产者未发送消息,将不会对生产者造成影响。若此时生产者正在发送消息,可能会导致部分消息发送失败,需要在生产者回调方法里进行重试,重新发送。

    假设 Topic 有 3 个分区 p0,p1,p2,3 个 Broker 0,1,2p0broker0 上,p1broker1 上,p2broker2 上。详细说明如下。

    1. 生产者发送 Message 1~10 的消息给 Broker。

    2. 生产者开始发送消息时,会获取元数据信息。如下所示。

      消息          broker
      message1      0
      message2      1
      message3      2
      message4      0
      message5      1
      message6      2
    3. message1,2,3 能正常发送。

    4. message4,5,6 消息组装好后,还没发送之前,将 broker2 下线。

    5. message4,5 能正常发送,message6 会超时,抛异常,发送更新元数据请求。(需要在生产者回调方法里进行重试,否则消息发送不出去)

    6. 可用分区变成 p0p1,剩下的消息会发到 p0p1 分区。

      消息          broker
      message7      0
      message8      1
      message9      0
    7. message7,8,9 能正常发送。

    8. 在生产者回调方法里进行重试发送 message6,否则该消息发送不出去。

  • 增加分区

    增加分区将不会对生产者造成影响。

    假设 Topic 目前有 2 个分区 p0p1,3 个 Broker 0,1,2。p0broker0 上,p1broker1 上。

    1. 现在生产者发送很多消息给 Broker。

    2. 此时增加分区的个数,增加 1 个,也就是分区个数变成 3,如果元数据没有更新,那么消息是不能发送到分区 p2 的,只会发送到 p0p1

    3. 元数据更新后,消息才可以发到 p2。元数据更新频率为 5 分钟。