黄智勇 发表于 2015-12-29 11:12:54

[Perl多线程]自动上传FTP文件的多线程扩展

Perl脚本多线程上传FTP文件

本脚本是对《FTP自动上传文件的脚本以及配置文件》的多线程扩展,当然首先对方FTP站点允许同一个IP发起多个连接。

ithreads支持
Perl5.6.0已经加入了ithreads支持,我们通过
use threads;
导入threads多线程处理包;并且通过
use threads::shared;
使用线程间共享变量。
在定义全局并线程间共享的变量时,要这样写:
my $CurrentThreads: shared = 0;   #当前线程总数
因为在Perl的实现中,线程并不是像pthread那样共享变量,而是大家都分开,如同原来的进程一样。如果想让一个变量共享,须要显式地指定它才行。

当然我们还要限制并发活动线程数目,因为最多线程数是1-65。perl最多允许64个子线程,加上主线程因此最多65个线程,而且FTP站点允许多大连接数。这里$MaxThreads设置是从配置文件读取的。
配置文件的读取我们是采用
use Config::IniFiles;
库的。它的读取很简单:
my $cfg = Config::IniFiles->new( -file => "FTPUpload.config" );

等待线程的完全退出
通过不断地调用
my $thread = threads->create('processFile', $srcpath, $dstpath, $dstdir);
启动了多干个线程后,我们这里一定要调用
push(@$self, \$thread);
因为,创建一个thread以后要用join取得该thread的返回值,然后系统才会对thread进行清理,否则所有thread的信息都会保留下来,当然越积越多了。对返回值不关心的时候要用detach显式剥离该thread。
所以,在最后我们要等待这些线程的完全退出:
foreach my $thread (@$self)
{
print("Joining thread\n");
$$thread->join();
}
否则,threads库会在最后打印如下信息:
A thread exited while 2 threads were running.


程序大致的流程是:
      第一步,尝试用配置信息登陆ftp站点,这只是验证能够登陆而已;
       第二步,在指定文件夹A类下寻找符合条件的文件,并将对每一个A类文件创建一个线程上传。每个线程单独创建一个FTP连接,并不断地尝试上传这个文件直至成功;
       第三步,如果A类文件们全部上传成功,那么在指定文件夹B类下寻找指定文件,   并且上传到FTP指定目录下
      第四步,写成功/失败日志。

    最后,我们要写的成功/失败日志的格式如下所示:
         成功: 生成一个名为“Upload_Succ_2005_01_04_17_23.log”的日志文件
       文件格式:输出上传时间,以及所有上传文件名及其大小和耗费的时间。
      失败: 生成一个名为“Upload_Fail_2005_01_04_17_23.log”的日志文件
       文件格式:输出上传时间,以及已经上传的文件名及其大小和耗费的时间,和失败的文件名及原因。

配置perl脚本运行有两个办法:
u       您可以在Windows计划任务中配置运行“Perl Upload.pl”的时间,这需要在Windows环境中配置ActivePerl 5.8.4.810;
u       您也可以利用Perl2Exe(p2x-8.40-Win32)来将perl脚本编译为一个exe可执行程序,在计划任务中运行这个exe(这需要PerlCRT.dll在系统路径下)。
[注意!]在运行之前,您必须修改“Upload.config”文件以配置所需的重要参数。

外部配置参数
在和perl脚本同一目录下的“Upload.config”配置文件中,是事先配置的10个外部参数:
#            参数1: ftp_server:
#                                        FTP服务器的IP地址。
#            参数2: ftp_dir:
#                                        指定的FTP上传目录路径;
#            参数3: ftp_uid:
#                                        FTP的登陆用户名;
#            参数4: ftp_pw:
#                                        FTP的登陆密码;
#                   参数5: src_dir_WAVFiles,这是一个数组:
          指定A类文件夹,放置所有要上传的语音文件;
          注意:这个目录下的子文件夹也会被上传。
#                   参数6: src_dir_NamesListFile,这是一个数组:
          指定B类文件夹,放置B类文件. 注意:这个目录下的子文件夹也会被上传。
