问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

一个for循环的Python脚本程序中如何加入多进程(并发进程)呢,急急急...

发布网友 发布时间:2022-04-22 23:41

我来回答

9个回答

懂视网 时间:2022-05-10 10:11

本篇文章给大家带来的内容是关于python多进程控制的教程讲解(附示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

multiprocessing简介

multiprocessing是python自带的多进程模块,可以大批量的生成进程,在服务器为多核CPU时效果更好,类似于threading模块。相对于多线程,多进程由于独享内存空间,更稳定安全,在运维里面做些批量操作时,多进程有更多适用的场景

multiprocessing包提供了本地和远程两种并发操作,有效的避开了使用子进程而不是全局解释锁的线程,因此,multiprocessing可以有效利用到多核处理

Process类

在multiporcessing中,通过Process类对象来批量产生进程,使用start()方法来启动这个进程

1.语法

multiprocessing.Process(group=None,target=None,name=None,args=(),kwargs={},*)

group: 这个参数一般为空,它只是为了兼容threading.Tread
target: 这个参数就是通过run()可调用对象的方法,默认为空,表示没有方法被调用
name: 表示进程名
args: 传给target调用方法的tuple(元组)参数
kwargs: 传给target调用方法的dict(字典)参数

2.Process类的方法及对象

run()
该方法是进程的运行过程,可以在子类中重写此方法,一般也很少去重构

start()
启动进程,每个进程对象都必须被该方法调用

join([timeout])
等待进程终止,再往下执行,可以设置超时时间

name
可以获取进程名字,多个进程也可以是相同的名字

is_alive()
返回进程是否还存活,True or False,进程存活是指start()开始到子进程终止

daemon
守护进程的标记,一个布尔值,在start()之后设置该值,表示是否后台运行
注意:如果设置了后台运行,那么后台程序不运行再创建子进程

pid
可以获取进程ID

exitcode
子进程退出时的值,如果进程还没有终止,值将是None,如果是负值,表示子进程被终止

terminate()
终止进程,如果是Windows,则使用terminateprocess(),该方法对已经退出和结束的进程,将不会执行

以下为一个简单的例子:

#-*- coding:utf8 -*- 
import multiprocessing
import time

def work(x):
 time.sleep(1)
 print time.ctime(),'这是子进程[{0}]...'.format(x)

if __name__ == '__main__':
 for i in range(5):
 p = multiprocessing.Process(target=work,args=(i,))
 print '启动进程数:{0}'.format(i)
 p.start()
 p.deamon = True

1225714763-5993b00b4eb7b_articlex.png

当然也可以显示每个进程的ID

#-*- coding:utf8 -*- 
import multiprocessing
import time
import os

def work(x):
 time.sleep(1)
 ppid = os.getppid()
 pid = os.getpid()
 print time.ctime(),'这是子进程[{0},父进程:{1},子进程:{2}]...'.format(x,ppid,pid)

if __name__ == '__main__':
 for i in range(5):
 p = multiprocessing.Process(target=work,args=(i,))
 print '启动进程数:{0}'.format(i)
 p.start()
 p.deamon = True

2901093574-59a50a956cd8e_articlex.png

但在实际使用的过程中,并不只是并发完就可以了,比如,有30个任务,由于服务器资源有限,每次并发5个任务,这里还涉及到30个任务怎么获取的问题,另外并发的进程任务执行时间很难保证一致,尤其是需要时间的任务,可能并发5个任务,有3个已经执行完了,2个还需要很长时间执行,总不能等到这两个进程执行完了,再继续执行后面的任务,因此进程控制就在此有了使用场景,可以利用Process的方法和一些multiprocessing的包,类等结合使用

进程控制及通信常用类

一、Queue类

类似于python自带的Queue.Queue,主要用在比较小的队列上面
语法:

multiprocessing.Queue([maxsize])

类方法:
qsize()
返回队列的大致大小,因为多进程或者多线程一直在消耗队列,因此该数据不一定正确

empty()
判断队列是否为空,如果是,则返回True,否则False

full()
判断队列是否已满,如果是,则返回True,否则False

put(obj[, block[, timeout]])
将对象放入队列,可选参数block为True,timeout为None

get()
从队列取出对象

#-*- coding:utf8 -*-
from multiprocessing import Process, Queue

def f(q):
 q.put([42,None,'hi'])

if __name__ == '__main__':
 q = Queue()
 p = Process(target=f, args=(q,))
 p.start()
 print q.get() #打印内容: [42,None,'hi']
 p.join()

二、Pipe类

pipe()函数返回一对对象的连接,可以为进程间传输消息,在打印一些日志、进程控制上面有一些用处,Pip()对象返回两个对象connection,代表两个通道,每个connection对象都有send()和recv()方法,需要注意的是两个或以上的进程同时读取或者写入同一管道,可能会导致数据混乱,测试了下,是直接覆盖了。另外,返回的两个connection,如果一个是send()数据,那么另外一个就只能recv()接收数据了

#-*- coding:utf8 -*-
from multiprocessing import Process, Pipe
import time
def f(conn,i):
 print '[{0}]已经执行到子进程:{1}'.format(time.ctime(),i)
 time.sleep(1)
 w = "[{0}]hi,this is :{1}".format(time.ctime(),i)
 conn.send(w)
 conn.close()

if __name__ == '__main__':
 reader = []
 parent_conn, child_conn = Pipe()
 for i in range(4):
 p = Process(target=f, args=(child_conn,i))
 p.start()
 reader.append(parent_conn)
 p.deamon=True

 # 等待所有子进程跑完
 time.sleep(3)
 print '
[{0}]下面打印child_conn向parent_conn传输的信息:'.format(time.ctime())
 for i in reader:
 print i.recv()

输出为:

3420055133-59c4d415e67dc_articlex.png

三、Value,Array

在进行并发编程时,应尽量避免使用共享状态,因为多进程同时修改数据会导致数据破坏。但如果确实需要在多进程间共享数据,multiprocessing也提供了方法Value、Array

from multiprocessing import Process, Value, Array

def f(n, a):
 n.value = 3.1415927
 for i in range(len(a)):
 a[i] = -a[i]

if __name__ == '__main__':
 num = Value('d',0.0)
 arr = Array('i', range(10))

 p = Process(target=f, args=(num, arr))
 p.start()
 p.join()

 print num.value
 print arr[:]

*print
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]*

