shanghaipc 发表于 2018-11-14 12:12:37

flume对nginx群集日志收集方案

  Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
  flume的基本概念
  如下图

  每个agent都具有三个元素,source、channel、sink。
  source:数据流的源。产生event。
  channel:可以理解成数据流的管道。传递event
  sink :数据流的终点。消耗event
  注:source可以上一节点的sink,sink可以指定为下一节点的source。比较常见的场景如下图

  为以收集日志,并做实时的集中存储,元素相应类型如下
  1.source : client端使用exec类型,通过tail –F 产生event。server端使用avro类型,用于接收client端发出的event
  2.channel : 使用file类型。(测试期使用了mem类似)
  3. sink: client端使用avro类型,传递给server端。server端使用file_roll类型,指定相应目录储存日志。最终方案会使用hdfs

  flume具体配置如下:
  1、将flume 解压到 /usr/local/flume下
  2、agent端配置
  flume允许对环境资源使用做设置,需要修改配置,/PREFIX/conf/flume-env.sh可以通过实际情况进行调整
  JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"
  #此处PREFIX 代表 /usr/local/flume
  配置(/PREFIX/conf/flume-client.properties)
  # Name the components on this agent
  a1.sources = r1
  a1.channels = c1
  a1.sinks = k1
  # Describe/configure the source
  a1.sources.r1.type = exec
  a1.sources.r1.command = tail -n 0 -F /space/nginxlogs/access.web.log
  a1.sources.r1.channels = c1
  # Describe/configure the channels (后面有memory channel配置方案)
  a1.channels.c1.type = file
  a1.channels.c1.keep-alive = 10
  a1.channels.c1.write-timeout = 10
  a1.channels.c1.checkpointDir = /space/guang_mobileapi_flume/checkpoint
  a1.channels.c1.useDualCheckpoints = true
  a1.channels.c1.backupCheckpointDir =/space/guang_mobileapi_flume/backupcheckpoint
  a1.channels.c1.dataDirs = /space/guang_mobileapi_flume/data
  # Describe/configure the sink
  a1.sinks.k1.type = avro
  a1.sinks.k1.hostname = 192.168.10.35
  a1.sinks.k1.port = 44444
  a1.sinks.k1.channel = c1
  (# Describe/configure the channels(次方案可以替换前面的file channel配置方案)
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 1000
  a1.channels.c1.transactionCapacity = 100
  )
  server配置:
  # Name the components on this agent
  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1
  # Describe/configure the source
  a1.sources.r1.type = avro
  a1.sources.r1.channels = c1
  a1.sources.r1.bind = 0.0.0.0
  a1.sources.r1.port = 44444
  # Describe the sink
  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = /logs/web/nginx/%Y%m%d
  a1.sinks.k1.hdfs.rollInterval = 3600
  a1.sinks.k1.hdfs.fileType = DataStream
  a1.sinks.k1.hdfs.rollSize = 0
  a1.sinks.k1.hdfs.rollCount = 0
  a1.sinks.k1.hdfs.useLocalTimeStamp = true
  a1.sinks.k1.hdfs.filePrefix = access.guang.j.cn
  a1.sinks.k1.hdfs.inUseSuffix = .tmp
  a1.sinks.k1.hdfs.idleTimeout = 300
  # Use a channel which buffers events in memory
  a1.channels.c1.type = file
  a1.channels.c1.write-timeout = 10
  a1.channels.c1.keep-alive = 10
  a1.channels.c1.checkpointDir = /space/flume/checkpoint
  a1.channels.c1.useDualCheckpoints = true
  a1.channels.c1.backupCheckpointDir =/space/flume/backupcheckpoint
  a1.channels.c1.dataDirs = /space/flume/data
  # Bind the source and sink to the channel
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  (
  # Describe the sink (前面面有hdfs方式)
  a1.sinks.k1.type = file_roll
  a1.sinks.k1.sink.directory = /tmp/test
  a1.sinks.k1.sink.rollInterval = 3600
  # Use a channel which buffers events in memory   (前面有file channel配置方案)
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 1000
  a1.channels.c1.transactionCapacity = 100
  # Bind the source and sink to the channel
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  )
  以上配置1小时自动释放一次。可理解成每小时截断一次。因为hadoop目录使用日志变量,在某文件空闲5分钟后自己释放。
  启动(client 和server端只有配置文件不同)
  cd /PREFIX
  ./flume-ng agent -n a1 -c ../conf -f ../conf/flume-XXXXX.properties    2>/dev/null&
  另附启停脚本
  #!/bin/bash
  PREFIX="/usr/local/flume"
  AGENT="a1"
  start(){
  echostarting
  ps aux |grep -v grep|grep $PREFIX -q
  if [ $? -eq 0 ];then
  echo flume is already running
  exit 1
  fi
  cd $PREFIX
  bin/flume-ng agent -n $AGENT -c conf -f conf/flume-XXXXX.properties    2>/dev/null &
  }
  stop(){
  ps aux |grep -v grep|grep $PREFIX -q
  if [ $? -ne 0 ];then
  echo flume is not running
  else
  ps aux |grep -v grep|grep $PREFIX|awk '{print $2}'|xargs kill -15
  sleep 3
  ps aux |grep -v grep|grep $PREFIX -q
  [ $? -eq 0 ]&& ps aux |grep flume|grep -v grep|awk '{print $2}'|xargs kill -9
  fi
  }
  status(){
  ps aux |grep -v grep|grep $PREFIX -q
  if [ $? -eq 0 ];then
  echo flume is running
  else
  echo flume is not running
  fi
  }
  case $1 in
  start)
  start
  ;;
  stop)
  stop
  ;;
  restart)
  stop
  sleep 1
  start
  ;;
  status)
  status
  ;;
  *)
  echo "Useage : $0 "
  ;;
  esac
  如果有不同的agent并且server是一个而且收集的日志不同,那么上述配置只需要修改几个地方就OK了
  1、修改agent和server的端口
  2、server端配置文件要换一个新的 而且中的a1 c1 r1 k1换成其他标示如:a2 c2 r2 k2 以及a1.sinks.k1.hdfs.patha1.channels.c1.checkpointDir a1.channels.c1.backupCheckpointDir a1.channels.c1.dataDirs 的路径

页: [1]
查看完整版本: flume对nginx群集日志收集方案