火冰狐 发表于 2018-11-9 13:03:52

Python重写Logstash,把Nginx Access Log清洗后汇入Elastic DB

# -*- coding: utf-8 -*-
  '''
  By Willson Luo at 2017/11/23 v1.0
  '''
  import pandas as pd
  import json,time,datetime,iso8601
  from elasticsearch import Elasticsearch
  from geoip import geolite2
  # Script purpose: read an nginx access log, turn each line into a dict
  # (timestamp, host, sizes, client address, user agent, status, GeoIP data)
  # and index that dict into a per-day Elasticsearch index.
  #
  # NOTE(review): this listing appears to have lost its indentation, its list
  # subscripts (`[...]`) and the underscores in several identifiers when it
  # was pasted. The code below is kept verbatim; comments flag the spots that
  # need to be restored before it can run -- confirm against the original.

  # Connect to the Elasticsearch database.
  # The client is assigned twice; the second assignment replaces the first,
  # so only the client built with explicit hosts and credentials is used.
  # NOTE(review): `httpauth` is presumably the client's `http_auth` keyword
  # with the underscore lost -- TODO confirm.
  es = Elasticsearch( "localhost:9200" )
  es = Elasticsearch(hosts=[{'host': 'localhost', 'port': '9200'}],httpauth=('elastic', 'xxxxx'))
  # Column names of the nginx log line, kept for reference only (not used).
  #title    = ['@timestamp','host','clientip','size','responsetime','upstreamtime','upstreamhost','httphost','url','xff','referer','agent','status']
  # nginx access log: the whole file is read into a list of lines up front.
  # The file object is never closed explicitly.
  ngxlog= 'access.log'
  ngxdata = open(ngxlog).readlines()
  # Document sent to Elasticsearch. A single dict is created here and reused
  # for every log line; each pass of the loop reassigns all of its keys.
  ngxjson = {}
  # a1 is the line index; it is only used for the progress print at the end.
  for a1 in range(len(ngxdata)):
  # Split the line on double quotes.
  # NOTE(review): as written `.strip()` is called on the list itself;
  # presumably this was `ngxdata[a1].strip()` -- TODO confirm.
  step1 = ngxdata.strip().split("\"")
  # NOTE(review): from here on `step1` is used bare for every field;
  # presumably each use originally carried its own subscript selecting one
  # piece of the split line -- the indices cannot be recovered from here.
  # Parse the timestamp (presumably `iso8601.parse_date` -- TODO confirm).
  abc = iso8601.parsedate(step1)
  # bcd: timestamp string stored in the document.
  bcd = abc.strftime('%Y-%m-%dT%H:%M:%S%Z')
  # cde: YYYYMMDD day stamp, used as the suffix of the index name.
  cde = abc.strftime('%Y%m%d')
  ngxindex   = 'logstash-weixin-nginx-access-'+ cde
  ngxjson['@timestamp'] = bcd
  ngxjson['host'] = step1
  # ':' and ',' are removed from these two values; presumably they are
  # unquoted in the log line, so the split leaves the surrounding
  # punctuation attached -- TODO confirm against the log format.
  ngxjson['size'] = step1.replace(":","").replace(",","")
  ngxjson['responsetime'] = step1.replace(":","").replace(",","")
  ngxjson['upstreamtime'] = step1
  ngxjson['upstreamhost'] = step1
  # Pick the client address. When the tested field is the "-" placeholder
  # the value is taken as-is; otherwise it is split on ','.
  # NOTE(review): presumably the tested field is the forwarded-for value
  # (`xff` in the column list) and the else branch keeps one element of the
  # comma-separated list -- TODO confirm.
  if step1 == "-":
  ngxjson['clientip']= step1
  ngxjson['httphost'] = step1
  ipaddr = step1
  else:
  ngxjson['clientip']= step1.split(",")
  ngxjson['httphost'] = step1
  ipaddr = step1.split(",")
  # Collapse the user agent to a short label. The tests are substring
  # checks evaluated in order, so the first match wins; if nothing matches
  # the raw value is stored.
  if "Apple" in step1:
  ngxjson['agent']="Apple"
  elif "WeChat" in step1:
  ngxjson['agent']="WeChat"
  elif "curl" in step1:
  ngxjson['agent']="Linux"
  elif "Alibaba" in step1:
  ngxjson['agent']="Aliyun"
  elif "Android" in step1:
  ngxjson['agent']="Android"
  elif "MSIE" in step1:
  ngxjson['agent']="IE"
  elif "Firefox" in step1:
  ngxjson['agent']="Firefox"
  elif "Windows" in step1:
  ngxjson['agent']="Windows"
  elif "Apache-Http" in step1:
  ngxjson['agent']="Apache"
  else:
  ngxjson['agent']= step1
  ngxjson['status']= step1
  # GeoIP lookup. The address is looked up twice; the result of the first
  # lookup is discarded because `location` is rebound to a new list below.
  # NOTE(review): `getinfodict` is presumably `get_info_dict`, and the
  # lookup presumably returns None for unknown addresses, which would make
  # these attribute accesses fail -- TODO confirm.
  location = geolite2.lookup(ipaddr).location
  match = geolite2.lookup(ipaddr).getinfodict()
  # Coordinates are stored as [longitude, latitude], in that order.
  location = []
  location.append(match['location']['longitude'])
  location.append(match['location']['latitude'])
  geoip = {}
  geoip['location'] = location
  # City / country / subdivision names default to "-" when the key is absent.
  # NOTE(review): `haskey` is presumably the Python 2 `dict.has_key`.
  if match.haskey('city'):
  city = match['city']['names']['en']
  else:
  city = "-"
  if match.haskey('country'):
  country = match['country']['names']['en']
  else:
  country = "-"
  if match.haskey('subdivisions'):
  subdivisions = match['subdivisions']['names']['en']
  else:
  subdivisions = "-"
  ngxjson['geoip']      = geoip
  ngxjson['country']      = country
  ngxjson['subdivisions'] = subdivisions
  ngxjson['city']         = city
  # The key is spelled 'possition' in the source; it is a runtime document
  # key, so it is left unchanged here.
  ngxjson['possition']    = country+"-"+subdivisions+"-"+city
  # Python 2 print statement: shows the line index and the document.
  print a1,ngxjson
  # Index the document into the per-day index.
  # NOTE(review): `doctype` is presumably the `doc_type` keyword -- confirm.
  es.index( index=ngxindex, doctype="logs", body=ngxjson )

页: [1]
查看完整版本: Python重写Logstash,把Nginx Access Log清洗后汇入Elastic DB