四、Manager进程管理模块

Manager类管理进程使用得较多,它返回对象可以操控子进程,并且支持很多类型的操作,如: list, dict, Namespace、lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array,因此使用Manager基本上就够了

from multiprocessing import Process, Manager

def f(d, l):
 d[1] = '1'
 d['2'] = 2
 d[0.25] = None
 l.reverse()

if __name__ == '__main__':
 with Manager() as manager:
 d = manager.dict()
 l = manager.list(range(10))

 p = Process(target=f, args=(d, l))
 p.start()
 p.join() #等待进程结束后往下执行
 print d,'
',l

输出:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
可以看到,跟共享数据一样的效果,大部分管理进程的方法都集成到了Manager()模块了

五、对多进程控制的应用实例

 #-*- coding:utf8 -*-
 from multiprocessing import Process, Queue
 import time
 
 def work(pname,q):
 time.sleep(1)
 print_some = "{0}|this is process: {1}".format(time.ctime(),pname)
 print print_some
 q.put(pname)
 
 if __name__ == '__main__':
 p_manag_num = 2 # 进程并发控制数量2
 # 并发的进程名
 q_process = ['process_1','process_2','process_3','process_4','process_5']
 q_a = Queue() # 将进程名放入队列
 q_b = Queue() # 将q_a的进程名放往q_b进程,由子进程完成
 
 for i in q_process:
  q_a.put(i)
 
 p_list = [] # 完成的进程队列
 while not q_a.empty():
  if len(p_list) <= 2:
  pname=q_a.get()
  p = Process(target=work, args=(pname,q_b))
  p.start()
  p_list.append(p)
  print pname
 
  for p in p_list:
  if not p.is_alive():
   p_list.remove(p)
 
 # 等待5秒,预估执行完后看队列通信信息
 # 当然也可以循环判断队列里面的进程是否执行完成
 time.sleep(5)
 print '打印p_b队列:'
 while not q_b.empty():
  print q_b.get()

执行结果:

1132967470-59a5182251923_articlex.png

热心网友 时间:2022-05-10 07:19

#下面是一个示例,我写了一个简单的for循环,并加入了多线程并发。
# -*- coding:utf-8 -*-
import thread,threading

#Test Function
def ForTest():
    for i in range(10):
        print i
        
class mythread(threading.Thread):
    def __init__(self,threadname):
        threading.Thread.__init__(self)
    def run(self):
        lock.acquire() 
        for j in xrange(int(times)):
            #Add Own Fuction Here
            ForTest()
        lock.release()
        
def MutiThread(num,times):
    threads=[]
    global ft
    for x in xrange(num):
        threads.append(mythread(num))
    for t in threads:
        lock.acquire()
        t.start() 
        lock.release()
    for t in threads:
        t.join()
