
Fluentd & Fluent Bit

Fluentd

Introduction

...

Deploy With Binary

Quick Start

bash
# Ubuntu Package install
# https://docs.fluentd.org/installation/install-by-deb
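
# e.g. for td-agent 4 on Ubuntu 20.04 (focal), the docs above boil down to the
# official install script (a sketch; verify the script name against the current docs):
curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-focal-td-agent4.sh | sh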

Config and Boot

[[sc-fluentd|Fluentd Config]]

bash
# change storage permission
# td-agent
chown td-agent:td-agent /opt/log_path/ -R
# fluentd
chown _fluentd:_fluentd /opt/log_path/ -R

# boot 
systemctl daemon-reload
systemctl start td-agent.service
systemctl enable td-agent.service

Verify

bash
# syntax check
# td-agent
td-agent -c td-agent.conf --dry-run
# fluentd
fluentd -c fluentd.conf --dry-run

Troubleshooting

bash
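# common first checks (typical td-agent locations; adjust to your install):
systemctl status td-agent.service
journalctl -u td-agent.service -e
tail -f /var/log/td-agent/td-agent.log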

Deploy With Container

Run by Resource

bash
# https://docs.fluentd.org/container-deployment/kubernetes
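
# a minimal sketch of the approach in the docs above: apply a DaemonSet manifest
# from the fluentd-kubernetes-daemonset repo (variant file names may change --
# check the repo before applying)
git clone https://github.com/fluent/fluentd-kubernetes-daemonset
kubectl apply -f fluentd-kubernetes-daemonset/fluentd-daemonset-elasticsearch.yaml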

Run in Kubernetes

bash
# add and update repo
helm repo add fluent https://fluent.github.io/helm-charts
helm repo update

# get charts package
helm pull fluent/fluentd --untar
cd fluentd

# configure and run
vim values.yaml
...
helm -n logging install fluentd .
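
# verify (labels follow the chart defaults; create the logging namespace first if needed)
kubectl -n logging get pods -l app.kubernetes.io/name=fluentd
helm -n logging status fluentd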

Fluent Bit

Introduction

Fluent Bit is an open-source, multi-platform log processor designed to be a general-purpose tool for log processing and distribution. The number of information sources in a system keeps growing, and handling data at scale is complex; collecting and aggregating such diverse data calls for a specialized tool that can deal with:

  • Different data sources
  • Different data formats
  • Data reliability
  • Security
  • Flexible routing
  • Multiple destinations

Fluent Bit was designed with high performance and low resource consumption in mind.

Fluent Bit vs. Fluentd: both can act as aggregators or forwarders, and they can complement each other or be used as standalone solutions. See the official comparison for details.

Deploy With Binary

bash
# install guide
https://docs.fluentbit.io/manual/installation/getting-started-with-fluent-bit

# create source list dir
mkdir -p /usr/share/keyrings/
mkdir -p /etc/apt/sources.list.d/
touch /etc/apt/sources.list.d/fluent-bit.list

# install GPG key and source list
curl https://packages.fluentbit.io/fluentbit.key | gpg --dearmor > /usr/share/keyrings/fluentbit-keyring.gpg
cat > /etc/apt/sources.list.d/fluent-bit.list << "EOF"
"deb [signed-by=/usr/share/keyrings/fluentbit-keyring.gpg] https://packages.fluentbit.io/ubuntu/focal focal main"
EOF

# install td-agent-bit
apt update
apt install td-agent-bit

# configuration file
cat /etc/td-agent-bit/td-agent-bit.conf

# start service
systemctl start td-agent-bit.service 
systemctl enable td-agent-bit.service
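
# verify (default install paths for the td-agent-bit package)
systemctl status td-agent-bit.service
/opt/td-agent-bit/bin/td-agent-bit --version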

Deploy With Container

Concepts

Kubernetes manages a cluster of nodes, so the log agent must run on every node to collect logs from every Pod; Fluent Bit is therefore deployed as a DaemonSet (a Pod that runs on every node of the cluster). While running, Fluent Bit reads, parses, and filters the logs of every Pod and enriches each record with the following metadata:

  • Pod Name
  • Pod ID
  • Container Name
  • Container ID
  • Labels
  • Annotations

Log Output Destinations

In the current cluster, all container logs are written to the console and are shipped to two destinations:

  • Elasticsearch, for front-end log search in CMDB / Kibana
  • The forward interface, served by the fluentd service, which persists logs locally for 15 days and archives them to a Google Cloud Storage bucket for backup

Download the Chart With Helm

[[cc-helm|Common Helm Commands]]

bash
# create the observability chart directory
mkdir /opt/helm-charts/logging
cd /opt/helm-charts/logging

# add the helm repo and pull the fluent-bit chart
helm repo add fluent https://fluent.github.io/helm-charts
helm repo update
helm pull fluent/fluent-bit --untar
cd fluent-bit

# config
vim values.yaml
...
  inputs: |
    [INPUT]
        Name tail
        Path /var/log/containers/frontend*.log,/var/log/containers/backend*.log
        Exclude_path *fluent-bit-*,*fluentbit-*,*rancher-*,*cattle-*,*sysctl-*
        multiline.parser docker, cri
        Tag kube.*
        # cap the memory the tail plugin may use; when the limit is hit the plugin pauses ingestion and resumes after buffered data is flushed
        Mem_Buf_Limit 15MB
        Buffer_Chunk_Size 1M
        Buffer_Max_Size 5M
        Skip_Long_Lines On
        Skip_Empty_Lines On
        Refresh_Interval 10
  filters: |
    [FILTER]
        Name kubernetes
        Match kube.*
        Kube_Tag_Prefix kube.var.log.containers.
        # parse the JSON in the log field and lift it to the root level (merged under the key named by Merge_Log_Key, if set)
        Merge_Log Off
        Keep_Log Off
        K8S-Logging.Parser Off
        K8S-Logging.Exclude Off
        Labels Off
        Annotations Off
    # the nest filter lifts the fields nested under kubernetes to the root level, adding a kubernetes_ prefix (applies to records containing pod_name)
    [FILTER]
        Name         nest
        Match        kube.*
        Wildcard     pod_name
        Operation    lift
        Nested_under kubernetes
        Add_prefix   kubernetes_
    # the modify filter renames some kubernetes metadata fields and adds a few extra fields
    [FILTER]
        Name modify
        Match kube.*
        # rename the log field to message
        Rename log message
        # drop redundant kubernetes fields
        Remove kubernetes_container_image
        Remove kubernetes_container_hash
    # collapse multi-line error logs (e.g. stack traces) into single records
    [FILTER]
        name multiline
        match kube.*
        multiline.key_content message
        multiline.parser multiline_stacktrace_parser
    # custom lua filter that sets the es index name field
    [FILTER]
        Name    lua
        Match   kube.*
        script  /fluent-bit/etc/fluentbit.lua
        call    set_index
  outputs: |
    [OUTPUT]
        Name es
        Match kube.*
        #Host 172.30.2.218
        Host 172.30.2.236
        Port 9200
        HTTP_User elastic
        HTTP_Passwd elastic123
        Logstash_Format On
        #Logstash_Prefix logstash-uat_
        Logstash_Prefix_Key $es_index
        Logstash_DateFormat %Y-%m-%d
        Suppress_Type_Name On
        Retry_Limit False
        
  customParsers: |
    [PARSER]
        Name docker_no_time
        Format json
        Time_Keep Off
        Time_Key time
        Time_Format %Y-%m-%dT%H:%M:%S.%L
    [MULTILINE_PARSER]
        name multiline_stacktrace_parser
        type regex
        flush_timeout 1000
        rule "start_state"      "/\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.*/" "exception_name"
        rule "exception_name"   "/(\w+\.)+\w+: .*/"                        "cont"
        rule "cont"             "/^\s+at.*/"                               "cont"
        
  extraFiles:
      # custom lua file
      fluentbit.lua: |
        function set_index(tag, timestamp, record)
            local prefix = "logstash-uat"
            if record["kubernetes_container_name"] ~= nil then
                local project_name = string.gsub(record["kubernetes_container_name"], '-', '_')
                record["es_index"] = prefix .. "_" .. project_name
            end
            return 1, timestamp, record
        end

Configure and Start

bash
# config
cat > values.yaml << "EOF"
config:
  service: |
    [SERVICE]
        Daemon Off
        Flush {{ .Values.flush }}
        Log_Level {{ .Values.logLevel }}
        Parsers_File parsers.conf
        Parsers_File custom_parsers.conf
        HTTP_Server On
        HTTP_Listen 0.0.0.0
        HTTP_Port {{ .Values.metricsPort }}
        Health_Check On
  inputs: |
    [INPUT]
        Name tail
        Path /var/log/containers/public*.log
        Exclude_path *fluent-bit-*,*fluentbit-*,*rancher-*,*cattle-*,*sysctl-*
        multiline.parser docker, cri
        Tag kube.*
        # cap the memory the tail plugin may use; when the limit is hit the
        # plugin pauses ingestion and resumes after buffered data is flushed
        Mem_Buf_Limit 15MB
        # initial buffer size
        Buffer_Chunk_Size 1M
        # maximum buffer size per file
        Buffer_Max_Size 5M
        # skip lines longer than Buffer_Max_Size; if Skip_Long_Lines is Off,
        # collection stops when such a line is hit
        Skip_Long_Lines On
        # skip empty lines
        Skip_Empty_Lines On
        # interval for refreshing the list of monitored log files
        Refresh_Interval 10
        # read files with no offset recorded in the database from the head; with
        # large log files this raises fluent-bit memory usage and can cause an OOM kill
        #Read_from_Head On
  filters: |
    [FILTER]
        Name kubernetes
        Match kube.*
        # when the source is the tail plugin, the tag prefix the tail plugin used
        Kube_Tag_Prefix kube.var.log.containers.
        # parse the JSON in the log field and lift it to the root level (merged
        # under the key named by Merge_Log_Key, if set)
        Merge_Log Off
        # whether to keep the original log field after merging it
        Keep_Log Off
        # allow Kubernetes Pods to suggest a predefined parser
        K8S-Logging.Parser Off
        # allow Kubernetes Pods to exclude their logs from the log processor
        K8S-Logging.Exclude Off
        # whether to include Kubernetes resource labels in the extra metadata
        Labels Off
        # whether to include Kubernetes resource annotations in the extra metadata
        Annotations Off
    # the nest filter lifts the fields nested under kubernetes to the root level, adding a kubernetes_ prefix (applies to records containing pod_name)
    [FILTER]
        Name         nest
        Match        kube.*
        Wildcard     pod_name
        Operation    lift
        Nested_under kubernetes
        Add_prefix   kubernetes_
    # the modify filter renames some kubernetes metadata fields and adds a few extra fields
    [FILTER]
        # use the modify filter
        Name modify
        Match kube.*
        # rename the log field to message
        Rename log message
        # rename kubernetes_host to host_ip
        Rename kubernetes_host host_ip
        # rename kubernetes_pod_name to host
        Rename kubernetes_pod_name host
        # remove all fields matching kubernetes_
        # Remove_wildcard kubernetes_
    # collapse multi-line error logs (e.g. stack traces) into single records
    [FILTER]
        name multiline
        match kube.*
        multiline.key_content message
        multiline.parser multiline_stacktrace_parser
    # custom lua filter that sets the es index name field
    [FILTER]
        Name    lua
        Match   kube.*
        script  /fluent-bit/etc/fluentbit.lua
        call    set_index
    # custom lua filter that adds a local_time field for es queries
    [FILTER]
        Name    lua
        Match   kube.*
        script  /fluent-bit/etc/add_local_time.lua
        call    add_local_time
  outputs: |
    # Elasticsearch output settings
    [OUTPUT]
        Name es
        Match kube.*
        Host 172.30.2.218
        Port 9200
        HTTP_User elastic
        HTTP_Passwd es123
        Logstash_Format On
        Logstash_Prefix logstash-uat_
        Logstash_Prefix_Key $es_index
        Logstash_DateFormat %Y-%m-%d
        Suppress_Type_Name On
        Retry_Limit False
    [OUTPUT]
        Name forward
        Match kube.*
        Host 172.30.2.54
        Port 24224
        Compress gzip
    [OUTPUT]
        Name http
        Match kube.*
        Host 172.30.2.54
        Port 5999
        Format json_lines
    #[OUTPUT]
    #    name stdout
    #    Match kube.*
  customParsers: |
    [PARSER]
        Name docker_no_time
        Format json
        Time_Keep Off
        Time_Key time
        Time_Format %Y-%m-%dT%H:%M:%S.%L
    [MULTILINE_PARSER]
        name multiline_stacktrace_parser
        type regex
        flush_timeout 1000
        rule "start_state"      "/\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.*/" "exception_name"
        rule "exception_name"   "/(\w+\.)+\w+: .*/"                        "cont" 
        rule "cont"             "/^\s+at.*/"                               "cont"
  extraFiles:
      fluentbit.lua: |
        function set_index(tag, timestamp, record)
            local prefix = "logstash-uat"
            if record["kubernetes_container_name"] ~= nil then
                local project_name = string.gsub(record["kubernetes_container_name"], '-', '_')
                record["es_index"] = prefix .. "_" .. project_name
            end
            return 1, timestamp, record
        end
        function filter_error_log(tag, timestamp, record)
            if string.find(string.lower(record["message"] or ""), "%[error") then
                record["severity"] = "ERROR"
            end
            return 1, timestamp, record
        end
      add_local_time.lua: |
        function add_local_time(tag, timestamp, record)
           --local os_date = os.date("%Y-%m-%dT%H:%M:%SZ")
           local os_date = os.date("%Y-%m-%dT00:00:00.000Z")
           record["local_time"] = os_date
           return 1, timestamp, record
        end
logLevel: info
EOF

# start 
helm -n logging install fluent-bit-uat .
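
# verify the DaemonSet rollout (labels follow the chart defaults)
kubectl -n logging get daemonset
kubectl -n logging logs -l app.kubernetes.io/name=fluent-bit --tail=20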

Quick Deploy of fluent-bit & es (Test Environments Only)

bash
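# note: the DaemonSet below expects the namespace, RBAC, and ConfigMap objects
# from the same repo to already exist (file names as of that repo's master branch):
kubectl create namespace logging
kubectl create -f https://raw.githubusercontent.com/fluent/fluent-bit-kubernetes-logging/master/fluent-bit-service-account.yaml
kubectl create -f https://raw.githubusercontent.com/fluent/fluent-bit-kubernetes-logging/master/fluent-bit-role.yaml
kubectl create -f https://raw.githubusercontent.com/fluent/fluent-bit-kubernetes-logging/master/fluent-bit-role-binding.yaml
kubectl create -f https://raw.githubusercontent.com/fluent/fluent-bit-kubernetes-logging/master/output/elasticsearch/fluent-bit-configmap.yaml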
kubectl create -f https://raw.githubusercontent.com/fluent/fluent-bit-kubernetes-logging/master/output/elasticsearch/fluent-bit-ds.yaml

OUTPUT Plugin Backend Configuration

Elasticsearch Configuration

[[cc-elasticsearch|Common ES Commands]] [[sc-elasticsearch|Common ES Configuration]]

bash
# elasticsearch deployment configuration: omitted


# ES adjustments
# 1. Index naming: the index name is defined before fluent-bit writes (see the lua set_index filter above)
# 2. Install the ik analyzer plugin
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v8.4.3/elasticsearch-analysis-ik-8.4.3.zip
# 3. Create the index template: replica count, shards, and lifecycle policy
curl -X PUT 'http://172.30.2.218:9200/_template/logstash_template' \
-H 'Content-Type: application/json' \
-d '{
        "order": 100,
        "version": 8010099,
        "index_patterns": [
            "logstash-*"
        ],
        "settings": {
            "index": {
                "max_result_window": "1000000",
                "refresh_interval": "5s",
                "number_of_shards": "1",
                "number_of_replicas": "0",
                "lifecycle.name": "7-days-default",
                "lifecycle.rollover_alias": "7-days-default"
            }
        },
        "mappings": {
            "dynamic_templates": [
                {
                    "message_field": {
                        "path_match": "message",
                        "mapping": {
                            "norms": false,
                            "analyzer": "ik_max_word"
                        },
                        "match_mapping_type": "string"
                    }
                },
                {
                    "string_fields": {
                        "mapping": {
                            "analyzer": "ik_max_word",
                            "fields": {
                                "keyword": {
                                    "ignore_above": 256,
                                    "type": "keyword"
                                }
                            }
                        },
                        "match_mapping_type": "string",
                        "match": "*"
                    }
                }
            ],
            "properties": {
                "@timestamp": {
                    "type": "date"
                },
                "geoip": {
                    "dynamic": true,
                    "properties": {
                        "ip": {
                            "type": "ip"
                        },
                        "latitude": {
                            "type": "half_float"
                        },
                        "location": {
                            "type": "geo_point"
                        },
                        "longitude": {
                            "type": "half_float"
                        }
                    }
                },
                "local_time": {
                    "type": "date"
                },
                "@version": {
                    "type": "keyword"
                }
            }
        },
        "aliases": {}
    }'
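
# verify the template was created (legacy _template API, matching the PUT above)
curl -s 'http://172.30.2.218:9200/_template/logstash_template?pretty' | head -n 20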

Logstash Configuration

[[sc-logstash|Common Logstash Configuration]]

bash
# download and extract
cd /opt
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.4.3-linux-x86_64.tar.gz
tar xf logstash-8.4.3-linux-x86_64.tar.gz && rm -f logstash-8.4.3-linux-x86_64.tar.gz

# configure
mkdir -p config/conf.d/
cat > config/conf.d/logstash.conf << "EOF"
# filebeat input
input {
    beats {
      port => 5044
    }
}
# http input
input {
  http {
    host => "0.0.0.0"
    port => 5999
    additional_codecs => {"application/json"=>"json"}
    codec => json {charset=>"UTF-8"}
    ssl => false
  }
}
filter {
    ruby {
        code => "
            event.set('local_time' , Time.now.strftime('%Y-%m-%d'))
            event.set('backup_time' , Time.now.strftime('%Y-%m-%d'))
        "
    }

    if [agent][type] == "filebeat" {
        mutate { update => { "host" => '%{[agent][name]}' }}
        mutate { replace => { "source" => '%{[log][file][path]}' }}
    }

    else if [user_agent][original] == "Fluent-Bit" {
      json {
        source => "message"
      }

      mutate {
        add_field => { "index_name" => "%{[kubernetes_container_name]}" }
      }

      mutate {
        gsub => ["[index_name]", "-", "_"]
      }
    }
}
output {
    #stdout { codec => rubydebug } 

    # fluent-bit backup
    if [user_agent][original] == "Fluent-Bit" {
      file {
          path => "/opt/backup_logs/%{backup_time}/%{host_ip}_%{index_name}/%{index_name}.gz"
          gzip => true
          codec =>  line {
              format => "[%{index_name} -->| %{message}"
              }
          }
    }
    # filebeat logs to es
    if [agent][type] == "filebeat" {
      elasticsearch {
        hosts => ["http://1.1.1.1:9200"]
        user => elastic
        password => "es123"
        index => "logstash-uat_%{index_name}-%{local_time}"
      }
    }
}
EOF
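
# start logstash (a sketch; assumes config/conf.d/ sits under the extracted
# logstash-8.4.3 directory -- adjust paths to your layout)
cd /opt/logstash-8.4.3
./bin/logstash -f config/conf.d/logstash.conf --config.reload.automatic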

Google Cloud Storage Bucket Configuration

bash
# 1. Create the bucket: xxx_logs_store
# 2. Create a log directory in the bucket (optional): backup_logs
# 3. Upload the logstash backup log directory to the bucket on a schedule using the gcloud tooling
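
# e.g. a hypothetical cron entry syncing the backup dir to the bucket with gsutil:
# 0 3 * * * gsutil -m rsync -r /opt/backup_logs gs://xxx_logs_store/backup_logs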

Reference:

  1. Official Website
  2. Repository
  3. fluentd-beat plugin
  4. Fluent Bit
  5. Fluent Bit Repository