#            参数7: ftp_timeout:
#                                        FTP的访问超时时间,以秒为单位;
#            参数8: ftp_retrytimes:
#                                        FTP上传一个文件的重试次数,如果重试这么多次都失败之后,报告错误;
#            参数9: ftp_debug :
#                                 FTP的详细的调试信息输出
#            参数10: maxthreads:
#                                 启动同时访问FTP站点的线程最大数目

附录:
Upoad.pl内容:
#!/usr/bin/perl -w

####################################################################
#
# 工程项目: FTP自动上传两类文件
#
# 模块名称: FTPAutoUpload
#
# 模块任务: 按照指定的文件夹目录,自动将该文件夹下的所有文件上传到指定ftp站点的指定目录下
#
# 程序名称:   Upload.pl
#
# 程序员:   Uwe Keim
#                   郑昀
# 历史记录:
# 编号   日期       作者               备注
# 1       2000          Uwe Keim    创建
# 2       2005.1.5    郑昀            加入容错处理和读取外部配置文件部分
# 3    2005.1.14   郑昀       加入多线程支持
#
####################################################################

#use strict;

##================================================================##
## 引用的库声明 1
## 读取ini配置文件的库
use Config::IniFiles;
my $cfg = Config::IniFiles->new( -file => "Upload.config" );
## 启动同时访问FTP站点的线程最大数目 ##
$config_maxthreads          = $cfg->val('Threads', 'maxthreads') || 1;
##================================================================##

##================================================================##
## 引用的库声明 2
use File::Copy;
use File::stat;
use File::Find;
use Net::FTP;
use Date::Pcalc qw(Delta_DHMS);
use Date::Parse;
#!!use Win32::OLE; # 加入这个库会导致Perl执行线程时崩溃
#!!use Win32::OLE::Variant; # 加入这个库会导致Perl执行线程时崩溃
##================================================================##

##================================================================##
## 引用的库声明 3
#       Perl5.6.0已经加入了ithreads支持
use threads;                #导入threads多线程处理包
use threads::shared;      #使用线程间共享变量
my $MaxThreads = $config_maxthreads;    #最多线程数(1-65),perl最多允许64个子线程,加上主线程因此最多65个线程
                                        #此次设置是从配置文件读取的

# 在定义全局并线程间共享的变量时,要这样写
my $CurrentThreads: shared = 0;   #当前线程总数
##$total_files是上传文件的数目
my $total_files: shared = 0;

##$processed_files是已上传文件的数目
my $processed_files: shared = 0;

##$skipped_files是跳过文件的数目
my $skipped_files: shared = 0;

##FTP重试次数
my $ftp_retrytimes: shared = 0;

## $g_nIsAllWAVsFile_UploadSuccess代表是否已经完全将语音文件上传,-1为不是,1为是:
my $g_nIsAllWAVsFile_UploadSuccess: shared = 1;

# 因为在Perl的实现中,线程并不是像pthread那样共享变量,而是大家都分开,如同原来的进程一样。
# 如果想让一个变量共享,须要显式地指定它才行。
#
##================================================================##


##================================================================##
## 从配置文件读取外部参数 ##
##
## FTP服务器的IP地址 ##
my $ftp_server            = $cfg->val('FTPServer', 'ftp_server') || '';

## 指定的FTP上传目录路径 ##
#! 切记:文件夹最后不要加"/"符号 !#
my $ftp_dir             = $cfg->val('FTPServer', 'ftp_dir') || '';

## FTP的登陆用户名 ##
my $ftp_uid         = $cfg->val('FTPServer', 'ftp_uid') || '';

## FTP的登陆密码 ##
my $ftp_pw                  = $cfg->val('FTPServer', 'ftp_pw') || '';

## FTP的访问超时时间,以秒为单位,默认设置为30分钟 ##
my $ftp_timeout               = $cfg->val('FTPServer', 'ftp_timeout') || 1800;
####$ftp_timeout                = 1800;

## FTP上传一个文件的重试次数,如果重试这么多次都失败之后,报告错误 ##
$ftp_retrytimes               = $cfg->val('FTPServer', 'ftp_retrytimes') || 3;
####$ftp_retrytimes             = 3;

## FTP的详细的调试信息输出 ##
$ftp_debug                  = $cfg->val('FTPServer', 'ftp_debug') || 0;
####$ftp_debug            = 0;

