打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
【说解】在shell中通过mkfifo创建命名管道来控制多个进程并发执行

背景:

工作中有两个异地机房需要传数据,数据全名很规范,在某个目录下命名为统一的前缀加上编号。如/path/from/file.{1..100}。而机房间的专线对单个scp进程的传输速度是有限制的,比如最大在100Mb/s,如果直接启动100个scp,则又会遇到ssh的并发连接数限制。

所以需要控制并发数,即不超过ssh的并发限制,又要让单网卡上的带宽接近饱和,尽快完成传输(假设专线带宽远大于单机网卡带宽)

实现

之前知道通过mkfifo创建一个命名管道,可以实现对并发的控制。现在来实现一个。

在此之前,如果对mkfifo不了解,可以参考这个连接,作者写得很详细,我就不造轮子了。

这里直接给出代码,并做一些解释。因为单进程的带宽如上所述,所以考虑9个并发。代码如下:

 1 #!/bin/bash 2  3 your_func() 4 {   # use your cmd or func instead of sleep here. don't end with background(&) 5     date +%s 6     echo 'scp HOSTNAME:/home/USER/path/from/file.$1 REMOTE_HOST:/home/USER/path/to/' 7     sleep 2 8 } 9 10 concurrent()11 {   # from $1 to $2, (included $1,$2 itself), con-current $3 cmd12     start=$1 && end=$2 && cur_num=$313 14     # ff_file which is opened by fd 4 will be really removed after script stopped15     mkfifo   ./fifo.$$ &&  exec 4<> ./fifo.$$ && rm -f ./fifo.$$16 17     # initial fifo: write $cur_num line to $ff_file18     for ((i=$start; i<$cur_num+$start; i++));="">do19         echo 'init time add $i' >&420     done21 22     for((i=$start; i<=$end; i++));="">do23         read -u 4   # read from mkfifo file24         {   # REPLY is var for read25             echo -e '-- current loop: [cmd id: $i ; fifo id: $REPLY ]'26 27             your_func $i28             echo 'real time add $(($i+$cur_num))'  1>&4 # write to $ff_file29         } & # & to backgroud each process in {}30     done31     wait    # wait all con-current cmd in { } been running over32 }33 34 concurrent 0 8 3

上面以3为并发数,执行0到8号共9次,以便显示如下执行结果。

 1 bash concurrent.sh 2 -- current loop: [cmd id: 0 ; fifo id: init time add 0 ] 3 -- current loop: [cmd id: 1 ; fifo id: init time add 1 ] 4 -- current loop: [cmd id: 2 ; fifo id: init time add 2 ] 5 1453518400 6 1453518400 7 scp HOSTNAME:/home/USER/path/from/file.0 REMOTE_HOST:/home/USER/path/to/ 8 scp HOSTNAME:/home/USER/path/from/file.2 REMOTE_HOST:/home/USER/path/to/ 9 145351840010 scp HOSTNAME:/home/USER/path/from/file.1 REMOTE_HOST:/home/USER/path/to/11 -- current loop: [cmd id: 3 ; fifo id: real time add 3 ]12 -- current loop: [cmd id: 4 ; fifo id: real time add 5 ]13 -- current loop: [cmd id: 5 ; fifo id: real time add 4 ]14 145351840215 scp HOSTNAME:/home/USER/path/from/file.3 REMOTE_HOST:/home/USER/path/to/16 145351840217 145351840218 scp HOSTNAME:/home/USER/path/from/file.5 REMOTE_HOST:/home/USER/path/to/19 scp HOSTNAME:/home/USER/path/from/file.4 REMOTE_HOST:/home/USER/path/to/20 -- current loop: [cmd id: 6 ; fifo id: real time add 6 ]21 -- current loop: [cmd id: 7 ; fifo id: real time add 7 ]22 -- current loop: [cmd id: 8 ; fifo id: real time add 8 ]23 145351840424 scp HOSTNAME:/home/USER/path/from/file.6 REMOTE_HOST:/home/USER/path/to/25 145351840426 145351840427 scp HOSTNAME:/home/USER/path/from/file.7 REMOTE_HOST:/home/USER/path/to/28 scp HOSTNAME:/home/USER/path/from/file.8 REMOTE_HOST:/home/USER/path/to/

从date输出的时间上,可以看出,每2秒会执行3个并发。

说明

整体过程

设N的值为并发数。通过在fifo中初始化N行内容(可以为空值),再利用fifo的特性,从fifo中每读一行,启动一次your_func调用,当fifo读完N次时,fifo为空。再读时就会阻塞。这样开始执行时就是N个并发(1-N)。

当并发执行的进程your_func,任意一个完成操作时,下一步会招待如下语句:

echo 'real time add $(($i+$cur_num))'  1>&4