if __name__ == '__main__':
    global num,times,lock
    
    num=2       #num 并发数
    times=2     #times 运行次数
    
    lock=threading.Lock()
    MutiThread(num,times)

运行结果:

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

0

1

2

3

4

5

6

7

8

9

热心网友 时间:2022-05-10 08:37

from multiprocessing import Process
#用这个list 保存进程
p_list = list()
# 先一起定义好,放到一个list里面
for i in range(100):
-----p1 = Process(target=目标函数, name='名字')
-----p_list .append(p1)
# 批量启动
for p in p_list:
-----p.daemon = True
-----p.start()
----- 这个是用来代替缩进的,方便观看代码,使用的时候记得删除。

热心网友 时间:2022-05-10 10:12

什么意思呢?如果你是在要在循环体内创建多进程,每循环一次便增加一个进程,那么只需要把创建多进程的代码写入循环体内就好了,完全与在循环体外无任何区别。
如果你想要让整个循环创建的所有进程在执行时间上大致同步,那么我建议你先按循环长度创建好进程和管道或者队列,推荐队列,然后循环时将变量推送给不同的进程处理就行了。

热心网友 时间:2022-05-10 12:03

简单的如下

from multiprocessing import Process
def ps(i):
print(str(i))
def run():
for i in range(5):
Process(target=ps, args=(i, )).start()
if __name__ == "__main__":
    run()

热心网友 时间:2022-05-10 14:11

实现rannable接口的run方法

热心网友 时间:2022-05-10 16:36

# 假如原脚本大概是这样的话

def main(name):
print(name)
if __name__ == '__main__':

my_list = ['新垣结衣', '长泽雅美', '波多老师', '濑亚老师']
for my_love in my_list:
main(my_love)
-----------------------------------------------------------
# 那么我们导入多进程包
from multiprocessing import Pool
# 讲上面的例子改写成如下的样式
def main(name):
print(name)
if __name__ == '__main__':

my_list = ['新垣结衣', '长泽雅美', '波多老师', '濑亚老师']
pool = Pool()

pool.map(main, [my_love for my_love in my_list])

热心网友 时间:2022-05-10 19:17

百度:python futures

热心网友 时间:2022-05-10 22:15

参考一下这个博文:http://www.coder4.com/archives/3352
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
怀孕错过了糖筛怎么办 糖筛有哪些注意事项 怀孕几个月查唐筛 蔡家沟镇慈云寺 哈佳高铁高楞站多少平方米位置在那 超威电池生产日期怎么看 超威电池寿命一般多长时间 华晨宇鸟巢四面台10w+秒罄!!火星演唱会舞台概念图震撼发布! 手机连接160wifi电脑蓝屏使用160WiFi遇到问题应该如何解决 为什么手机wifi总是刚开启,锁屏就掉了。wifi设置里的休眠模式也是永久的... 建行卡为什么不能网上转账 学校发我们的建设银行卡一定要激活吗? 我打电话查余额说输入取钱密码... 本人现急需一份参加社会实践活动的证明 请教一个关于python socket的问题 急!急!急!暑期社会实践证明… python编程中线程结束的问题 中学生社会实践证明有什么格式么~? python3 如何在线程间进行事件通知 实习证明格式是什么? python thread怎么强制结束一个已开启核对进城 关于暑假社会实践证明 Python面试题,线程与进程的区别,Python中如何创建多线程? 作文:节约粮食珍惜粮食 暑期社会实践证明格式是什么? 北林暑期社会实践证明怎么开??有什么格式吗? 我是在家乡的企业进行了实习,不知道怎样开证明。 社会实践证明怎么开 节约粮食的格言 怎么写社会实践证明 暑假社会实践证明怎么写比较好? 我们如何从自身做起节约粮食? 大学的暑假社会实践证明该怎么写呀? 暑期社会实践证明怎么写?? 暑期社会实践证明 关于python多线程的一些问题。 json的简单介绍及基本使用 有3个文件1.txt 2.txt 3.txt 我想利用python多线程同时查看3个文件的... js中for遍历出了多个json,如何把这些json存到数组? js怎么获取json数据里重复的值? python 多进程通讯 使用什么好 json s数据能传给异步任务吗 android Python socket 如何实现广播单播切换 javascript 解析json数据的问题 如何用python方法检测UDP端口 为什么在解析json中汉字解析不了 python threads can only be started once DELPHi分布式多层设计,的瘦客户机的理解 c#读取json linux下vyysh命令有什么作用啊?经常见到! json对象获取属性名 python 多线程如何延时 json数据格式,xml数据格式的区别和用法 referenceerror lessonjsons啥意思