## 指定文件夹“语音文件”,放置所有要上传的语音文件 ##
#! 切记:文件夹最后不要加"\\"符号 !#
my @src_dir_WAVFiles    = $cfg->val('SrcDirectory', 'src_dir_WAVFiles');

## 指定文件夹“命名对照列表文件TXT”,放置命名对照列表文件 ##
#! 切记:文件夹最后不要加"\\"符号 !#
my @src_dir_NamesListFile   = $cfg->val('SrcDirectory', 'src_dir_NamesListFile');

## 一个字符串集合,表明哪些类型的文件/文件夹将不被上传到服务器 ##
my @wc_exclude          = ("_vti",".log","\\bak","\\data","server.inc");
##================================================================##


##================================================================##
## 记录全部过程的日志文件准备
my $logfilename = 'upload.log';

my $log_cnt = 0;

my $span = 0;

LOG("");
LOG("郑昀(R) 自动上传两类文件 Version 0.1");
LOG("没有版权(C) Linktone 2005-2006。不保留所有权利。");
LOG("");
LOG("用法: Perl FTPUpload.MultiThread.pl");
LOG("");
##================================================================##

##================================================================##
##=== 程序执行的第一步:尝试登陆ftp站点 ==========================##
##$start_date计算出当前开始的时间
my $start_date = timeString(time);

## $g_nUploadSuccess代表是否已经完全上传,-1为不是,1为是:
my $g_nUploadSuccess = 1;

## $g_strLastError代表上次错误原因:
my $g_strLastError = "";

LOG("正在链接至指定FTP服务器($ftp_server)...");
# 第二个参数用于指定超时时间
my $ftp = Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
if($@)
{
    $g_strLastError = "不能连接到FTP服务器,错误原因:".$@;
    LOG("$g_strLastError@\n");
    $g_nUploadSuccess = -1;
}
else
{
    $ftp->login($ftp_uid, $ftp_pw);
    if($@)
    {
      $g_strLastError = "不能登陆FTP服务器,错误原因:".$@;
      LOG("$g_strLastError\n");
      $g_nUploadSuccess = -1;
    }
    else
    {
      LOG("链接FTP服务器成功!");
      ## 关闭连接
      $ftp->quit() or warn "咦,为什么不能够和FTP服务器断开连接呢?: $@\n";
##================================================================##


##================================================================##
##=== 程序执行的第二步,将指定文件夹“语音文件”下所有语音文件上传到FTP站点指定目录下 ===##

      #my %lookup;
      LOG("准备上传“语音文件”目录(@src_dir_WAVFiles)下的所有文件!");
      find(\&processFiles, @src_dir_WAVFiles);
      LOG("目录(@src_dir_WAVFiles)已经处理完毕,结果是:");

      foreach my $thread (@$self)
      {
            print("Joining thread\n");
            # join() does three things: it waits for a thread to exit,
            # cleans up after it, and returns any data the thread may
            # have produced.
            $$thread->join();
      }
      
##================================================================##
##=== 程序执行的第三步,将指定文件夹“命名对照列表文件TXT”下文件上传到FTP站点指定目录下 ===##
      if($g_nIsAllWAVsFile_UploadSuccess > 0)
      {
            LOG("+===============================+");
            LOG("准备上传“语音文件”目录(@src_dir_NamesListFile)下的所有文件!");
            @$self = {};
            find(\&processFiles, @src_dir_NamesListFile);
            foreach my $thread (@$self)
            {
                print("Joining thread\n");
                $$thread->join();
            }
            LOG("目录(@src_dir_NamesListFile)已经处理完毕,结果是:");
            LOG("-===============================-");
      }
      else
      {
            LOG("-===============================-");
            LOG("由于语音文件目录并没有完全上传,所以本命名对照文件不上传!");
            LOG("-===============================-");
      }
##================================================================##

##================================================================##
# 日志文件的最后是一个统计报告
      $span = calcDeltaSeconds($start_date,timeString(time));
      LOG("上传结果:成功。\n花费:$span 秒, 总共处理了 $total_files 个文件, 其中 $processed_files 上传成功, 跳过了 $skipped_files 个文件。");

    }
    closeLogfile();
}

##================================================================##

