longpan 发表于 2018-8-9 09:41:25

python 解析 crontab配置

  接触python一段时间了,最近要用py做个 监控功能,需要解析crontab中的配置信息, 本想偷懒一下,直接 百度/谷哥出来,无奈半天没找着,只好自己写一个,实现代码及使用 实例如下,望各位路过的大虾大神不吝赐教,能指点得到更优的处理办法:
  #/usr/bin/env python
  #_*_coding:utf-8_*_
  # Copyright (c) 2013 stephen <zhangxmgogo@gmail.com>
  # All rights reserved
  """
  1.解析 crontab 配置文件中的五个数间参数(分 时 日 月 周),获取他们对应的取值范围
  2.将时间戳与crontab配置中一行时间参数对比,判断该时间戳是否在配置设定的时间范围内
  """
  #$Id $
  import re, time, sys
  def get_struct_time(time_stamp_int):
  ____"""
  ____按整型时间戳获取格式化时间 分 时 日 月 周
  ____Args:
  ________time_stamp_int 为传入的值为时间戳(整形),如:1332888820
  ________经过localtime转换后变成
  ________time.struct_time(tm_year=2012, tm_mon=3, tm_mday=28, tm_hour=6, tm_min=53, tm_sec=40, tm_wday=2, tm_yday=88, tm_isdst=0)
  ____Return:
  ________list____返回 分 时 日 月 周
  ____"""
  ____st_time = time.localtime(time_stamp_int)
  ____return
  def get_strptime(time_str, str_format):
  ____"""从字符串获取 整型时间戳
  ____Args:
  ________time_str 字符串类型的时间戳 如 '31/Jul/2013:17:46:01'
  ________str_format 指定 time_str 的格式 如 '%d/%b/%Y:%H:%M:%S'
  ____Return:
  ________返回10位整型(int)时间戳,如 1375146861
  ____"""
  ____return int(time.mktime(time.strptime(time_str, str_format)))
  def get_str_time(time_stamp, str_format='%Y%m%d%H%M'):
  ____"""
  ____获取时间戳,
  ____Args:
  ________time_stamp 10位整型(int)时间戳,如 1375146861
  ________str_format 指定返回格式,值类型为 字符串 str
  ____Rturn:
  ________返回格式 默认为 年月日时分,如2013年7月9日1时3分 :201207090103
  ____"""
  ____return time.strftime("%s" % str_format, time.localtime(time_stamp))
  def match_cont(patten, cont):
  ____"""
  ____正则匹配(精确符合的匹配)
  ____Args:
  ________patten 正则表达式
  ________cont____ 匹配内容
  ____Return:
  ________True or False
  ____"""
  ____res = re.match(patten, cont)
  ____if res:
  ________return True
  ____else:
  ________return False
  def handle_num(val, ranges=(0, 100), res=list()):
  ____"""处理纯数字"""
  ____val = int(val)
  ____if val >= ranges and val <= ranges:
  ________res.append(val)
  ____return res
  def handle_nlist(val, ranges=(0, 100), res=list()):
  ____"""处理数字列表 如 1,2,3,6"""
  ____val_list = val.split(',')
  ____for tmp_val in val_list:
  ________tmp_val = int(tmp_val)
  ________if tmp_val >= ranges and tmp_val <= ranges:
  ____________res.append(tmp_val)
  ____return res
  def handle_star(val, ranges=(0, 100), res=list()):
  ____"""处理星号"""
  ____if val == '*':
  ________tmp_val = ranges
  ________while tmp_val <= ranges:
  ____________res.append(tmp_val)
  ____________tmp_val = tmp_val + 1
  ____return res
  def handle_starnum(val, ranges=(0, 100), res=list()):
  ____"""星号/数字 组合 如 */3"""
  ____tmp = val.split('/')
  ____val_step = int(tmp)
  ____if val_step < 1:
  ________return res
  ____val_tmp = int(tmp)
  ____while val_tmp <= ranges:
  ________res.append(val_tmp)
  ________val_tmp = val_tmp + val_step
  ____return res
  def handle_range(val, ranges=(0, 100), res=list()):
  ____"""处理区间 如 8-20"""
  ____tmp = val.split('-')
  ____range1 = int(tmp)
  ____range2 = int(tmp)
  ____tmp_val = range1
  ____if range1 < 0:
  ________return res
  ____while tmp_val <= range2 and tmp_val <= ranges:
  ________res.append(tmp_val)
  ________tmp_val = tmp_val + 1
  ____return res
  def handle_rangedv(val, ranges=(0, 100), res=list()):
  ____"""处理区间/步长 组合 如 8-20/3 """
  ____tmp = val.split('/')
  ____range2 = tmp.split('-')
  ____val_start = int(range2)
  ____val_end = int(range2)
  ____val_step = int(tmp)
  ____if (val_step < 1) or (val_start < 0):
  ________return res
  ____val_tmp = val_start
  ____while val_tmp <= val_end and val_tmp <= ranges:
  ________res.append(val_tmp)
  ________val_tmp = val_tmp + val_step
  ____return res
  def parse_conf(conf, ranges=(0, 100), res=list()):
  ____"""解析crontab 五个时间参数中的任意一个"""
  ____#去除空格,再拆分
  ____conf = conf.strip(' ').strip(' ')
  ____conf_list = conf.split(',')
  ____other_conf = []
  ____number_conf = []
  ____for conf_val in conf_list:
  ________if match_cont(PATTEN['number'], conf_val):
  ____________#记录拆分后的纯数字参数
  ____________number_conf.append(conf_val)
  ________else:
  ____________#记录拆分后纯数字以外的参数,如通配符 * , 区间 0-8, 及 0-8/3 之类
  ____________other_conf.append(conf_val)
  ____if other_conf:
  ________#处理纯数字外各种参数
  ________for conf_val in other_conf:
  ____________for key, ptn in PATTEN.items():
  ________________if match_cont(ptn, conf_val):
  ____________________res = PATTEN_HANDLER(val=conf_val, ranges=ranges, res=res)
  ____if number_conf:
  ________if len(number_conf) > 1 or other_conf:
  ____________#纯数字多于1,或纯数字与其它参数共存,则数字作为时间列表
  ____________res = handle_nlist(val=','.join(number_conf), ranges=ranges, res=res)
  ________else:
  ____________#只有一个纯数字存在,则数字为时间 间隔
  ____________res = handle_num(val=number_conf, ranges=ranges, res=res)
  ____return res
  def parse_crontab_time(conf_string):
  ____"""
  ____解析crontab时间配置参数
  ____Args:
  ________conf_string____ 配置内容(共五个值:分 时 日 月 周)
  ________________________ 取值范围 分钟:0-59 小时:1-23 日期:1-31 月份:1-12 星期:0-6(0表示周日)
  ____Return:
  ________crontab_range____list格式,分 时 日 月 周 五个传入参数分别对应的取值范围
  ____"""
  ____time_limit____= ((0, 59), (1, 23), (1, 31), (1, 12), (0, 6))
  ____crontab_range = []
  ____clist________ = []
  ____conf_length = 5
  ____tmp_list____ = conf_string.split(' ')
  ____for val in tmp_list:
  ________if len(clist) == conf_length:
  ____________break
  ________if val:
  ____________clist.append(val)
  ____
  ____if len(clist) != conf_length:
  ________return -1, 'config error whith [%s]' % conf_string
  ____cindex = 0
  ____for conf in clist:
  ________res_conf = []
  ________res_conf = parse_conf(conf, ranges=time_limit, res=res_conf)
  ________if not res_conf:
  ____________return -1, 'config error whith [%s]' % conf_string
  ________crontab_range.append(res_conf)
  ________cindex = cindex + 1
  ____return 0, crontab_range
  def time_match_crontab(crontab_time, time_struct):
  ____"""
  ____将时间戳与crontab配置中一行时间参数对比,判断该时间戳是否在配置设定的时间范围内
  ____Args:
  ________crontab_time____crontab配置中的五个时间(分 时 日 月 周)参数对应时间取值范围
  ________time_struct____ 某个整型时间戳,如:1375027200 对应的 分 时 日 月 周
  ____Return:
  ________tuple 状态码, 状态描述
  ____"""
  ____cindex = 0
  ____for val in time_struct:
  ________if val not in crontab_time:
  ____________return 0, False
  ________cindex = cindex + 1
  ____return 0, True
  def close_to_cron(crontab_time, time_struct):
  ____"""coron的指定范围(crontab_time)中 最接近 指定时间 time_struct 的值"""
  ____close_time = time_struct
  ____cindex = 0
  ____for val_struct in time_struct:
  ________offset_min = val_struct
  ________val_close = val_struct
  ________for val_cron in crontab_time:
  ____________offset_tmp = val_struct - val_cron
  ____________if offset_tmp > 0 and offset_tmp < offset_min:
  ________________val_close = val_struct
  ________________offset_min = offset_tmp
  ________close_time = val_close
  ________cindex = cindex + 1
  ____return close_time
  def cron_time_list(
  ________cron_time,
  ________year_num=int(get_str_time(time.time(), "%Y")),
  ________limit_start=get_str_time(time.time(), "%Y%m%d%H%M"),
  ________limit_end=get_str_time(time.time() + 86400, "%Y%m%d%H%M")
  ____):
  ____#print "\nfrom ", limit_start , ' to ' ,limit_end
  ____"""
  ____获取crontab时间配置参数取值范围内的所有时间点 的 时间戳
  ____Args:
  ________cron_time 符合crontab配置指定的所有时间点
  ________year_num____指定在哪一年内 获取
  ________limit_start 开始时间
  ____Rturn:
  ________List________所有时间点组成的列表(年月日时分 组成的时间,如2013年7月29日18时56分:201307291856)
  ____"""
  ____#按小时 和 分钟组装
  ____hour_minute = []
  ____for minute in cron_time:
  ________minute = str(minute)
  ________if len(minute) < 2:
  ____________minute = '0%s' % minute
  ________for hour in cron_time:
  ____________hour = str(hour)
  ____________if len(hour) < 2:
  ________________hour = '0%s' % hour
  ____________hour_minute.append('%s%s' % (hour, minute))
  ____#按天 和 小时组装
  ____day_hm = []
  ____for day in cron_time:
  ________day = str(day)
  ________if len(day) < 2:
  ____________day = '0%s' % day
  ________for hour_mnt in hour_minute:
  ____________day_hm.append('%s%s' % (day, hour_mnt))
  ____#按月 和 天组装
  ____month_dhm = []
  ____#只有30天的月份
  ____month_short = ['02', '04', '06', '09', '11']
  ____for month in cron_time:
  ________month = str(month)
  ________if len(month) < 2:
  ____________month = '0%s' % month
  ________for day_hm_s in day_hm:
  ____________if month == '02':
  ________________if (((not year_num % 4 ) and (year_num % 100)) or (not year_num % 400)):
  ____________________#闰年2月份有29天
  ____________________if int(day_hm_s[:2]) > 29:
  ________________________continue
  ________________else:
  ____________________#其它2月份有28天
  ____________________if int(day_hm_s[:2]) > 28:
  ________________________continue
  ____________if month in month_short:
  ________________if int(day_hm_s[:2]) > 30:
  ____________________continue
  ____________month_dhm.append('%s%s' % (month, day_hm_s))
  ____#按年 和 月组装
  ____len_start = len(limit_start)
  ____len_end = len(limit_end)
  ____month_dhm_limit = []
  ____for month_dhm_s in month_dhm:
  ________time_ymdhm = '%s%s' % (str(year_num), month_dhm_s)
  ________#开始时间\结束时间以外的排除
  ________if (int(time_ymdhm[:len_start]) < int(limit_start)) or \
  ________ (int(time_ymdhm[:len_end]) > int(limit_end)):
  ____________continue
  ________month_dhm_limit.append(time_ymdhm)
  ____if len(cron_time) < 7:
  ________#按不在每周指定时间的排除
  ________month_dhm_week = []
  ________for time_minute in month_dhm_limit:
  ____________str_time = time.strptime(time_minute, '%Y%m%d%H%M%S')
  ____________if str_time.tm_wday in cron_time:
  ________________month_dhm_week.append(time_minute)
  ________return month_dhm_week
  ____return month_dhm_limit
  #crontab时间参数各种写法 的 正则匹配
  PATTEN = {
  ____#纯数字
  ____'number':'^+$',
  ____#数字列表,如 1,2,3,6
  ____'num_list':'^+([,]+)+$',
  ____#星号 *
  ____'star':'^\*$',
  ____#星号/数字 组合,如 */3
  ____'star_num':'^\*\/+$',
  ____#区间 如 8-20
  ____'range':'^+[\-]+$',
  ____#区间/步长 组合 如 8-20/3
  ____'range_div':'^+[\-]+[\/]+$'
  ____#区间/步长 列表 组合,如 8-20/3,21,22,34
  ____#'range_div_list':'^(+[\-]+[\/]+)([,]+)+$'
  ____}
  #各正则对应的处理方法
  PATTEN_HANDLER = {
  ____'number':handle_num,
  ____'num_list':handle_nlist,
  ____'star':handle_star,
  ____'star_num':handle_starnum,
  ____'range':handle_range,
  ____'range_div':handle_rangedv
  }
  def main():
  ____"""测试用实例"""
  ____#crontab配置中一行时间参数
  ____conf_string = '*/10 * * * * (cd /opt/pythonpm/devpapps;' \
  ________________ ' /usr/local/bin/python2.5 data_test.py>>output_error.txt)'
  ____#时间戳
  ____time_stamp = int(time.time())
  ____#解析crontab时间配置参数 分 时 日 月 周 各个取值范围
  ____res, desc = parse_crontab_time(conf_string)
  ____if res == 0:
  ________cron_time = desc
  ____else:
  ________print desc
  ________sys, exit(-1)
  ____print "\nconfig:", conf_string
  ____print "\nparse result(range for crontab):"
  ____
  ____print " minute:", cron_time
  ____print " hour: ", cron_time
  ____print " day: ", cron_time
  ____print " month: ", cron_time
  ____print " week day:", cron_time
  ____#解析 时间戳对应的 分 时 日 月 周
  ____time_struct = get_struct_time(time_stamp)
  ____print "\nstruct time(minute hour day month week) for %d :" % \
  ________ time_stamp, time_struct
  ____#将时间戳与crontab配置中一行时间参数对比,判断该时间戳是否在配置设定的时间范围内
  ____match_res = time_match_crontab(cron_time, time_struct)
  ____print "\nmatching result:", match_res
  ____#crontab配置设定范围中最近接近时指定间戳的一组时间
  ____most_close = close_to_cron(cron_time, time_struct)
  ____print "\nin range of crontab time which is most colse to struct ", most_close
  ____time_list = cron_time_list(cron_time)
  ____print "\n\n %d times need to tart-up:\n" % len(time_list)
  ____print time_list[:10], '...'
  if __name__ == '__main__':
  ____#请看 使用实例
  ____
  ____main()
页: [1]
查看完整版本: python 解析 crontab配置