本小节为您介绍如何配置和验证 logstash-kafka 相关插件。

前提条件

  • 已获取管理控制台登录账号和密码,且已获取集群操作权限。

  • Kafka 集群和 OpenSearch 集群状态为活跃

步骤 1:准备 kafka 环境

  1. 创建 Kafka 集群。

    说明

    要求 Kafka 版本为: 2.3.1 -v2.0.2

    登录控制台,按照以下规格创建一个 Kafka 集群,详细操作请参见创建 Kafka 集群

    节点角色 节点规格 节点数量

    Kafka节点

    1c2g,30G

    3

    客户端节点

    1c1g,10G

    1

    logstash kafka 01
  2. 创建测试用 Topic。

    浏览器登录 Kafka Manager 界面并创建以下两个 topic,详细操作请参见创建 Topic

    • es-output-topic:用于测试 logstash-output-kafka

    • es-test-topic:用于测试 logstash-input-kafka

    logstash kafka 02

步骤 2:准备 OpenSearch 集群

登录控制台,按照以下规格创建一个 OpenSearch 集群,详细操作请参见创建 OpenSearch 集群

节点角色 节点规格 节点数量

dashboard节点

2c4g

1

logstash节点

2c4g,10G

1

专有主节点

2c4g,10G

1

热节点

2c4g,60G

2

logstash kafka 03

步骤 3:验证 logstash-output-kafka

  1. 配置 Logstash。

    将 Logstash 节点的output_conf_content参数设置为以下内容,详细操作请参见修改配置参数

    kafka {
        topic_id => "es-output-topic"
        bootstrap_servers => "172.22.2.124:9092"
    }
    logstash kafka 04
  2. 重启 Logstash节点,详细操作请参见重启节点

    logstash kafka 05
  3. 登录 Logstash 节点服务器,进入 shell。

  4. 执行以下命令,使用 logstash-input-http 向logstash写入数据。

    curl -d "msg 1" 172.22.2.48:9700
    curl -d "msg 2" 172.22.2.48:9700
    curl -d "msg 3" 172.22.2.48:9700
    curl -d "msg 4" 172.22.2.48:9700
    curl -d "msg 5" 172.22.2.48:9700
  5. 登录 Kafka 客户端节点,详细操作请参见登录 Kafka 客户端

  6. 执行以下命令,查看消息接收情况。

    ./kafka-console-consumer.sh --bootstrap-server 172.22.2.124:9092 --topic es-output-topic --from-beginning
    logstash kafka 06

步骤 4:验证 logstash-input-kafka

  1. 配置 Logstash。

    将 Logstash 节点的input_conf_content参数设置为以下内容,详细操作请参见修改配置参数

    kafka {
        topics => ["es-test-topic"]
        bootstrap_servers => "172.22.2.124:9092"
    }
    logstash kafka 07
  2. 重启 Logstash节点,详细操作请参见重启节点

    logstash kafka 05
  3. 登录 Kafka 客户端节点,详细操作请参见登录 Kafka 客户端

  4. 执行以下命令,查看消息接收情况。

    ./kafka-console-producer.sh --broker-list 172.22.2.124:9092,172.22.2.125:9092,192.22.2.126:9092 --topic es-test-topic
    logstash kafka 08

步骤 5:登录 Dashboard 查看数据写入情况。

  1. 浏览器登录 Dashboard,详细操作请参见访问 Dashboard

  2. 创建索引,详细操作可参见创建索引策略

  3. 选择 Management > Dev Tools

  4. 执行以下命令,查看索引。

    GET _cat/indices

    使用 logstash-input-kafka 插件,从 topic 消费的消息被写入 Opensearch 集群,索引名称 logstash-yyyy.MM.dd。

    logstash kafka 09
  5. 执行以下命令,查看索引中的内容,包含前述在 topic 中写入的消息。

    GET /logstash-2022.08.16/_search
    logstash kafka 10