##================================================================##
##=== 程序执行的第四步,写成功日志 ===============================##
if($g_nIsAllWAVsFile_UploadSuccess > 0 && $g_nUploadSuccess > 0)
{
    $logfilename = 'FTPUpload.MultiThread_Succ_'.shortTimeString(time).'.log';

    $log_cnt = 0;

    LOG("");
    LOG("郑昀(R)自动上传文件 Version 0.1");
    LOG("没有版权 (C) Linktone 2005-2006。不保留所有权利。");
    LOG("");
    LOG("上传结果:成功。\n花费:$span 秒, 总共处理了 $total_files 个文件, 其中 $processed_files 上传成功, 跳过了 $skipped_files 个文件。");
    LOG("");
    closeLogfile();
}
##================================================================##

##================================================================##
##=== 程序执行的第四步,写失败日志 ===============================##
if($g_nIsAllWAVsFile_UploadSuccess < 0 || $g_nUploadSuccess < 0)
{
    $logfilename = 'FTPUpload.MultiThread_Fail_'.shortTimeString(time).'.log';

    $log_cnt = 0;

    LOG("");
    LOG("郑昀(R)自动上传文件 Version 0.1");
    LOG("没有版权 (C) Linktone 2005-2006。不保留所有权利。");
    LOG("");
    LOG("上传结果:失败。失败原因:$g_strLastError。\n花费:$span 秒, 总共处理了 $total_files 个文件, 其中 $processed_files 上传成功, 跳过了 $skipped_files 个文件。");
    LOG("");
    closeLogfile();
}
##================================================================##


####################################################################

## 以下是子函数体的定义

