9AMC 发表于 2018-10-29 11:08:24

hadoop集群同步实现

  #!/usr/bin/env python
  #coding=utf-8
  #scribe日志接收存在小集群到大集群之间, distcp 同步失败的情况,需要手动进行补入。
  #1、如果查询补入的日志量少,则可以之间用脚本处理。如果量大,则使用 hadoop 提交job。
  # hadoop job 提交方式:
  #   hadoop jar /usr/local/hadoop-2.4.0/share/hadoop/tools/lib/hadoop-distcp-2.4.0.jar -m 100 hdfs://scribehadoop/scribelog/common_act/2016/08/02/13//file/realtime/distcpv2/scribelog/common_act/2016/08/02/13--update
  #--update 参数表示如果目标地址目录存在,则更新该目录中的内容。
  #手动同步脚本使用方法: python manual_check_sync.pydst_path
  #脚本完成大集群和小集群之间的目录大小比较,目录文件比较。 输出差异文件列表。最后完成同步入库。
  import sys,os,commands,re
  import logging,logging.handlers
  Module=sys.argv.split("scribelog")
  logger1 = logging.getLogger('mylogger1')
  logger1.setLevel(logging.DEBUG)
  # 创建一个handler,用于写入日志文件
  filehandle = logging.FileHandler('log/test.log',mode='a')
  # 再创建一个handler,用于输出到控制台
  consehandle = logging.StreamHandler()
  # 定义handler的输出格式formatter
  formatter = logging.Formatter('[%(asctime)s]:%(message)s',datefmt='%F %T')
  #formatter = logging.Formatter('[%(asctime)s-line:%(lineno)d-%(levelname)s]:%(message)s',datefmt='%F %T')
  filehandle.setFormatter(formatter)
  consehandle.setFormatter(formatter)
  logger1.addHandler(filehandle)
  logger1.addHandler(consehandle)
  little_cluster="hdfs://yz632.hadoop.com.cn/scribelog"
  large_cluster="hdfs://ns1"
  #logger1.info("源目录(小集群):%s%s" % ( little_cluster,Module))
  logger1.info("源目录(小集群):%s%s" %(little_cluster,Module))
  logger1.info("目标目录(大集群):%s%s" %(large_cluster,sys.argv))
  #统计目录大小等情况
  du_little=commands.getoutput("hadoop fs -count " + little_cluster + Module)
  du_large=commands.getoutput("hadoop fs -count " + large_cluster + sys.argv)
  #获取的值是str类型,所以需要转为list来做比较。
  logger1.info("               DIR_COUNTFILECOUNTCONTENTSIZE ")
  logger1.info("小集群目录信息:%s" %(du_little))
  logger1.info("大集群目录信息:%s" %(du_large))
  #Python的str类有split方法,只能根据指定的某个字符分隔字符串,re模块中提供的split方法可以定义多个分隔符。这里可以用单个空格作为分隔即可。
  du_little=re.split('         |         | ' ,du_little)
  du_large=re.split('         |         | ' ,du_large)
  #du_little=du_little.split(" ")
  #du_large=du_large.split(" ")
  #print du_large,du_little
  #print du_large,du_little
  #print du_large,du_little
  #print du_large,du_little
  #
  if du_little == du_largeand du_little == du_large and du_little == du_large:
  logger1.info("大小集群文件数量、大小一致,不需要同步")
  exit()
  #如果大小不一致,取出目录下所有文件
  little_list=commands.getoutput("hadoop fs -lsr " + little_cluster + Module + "|grep -v \"^d\"" )
  large_list =commands.getoutput("hadoop fs -lsr " + large_cluster + sys.argv+"|grep -v \"^d\"")
  #logger1.info( "小集群情况:%s " %(little_list))
  #logger1.info( "大集群情况:%s ")%(large_list ))
  list1=[]
  list2=[]
  lost_list=[]
  for i inlittle_list.split("\n"):
  list1.append(i.split("scribelog"))
  for i inlarge_list.split("\n"):
  list2.append(i.split("scribelog"))
  #logger1.info("小集群目录文件:",list1
  #logger1.info("大集群目录文件" ,list2
  logger1.info("对比大小集群文件---》未同步文件列表:")
  for i in list1:
  if i not in list2:
  logger1.info(i)
  lost_list.append(i)
  logger1.info(lost_list)
  #拉取小集群文件到本地tmp目录中
  for i in lost_list:
  s=commands.getstatusoutput("hadoop fs -get " + little_cluster + i+" /tmp/")
  logger1.info("拉取小集群文件到本地tmp目录中%s" %s )
  #入库到大集群
  for i in lost_list:
  logger1.info(i)
  j=commands.getstatusoutput("sudo su - datacopy -s /bin/bash -c '/usr/local/hadoop-2.4.0/bin/hadoop fs -put/tmp/" +i.split("/")[-1] + "/file/realtime/distcpv2/scribelog"+i+" '" )
  logger1.info(j)
  if j == 0 :
  logger1.info(" %s + 同步完成" %(i))
  #clear tmp file
  for iin lost_list:
  commands.getstatusoutput("rm -f /tmp/"+i.split("/")[-1] )
  #当同步目录时,可能会出现put的时候提示没有目录存在。参考赵兵的 put-src dest方法,是否可以避免此类问题有待验证。
  du_little=commands.getoutput("hadoop fs -count " + little_cluster + Module)
  du_large=commands.getoutput("hadoop fs -count " + large_cluster + sys.argv)
  logger1.info("                  DIR_COUNT       FILECOUNT       CONTENTSIZE ")
  logger1.info("小集群目录信息:%s" %(du_little))
  logger1.info("大集群目录信息:%s" %(du_large))
  #Python的str类有split方法,但是这个split方法只能根据指定的某个字符分隔字符串,re模块中提供的split方法可以用来做这件事情
  du_little=re.split('         |         | ' ,du_little)
  du_large=re.split('         |         | ' ,du_large)
  if du_little == du_largeand du_little == du_large and du_little == du_large:
  logger1.info("小集群文件数量、大小一致,不需要同步")
  exit()

页: [1]
查看完整版本: hadoop集群同步实现