tanggang1740 发表于 2018-11-13 11:14:07

python分析nginx日志并推送到open-falcon

下面直接给出完整脚本:它增量读取 nginx 访问日志,统计指定 URL 的请求数、2xx/4xx/5xx 状态码数量和平均响应时间,并连同连接数、nginx 内存占用一起推送到 open-falcon。

  
#!/usr/bin/python
  
# --*-- coding: utf-8 --*--
  
# NGINX日志分析
  

  
import time
  
import datetime
  
import sys
  
import os
  
import os.path
  
import re
  
import json
  
import socket
  
import requests
  
import subprocess
  

  
class NginxLog(object):
    """Incrementally read an nginx access log and compute per-URL metrics.

    The byte offset reached by the previous run is persisted in ``seek_file``
    so that each run only looks at the lines appended since the last one.

    Attributes:
        log_file: Path of the nginx access log to analyse.
        interface_list: URLs (request paths) to produce metrics for.
        seek_file: Path of the small state file holding the last offset.
    """

    # Whitespace-separated field positions inside one access-log line.
    # NOTE(review): the subscripts were missing from the code as found; these
    # values are derived from the sample line
    #   100.64.40.7 - - "GET /wpsad.php HTTP/1.0" 200 0 "-" "..." "..." 0.369
    # Adjust them if the nginx log_format differs.
    URL_FIELD = 4
    STATUS_FIELD = 6

    def __init__(self, log_file, interface_list, seek_file):
        """Store the log path, the URLs to track and the seek-file path."""
        self.log_file = log_file
        self.interface_list = interface_list
        self.seek_file = seek_file

    def jsonFormat(self, python_data):
        """Return ``python_data`` serialised as JSON with a 2-space indent."""
        return json.dumps(python_data, indent=2)

    def hostname(self):
        """Return the fully qualified name of the local host.

        The original also resolved the host IP but never used it; that lookup
        was dropped because it can raise when the name is not resolvable.
        """
        return socket.getfqdn(socket.gethostname())

    def writeSeek(self, seek):
        """Persist the offset ``seek`` to the seek file.

        The file has two lines: a human-readable timestamp, then the offset.
        """
        stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
        with open(self.seek_file, 'w') as f:
            f.write(stamp + '\n')
            f.write(str(seek) + "\n")

    def _readSeek(self):
        """Return the offset stored by ``writeSeek``, or 0 if unavailable.

        A missing, truncated or corrupt seek file means "start from the
        beginning" rather than a crash.
        """
        try:
            with open(self.seek_file) as f:
                lines = f.readlines()
            # Line 0 is the timestamp, line 1 the offset.
            return max(int(lines[1].strip()), 0)
        except (IOError, OSError, IndexError, ValueError):
            return 0

    def LogRead(self):
        """Return the log text appended since the previous call.

        On the first run (no usable seek file) the whole log is returned. If
        the log is now shorter than the stored offset it is assumed to have
        been rotated and is read from the start. The new end offset is saved
        before returning.

        Returns:
            The new log content as text (undecodable bytes are replaced).
        """
        seek_old = self._readSeek()
        # Binary mode: the stored offset is a byte position, and text-mode
        # seek/read do not operate on byte counts.
        with open(self.log_file, 'rb') as f:
            f.seek(0, 2)  # 2 = relative to end of file
            seek_now = f.tell()
            if seek_now < seek_old:
                # File shrank: treat as log rotation and start over.
                seek_old = 0
            f.seek(seek_old, 0)  # 0 = relative to start of file
            raw = f.read(seek_now - seek_old)
        self.writeSeek(seek_now)
        return raw.decode('utf-8', 'replace')

    def _parseLine(self, line):
        """Split one log line into ``(url, status, request_time)``.

        Returns ``None`` for blank or too-short lines. ``request_time`` is
        ``None`` when the last field is not a number. The query string is
        stripped from the URL so that ``/a.php?x=1`` matches ``/a.php``.
        """
        fields = line.split()
        if len(fields) <= max(self.URL_FIELD, self.STATUS_FIELD):
            return None
        url = fields[self.URL_FIELD].split('?', 1)[0]
        status = fields[self.STATUS_FIELD].strip('"')
        try:
            request_time = float(fields[-1])
        except ValueError:
            request_time = None
        return url, status, request_time

    def LogStatistics(self):
        """Compute request metrics for every URL in ``interface_list``.

        The new part of the log is read exactly once (reading advances the
        stored offset, so reading per URL would leave every URL after the
        first with no data).

        Returns:
            A flat list of dicts, five per URL in this order: ``_count``,
            ``_avg_request_time``, ``_2xx``, ``_4xx``, ``_5xx``. Each dict has
            the keys ``ns``, ``clock``, ``host``, ``key`` and ``value``.
            The average is 0 when no matching request carried a numeric
            request time.
        """
        # Despite the key name, 'ns' holds the microsecond part of "now".
        time_ns = datetime.datetime.now().microsecond
        time_stamp = int(time.time())
        host_name = self.hostname()

        records = []
        for line in self.LogRead().split('\n'):
            parsed = self._parseLine(line)
            if parsed is not None:
                records.append(parsed)

        result_list = []
        for interface_item in self.interface_list:
            hits = 0
            timed_hits = 0
            total_time = 0.0
            by_class = {'2': 0, '4': 0, '5': 0}
            for url, status, request_time in records:
                if url != interface_item:
                    continue
                hits += 1
                if request_time is not None:
                    timed_hits += 1
                    total_time += request_time
                status_class = status[:1]
                if status_class in by_class:
                    by_class[status_class] += 1

            average = total_time / timed_hits if timed_hits else 0
            metrics = (
                ('_count', hits),
                ('_avg_request_time', average),
                ('_2xx', by_class['2']),
                ('_4xx', by_class['4']),
                ('_5xx', by_class['5']),
            )
            for suffix, value in metrics:
                result_list.append({
                    'ns': time_ns,
                    'clock': time_stamp,
                    'host': host_name,
                    'key': interface_item + suffix,
                    'value': value,
                })
        return result_list
  

  

  