##-------------------------------------------------------------##
##
## 函数名称:processFiles
## 功能:   
##         得到指定文件夹下的所有文件以及子文件夹,然后依次处理它们。
##
## 程序员:郑昀(Yun.Zheng@Linktone.com)
##
## 历史记录:
## 编号   日期          作者    备注
## 1       2005.1.4       郑昀   
##
##-------------------------------------------------------------##
sub processFiles
{
    my $srcdir = fsToBs($File::Find::dir);
    my $srcpath = fsToBs($File::Find::name);
    my $base = fsToBs($File::Find::topdir);

    foreach my $exclude (@wc_exclude) {
      if ( index($srcpath, $exclude)>-1 ) {
            $File::Find::prune = 1 if -d $srcpath;
            return;
      }
    }

    # no DIRECT processing of directories.
    if ( -d $srcpath ) {
      return;
    }

    my $dstdir = $srcdir;
    my $dstpath = $srcpath;
    $dstdir =~ s{\Q$base\E}{$ftp_dir}is;
    $dstpath =~ s{\Q$base\E}{$ftp_dir}is;
    $dstdir = bsToFs($dstdir);
    $dstpath = bsToFs($dstpath);

    #-------------------------------------
    # 早先版本,单线程逐次处理每一个文件
    #processFile($srcpath,$dstpath,$dstdir);
    #-------------------------------------
   
    #-------------------------------------
    # 20050114版本,多线程并发处理文件
    # 每次创建线程处理文件之前,先检查当前已启动的线程数目是否小于最大允许数目
    # 如果已经创建了足够多的线程,就不断地轮询
    while(1)
    {
      if($CurrentThreads < $MaxThreads)
      {
            # 创建线程,传入参数
            my $thread = threads->create('processFile', $srcpath, $dstpath, $dstdir);
            #my $thread = threads->create('processFile2',"zhengyun");
            # 创建一个thread以后要用join取得该thread的返回值,然后系统才会对thread进行清理,
            # 否则所有thread的信息都会保留下来,当然越积越多了。对返回值不关心的时候要用detach显式剥离该thread。
            push(@$self, \$thread);
            #$thread->detach();
            last; # 退出循环
      }
      else
      {
            # 静候一秒钟,注意这里sleep的参数是以秒为单位的:
            LOG("-sleep 1秒钟");
            sleep 1;
      }
    }
    #-------------------------------------
}
sub processFile2
{
    my ($prime) = @_;
    print "Thread started\n".$prime."\n";
}
##-------------------------------------------------------------##
##
## 函数名称:processFile
## 功能:   
##         对目标文件进行FTP上传,如果失败就重试若干次。
##
## 程序员:郑昀(Yun.Zheng@Linktone.com)
##
## 历史记录:
## 编号   日期          作者    备注
## 1       2005.1.4       郑昀   
##
##-------------------------------------------------------------##
sub processFile
{
    ## 当前新建的线程数加一:
    {
      lock($CurrentThreads);
      $CurrentThreads++;
    }
    my ($srcThread, $dstThread, $dstdirThread) = @_;

    ## 当前处理的总文件数加一:
    {
      lock($total_files);
      $total_files++;
      LOG("正在处理文件 $total_files \"$srcThread\"...");
    }   

    # --------------------
    # check time.

    my $need_upload = 0;
    my $bPutResult = 0;

    # create time.
    my $t1 = $lookup{$srcThread};
    my $t2 = timeString(stat($srcThread)->mtime);

    if ( not defined $t1 ) {
      $lookup{$srcThread} = $t2;
      $need_upload = 1;
    } else {
      my $delta_sec = calcDeltaSeconds($t1,$t2);
      $need_upload = 1 if $delta_sec>5;                   # 5 seconds as tolerance.
    }

    # --------------------

    if ( $need_upload > 0 )
    {
      my $nProcessIndex = 1;
      for ($nProcessIndex = 1; $nProcessIndex <= $ftp_retrytimes; $nProcessIndex++)
      {
            my $ftp = Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
            if($@)
            {
                $g_strLastError = "不能连接到FTP服务器,错误原因:".$@;
                LOG("$g_strLastError@\n");
            }
            else
            {
                $ftp->login($ftp_uid, $ftp_pw);
                if($@)
                {
                  $g_strLastError = "不能登陆FTP服务器,错误原因:".$@;
                  LOG("$g_strLastError\n");
                }
                else
                {
                  $ftp->binary;
                  LOG("第$nProcessIndex次尝试:正在上传文件:从源 \"$srcThread\" 到目标 \"$dstThread\"...当前活动线程数:$CurrentThreads");

                  {
                        $bPutResult = 0;
                        $ftp->mkdir($dstdirThread, 1);
                        $ftp->put($srcThread, $dstThread) or $bPutResult = -1;
                  }
                  if($bPutResult < 0)
                  {
                        LOG("第$nProcessIndex次尝试:不能上传文件:从源 \"$srcThread\" 到目标 \"$dstThread\" (dst-dir: \"$dstdirThread\")。\n");
                        if($@)
                        {
                            LOG("第$nProcessIndex次尝试:错误原因是:$@\n");
                        }
                  }
                  else
                  {
                        LOG("第$nProcessIndex次尝试:上传文件成功:从源 \"$srcThread\" 到目标 \"$dstThread\"!\n");
                        {
                            lock($processed_files);
                            $processed_files++;
                        }
                        ## 关闭连接
                        $ftp->quit() if($ftp);
                        last; # 退出循环
                  }
                }
            }
            
            ## 关闭连接
            $ftp->quit() if($ftp);
            
      }#for
               
      if($bPutResult < 0)
      {
            # 尝试了这么多次之后还是不能够上传,报告错误罢了
            {
                lock($skipped_files);
                $skipped_files++;
                lock($g_nIsAllWAVsFile_UploadSuccess);
                $g_nIsAllWAVsFile_UploadSuccess = -1;
            }
      }
    }
    else
    {
      {
            lock($skipped_files);
            $skipped_files++;
      }
    }
   
    ## 当前活动的线程数减一:
    {
      lock($CurrentThreads);
      $CurrentThreads--;
    }
}

####################################################################

sub bsToFs {
    my ($s) = @_;
    $s =~ s/\\/\//gis;
    return $s;
}

sub fsToBs {
    my ($s) = @_;
    $s =~ s/\//\\/gis;
    return $s;
}