这样就对fifo新写入了一行,前面被阻塞的第N+1号待执行的进程read成功,开始进入{}语句块执行。这样通过read fifo的阻塞功能,实现了并发数的控制。

需要注意的是,当并发数较大时,多个并发进程即使在使用sleep相同秒数模拟时,也会存在进程调度的顺序问题,因而并不是按启动顺序结束的,可能会后启动的进程先结束。

从而导致如下语句所示的输出中,两个数字并不一定是相等的。并发数越大,这种差异性越大。

-- current loop: [cmd id: 8 ; fifo id: real time add 9 ]

自定义函数

修改自定义函数your_func,这个函数实际只需要一行就完成了。

your_func(){   # use your cmd or func instead of sleep here. don't end with background(&)    date +%s    scp HOSTNAME:/home/USER/path/from/file.$1 REMOTE_HOST:/home/USER/path/to/}

需要注意的是,scp命令最后不需要添加压后台的&符号。因为在上一级就已经压后台并发了。

再来说明concurrent函数的第14行。

exec digit<>  filename

这是一个平常很少使用到的命令。特别是‘<>’这个符号。既然不明白我们来查一下系统帮助。

man bash# search 'exec 'Opening File Descriptors for Reading and Writing       The redirection operator              [n]<>word       causes  the  file  whose  name  is  the  expansion  of word to be opened for both reading and writing on file       descriptor n, or on file descriptor 0 if n is not specified.  If the file does not exist, it is created.

通过man bash来搜索exec加空格,会找到对exec的说明。注意如果直接man exec,会搜索到linux programer's manual,是对execl, execlp, execle, execv, execvp, execvpe - execute a file这一堆系统函数的调用说明。

还要注意哦,4<> 这几个字符不要加空格,必然连着写。word前可以加空格。

rm file

mkfifo先创建管道文件,再通过exec将该文件绑定到文件描述符4。也许你在疑惑后面的rm操作。其实当该文件绑定到文件描述符后,内核已经通过open系统调用打开了该文件,这个时候执行rm操作,删除的是文件的Inode,但concurrent函数已经连接到文件的block块区。

如果你遇到过这样的情况,你就明白了:如果线上的nginx日志是没有切分的,access.log会越来越大,这时你直接rm access.log文件后,文件不见了,但df查看系统并没有释放磁盘空间。这就是因为rm只是删除了inode,但这之前nginx早已经通过open打开了这个文件,nginx进程的进程控制块中的文件描述符表中对应的fd,已经有相应的文件指针指向了该文件在内存中的文件表,以及其在内存中的v节点表,并最终指向文件的实际存储块。因此nginx依然可以继续写日志,磁盘还在被写入。只有重启或者reload,让进程重新读一次配置,重新打开一遍相应的文件时,才会发现该文件不存在的,并新建该文件。而这时因为Inode节点已经释放,再用df查看时就能看到可用空间增大了。

不懂可以参考APUE的图3.1及想着说明。

因此14行的rm并不影响后继脚本执行,直到脚本结束,系统收回所有文件描述符。

初始化

18-20行在做初始化管道的工作。其中读取管道有两类写法:

1 # style 12     for ((i=$start; i<$cur_num+$start; i++));="">do3         echo 'init time add $i' >&44     done5 6 # style 27     for ((i=$start; i<$cur_num+$start; i++));="">do8         echo 'init time add $i'9     done >&4

差别就是‘>&4’ 这几个字符放在echo语句后面,还是放在done后面,两者都可以,前者针对echo语句,后者针对整个for循环。

同理,在下一个for循环中,read命令也有两种方式:

# style 1for((i=$start; i<=$end; i++));="">do        read -u 4           {               your_func $i            echo 'real time add $(($i+$cur_num))'  1>&4 # write to $ff_file        } & done# style 2for((i=$start; i<=$end; i++));="">do        read           {               your_func $i            echo 'real time add $(($i+$cur_num))'  1>&4 # write to $ff_file        } & done <>4

关于REPLY

再解释一下REPLY变量。这是上述循环中,用来存放read命令从fifo中读到的内容。其实在整个脚本中,是不需要关注这个点的。不过这里随带也解释一下。

通过能fifo的不断读写,才实现了echo如下语句:

-- current loop: [cmd id: 7 ; fifo id: real time add 7 ]

如何了解到REPLY呢?我们又得man一下了。为了找到read的参数。先man read发现不对。再如下查找,因为read是bash自建命令。

1 man  bash 2 # search  'Shell Variables'3 4        REPLY  Set to the line of input read by the read builtin command when no arguments are supplied.

 

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
一些让人很容易忽视的shell基础知识
linux下unlink函数的使用
Linux Shell实现多进程并发执行
进程间通信之管道(pipe、fifo)
使用 scp 命令批量复制文件
Ubuntu备份
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服