def _gauge(endpoint, metric, ts, value, tags=""):
    """Build one GAUGE data point (60 s step) in the pushed dict layout."""
    return {
        "endpoint": endpoint,
        "metric": metric,
        "timestamp": ts,
        "step": 60,
        "value": value,
        "counterType": "GAUGE",
        "tags": tags,
    }


def _count_connections(state):
    """Return the number of netstat lines matching '80' and ``state``.

    Returns 0 when the pipeline writes anything to stderr or its output is
    not an integer.
    """
    # NOTE(review): grep '80' matches any line containing "80" (other ports,
    # addresses), not only port 80 — kept as in the original; confirm intent.
    cmd = "netstat -ant|grep -i '80'|grep '%s'|wc -l" % state
    proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    # communicate() drains both pipes and waits for the child to exit.
    out, err = proc.communicate()
    if err:
        return 0
    try:
        return int(out.strip())
    except ValueError:
        return 0


def _nginx_rss_mb():
    """Return the summed VmRSS of all nginx processes, in whole MB.

    PIDs come from ``pidof nginx``; the VmRSS value (kB) is read from
    ``/proc/<pid>/status``. Processes that vanish or cannot be read are
    skipped; 0 is returned when ``pidof`` cannot be started.
    """
    try:
        proc = subprocess.Popen(["pidof", "nginx"], stdout=subprocess.PIPE)
    except OSError:
        return 0
    out, _ = proc.communicate()
    total_kb = 0
    # Decode so that the pid is text on Python 3 (str(b'1') is "b'1'").
    for pid in out.decode('ascii', 'ignore').split():
        status_path = os.path.join("/proc/", pid, "status")
        try:
            with open(status_path) as f:
                for line in f:
                    if line.startswith("VmRSS"):
                        # Line layout: "VmRSS:   <number> kB"
                        total_kb += int(line.split()[1])
                        break
        except (IOError, OSError, IndexError, ValueError):
            continue
    return total_kb // 1024


def pushFalcon(return_data, endpoint="vm172-31-32-13.ksc.com",
               push_url="http://127.0.0.1:1988/v1/push"):
    """Push nginx metrics to the open-falcon agent.

    The payload contains, in order: the wpsad.php request/status counters
    found in ``return_data``, the average response time (when present), the
    TIME_WAIT and ESTABLISHED connection counts, and nginx memory use in MB.

    Args:
        return_data: List of dicts with at least ``key``, ``value`` and
            ``host``, as produced by ``NginxLog.LogStatistics``.
        endpoint: Endpoint name reported for every metric. Defaults to the
            previously hard-coded host name.
        push_url: URL the JSON payload is POSTed to. Defaults to the
            previously hard-coded local address.

    Raises:
        requests.exceptions.RequestException: If the POST fails or times out.
    """
    # (substring looked for in the item key, metric name that is pushed)
    counter_metrics = (
        ('wpsad.php_count', 'wpsad.php_count'),
        ('wpsad.php_2xx', 'wpsad.php_2xx_count'),
        ('wpsad.php_4xx', 'wpsad.php_4xx_count'),
        ('wpsad.php_5xx', 'wpsad.php_5xx_count'),
    )
    ts = int(time.time())
    payload = []
    response_time = None

    # nginx request / status-code counters
    for item in return_data:
        for needle, metric in counter_metrics:
            if needle in item['key']:
                payload.append(
                    _gauge(endpoint, metric, ts, item['value'],
                           "url=" + metric))
        if 'wpsad.php_avg_request_time' in item['key']:
            response_time = item['value']

    # Average response time; skipped when return_data carried none (the
    # original raised KeyError in that case).
    if response_time is not None:
        payload.append(
            _gauge(endpoint, 'wpsad_response_time', ts, response_time))

    # Connection counts
    payload.append(_gauge(endpoint, "nginx_timewait_num", ts,
                          _count_connections('TIME_WAIT')))
    payload.append(_gauge(endpoint, "nginx_estab_num", ts,
                          _count_connections('ESTABLISHED')))

    # Memory used by nginx processes
    payload.append(_gauge(endpoint, "nginx_mem", ts, _nginx_rss_mb()))

    # Push to the falcon agent; the timeout keeps an unreachable agent from
    # hanging the run indefinitely.
    requests.post(push_url, data=json.dumps(payload), timeout=10)
  
def main():
    """Analyse the new part of the ads access log and push the metrics.

    Tracks the single URL ``/wpsad.php``; the read offset is kept in a
    temporary seek file next to the log.
    """
    analyzer = NginxLog(
        log_file="/data/logs/nginx/ads.access.log",
        interface_list=['/wpsad.php'],
        seek_file="/data/logs/nginx/ads_log_check_seek.tmp",
    )
    pushFalcon(analyzer.LogStatistics())
  

  
# Run the analysis only when this file is executed as a script, not on import.
if __name__ == '__main__':
  main()


页: [1]
查看完整版本: python分析nginx日志并推送到open-falcon