sub timeString {
    my ($tm) = @_;
    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($tm);
    return sprintf("%04d-%02d-%02d %02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
}

sub shortTimeString {
    my ($tm) = @_;
    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($tm);
    return sprintf("%04d_%02d_%02d_%02d_%02d", $year+1900, $mon+1, $mday, $hour, $min);
}

# input dates as string "YYYY-MM-DD HH:MM:SS".
# earlier as first parameter, later as second.
sub calcDeltaSeconds {
    my ($t1,$t2) = @_;

    my ($year1,$month1,$day1,$hh1,$mm1,$ss1) = scanDate($t1);
    my ($year2,$month2,$day2,$hh2,$mm2,$ss2) = scanDate($t2);

    my ($days, $hours, $minutes, $seconds) = Delta_DHMS(
      $year1, $month1, $day1, $hh1, $mm1, $ss1,                   # earlier.
      $year2, $month2, $day2, $hh2, $mm2, $ss2);            # later.

    return $seconds + $minutes*60 + $hours*60*60 + $days*60*60*24.
}

sub removeFilename {
    my ($s) = @_;
    my $pos = rindex($s,'\\');
    return substr($s, 0, $pos);
}

# format: "2000-09-29 09:09:51".
sub scanDate {
    my ($date) = @_;
    my ($year, $month, $day, $hour, $minute, $seconds);

    $year         = substr($date, 0, 4);
    $month      = substr($date, 5, 2);
    $day            = substr($date, 8, 2);
    $hour         = substr($date, 11, 2);
    $minute   = substr($date, 14, 2);
    $seconds    = substr($date, 17, 2);

    return ($year, $month, $day, $hour, $minute, $seconds);
}

####################################################################

sub LOG {
    my ($text) = @_;
    my $time = timeString time;

    # log to stdout.
    print "[$time] $text\n";

    # log to logfile.
    my $LOG_STEP = 10;
    flushLogfile() if ($log_cnt % $LOG_STEP)==0 or $log_cnt==0;
    $log_cnt++;
    print HLOG "[$time] $text\n";
}

sub openLogfile {
    closeLogfile();
    open(HLOG,">>$logfilename") or die("打开日志文件出错:文件名为 $logfilename ;错误原因为: $!");
};

sub closeLogfile {
    close HLOG if defined HLOG;
}

sub flushLogfile {
    closeLogfile();
    openLogfile();
}

####################################################################


附录:
Upoad.config内容:
##================================================================##
## 配置的外部参数 ##
##

#- FTP服务器的IP地址 -#
ftp_server   =
#- 指定的FTP上传目录路径 -#
#! 切记:文件夹最后不要加"/"符号 !#
ftp_dir       =
#- FTP的登陆用户名 -#
ftp_uid    =
#- FTP的登陆密码 -#
ftp_pw    =
#- FTP的访问超时时间,以秒为单位 -#
ftp_timeout          =1800
#- FTP上传一个文件的重试次数,如果重试这么多次都失败之后,报告错误 -#
ftp_retrytimes         =10
#- FTP的详细的调试信息输出 -#
    ftp_debug            =1
##=====================================================##

##=====================================##
## 配置的外部参数 ##
##

#- 指定文件夹“语音文件”,放置所有要上传的语音文件 -#
#! 切记:文件夹最后不要加"\"符号 !#
src_dir_WAVFiles=

#- 指定文件夹“命名对照列表文件TXT”,放置命名对照列表文件 -#
#! 切记:文件夹最后不要加"\"符号 !#
src_dir_NamesListFile =
##=========================================##

##========================================##
## 配置的外部参数3 ##
##

#- 启动同时访问FTP站点的线程最大数目 -#
maxthreads    =1
##==========================================##



Disclaimers:
Programmer’s Blog List:

博客堂

博客园

Don Box's Blog
Eric.Weblog()

Blogs@asp.net


本文档仅供参考。本文档所包含的信息代表了在发布之日,zhengyun_ustc对所讨论问题的当前看法,zhengyun_ustc不保证所给信息在发布之日以后的准确性。
用户应清楚本文档的准确性及其使用可能带来的全部风险。可以复制和传播本文档,但须遵守以下条款:

[*]复制时不得修改原文,复制内容须包含所有页 ;
[*]所有副本均须含有 zhengyun_ustc的版权声明以及所提供的其它声明 ;
[*]不得以赢利为目的对本文档进行传播 。



Trackback: http://tb.blog.csdn.net/TrackBack.aspx?PostId=255453
页: [1]
查看完整版本: [Perl多线程]自动上传FTP文件的多线程扩展