python3多任务进程线程

1,多任务原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
'''

现代操作系统(windows、Mac os X、Linux、UNIX等)都支持"多任务"
多任务:操作系统同时可以运行多个任务

单核CPU实现多任务原理:操作系统轮流让各个任务交替执行,QQ执行2us,切换到微信,在执行2us,再切换到钉钉,执行2us
表面上看,每个任务反复执行下去,但是CPU调度执行速度太快了,导致我们感觉就像所有任务都在执行一样。

多核CPU实现多任务原理:真正的并行执行多任务只能在多核CPU上实现,但是由于任务数量远远多于CPU核心数量,所以操作系统也会自动把
很多任务轮流调度到每个核心上执行
并发:看上去一起执行。任务数对于CPU核心数
并行:真正的一起执行。任务数小于等于CPU核心数。

实现多任务的方式:
1、多进程模式 其次常用
2、多线程模式 常用1
3、协程模式
4、多进程+多线程模式

'''

2, 进程

1
2
3
4
5
6
'''
对于操作系统而言,一个任务就是一个进程

进程是系统中程序执行和资源分配的基本单位。每个进程都有自己的数据段、代码段、和堆栈段

'''

1,单任务现象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from time import sleep


def run():
while True:
print("yichen is a nice man")
sleep(1.2)


if __name__ == "__main__":
while True:
print("yichen is a good man")
sleep(1)

#不会执行到run方法。只有上面的while循环结束才可以执行
run()

2,启动进程实现多任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
'''
multiprocessing 库
跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象

'''
from multiprocessing import Process

from time import sleep
import os

#子进程需要执行的代码
def run(str):
while True:
#os.getpid() 获取当前进程的id号
#os.getppid() 获取当前进程的父进程id号
print("yichen is a %s man--%s--%s"%(str,os.getpid(),os.getppid()))
sleep(1.2)


if __name__ == "__main__":

print("主(父)进程启动--%s"%(os.getpid()))
# 创建子进程
#target说明进程执行的任务
p =Process(target=run,args=("handsome",))
#启动进程
p.start()

while True:
print("yichen is a good man")
sleep(1)

3,父子进程的先后顺序join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

from multiprocessing import Process

from time import sleep
import os

#子进程需要执行的代码
def run(str):
print("子进程启动")
sleep(3)
print("子进程结束")


if __name__ == "__main__":

print("父进程启动--%s"%(os.getpid()))

p =Process(target=run,args=("handsome",))
#启动子进程
p.start()
sleep(1)
#父进程的结束不能影响子进程,让父进程等待子进程结束后再执行父进程
#p.join 等待p子进程结束后再执行后面的内容
p.join()
print("父进程结束")

4,全局变量在多个进程中不能共享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from multiprocessing import Process
from time import sleep
num = 100

def run():
print("子进程开始")
global num # 相当于引入了num = 100
num += 1
print(num)
print("子进程结束")



if __name__ =="__main__":
print("父进程开始")
p = Process(target=run)
p.start()
p.join()

p2 = Process(target=run)
p2.start()
p2.join()

# 在子进程中修改全局变量对父进程中的全局变量没有影响
# 在创建子进程时对全局变量做了一个备份,父进程中的与子进程的num是完全不同的两个变量
print("父进程结束--%d"%num)

5,启动大量的子进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from multiprocessing import Pool
import os, time, random

def run(name):
print("子进程%s启动--%s"%(name,os.getpid()))
start =time.time()
time.sleep(random.choice([1,2,3]))
end = time.time()
print("子进程%s结束--%s耗时%.2f" % (name, os.getpid(),end-start))



if __name__ =="__main__":
print("父进程启动")

#创建多个进程
#进程池
#表示可以同时执行的进程数量,看有多少线程
#Pool默认大小是CPU核心数4核8处理器,就是同时处理8个进程
pp =Pool()
for i in range(10):
#创建进程,放入进程池统一管理
pp.apply_async(run,args=(i,))
#自定义创建进程名字参数1
pp.apply_async(run,args=("11",))
pp.apply_async(run,args =("test",))

#在调用join之前必须先调用close,并且调用close之后就不能再继续添加新的进程了
pp.close()
#进程池对象调用的join,会等待进程中的所有的子进程结束完毕再去执行父进程
pp.join()

print("父进程结束")

6, 拷贝文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import os,time
from multiprocessing import Pool

#实现文件的拷贝
def copyFile(rPath,wPath):
fr = open(rPath, "rb")
fw = open(wPath, "wb")
context = fr.read()
fw.write(context)
fr.close()
fw.close()


path= r"D:\py_work\grep\爬虫\files"
toPath = r"D:\py_work\grep\爬虫\tofile"

fileList = os.listdir(path)

#单任务实现文件拷贝
'''
#启动for循环处理每一个文件
start = time.time()
for fileName in fileList:
copyFile(os.path.join(path,fileName),os.path.join(toPath,fileName))

end = time.time()
print("总耗时: %0.2f"%(end-start))

'''

#多进程实现文件拷贝

if __name__== "__main__":
start = time.time()
pp = Pool(8)
for fileName in fileList:
pp.apply_async(copyFile,args=(os.path.join(path,fileName),os.path.join(toPath,fileName)))

pp.close()
pp.join()
end = time.time()
print("总耗时: %0.2f" % (end - start))

7, 封装进程对象

创建一个自定义文件yichenProcess.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
import os,time

class YichenProcess(Process):
def __init__(self,name):
Process.__init__(self)
self.name = name

def run(self):
print("子进程(%s-%s)启动"%(self.name,os.getpid()))

#子进程的功能
time.sleep(3)
print("子进程(%s-%s)结束" % (self.name, os.getpid()))

主程序执行

1
2
3
4
5
6
7
8
9
10
11
from yichenProcess import YichenProcess

if __name__ == "__main__":
print("父进程启动")

#创建子进程
p =YichenProcess("test")
#自动调用p进程对象的run方法
p.start()
p.join()
print("父进程结束")

8,进程间的通信-Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from multiprocessing import Process,Queue
import os,time

def write(q):
print("启动写子进程%s"%(os.getpid()))
for chr in ["A","B","C","D"]:
q.put(chr)
time.sleep(1)
print("结束写子进程%s" % (os.getpid()))


def read(q):

print("启动读子进程%s"%(os.getpid()))
while True:
value = q.get(True)
print("value= "+ value)

print("结束读子进程%s" % (os.getpid()))



if __name__ == "__main__":

#父进程创建队列,并传递给子进程
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target= read, args=(q,))


pw.start()
pr.start()

pw.join()
#pr进程是个死循环,无法等待其结束,只能强行结束
pr.terminate()
print("父进程结束")

3,线程

1,线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
'''

在一个进程的内部,要同时干多件事,就需要同时运行多个"子任务",我们把进程内的这些"子任务",叫做线程

线程通常叫做轻型的进程。线程是共享内存空间的并发执行的多任务,每一个线程都共享一个进程的资源

线程是最小的执行单元,而进程有至少一个线程组成。如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么
时候执行,执行多长时间。

模块
1、_thread模块 低级模块(接近底层)
2、threading模块 高级模块,对_thread进行了封装

'''

2,启动一个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import threading ,time

a =10
def run(num):
print("子线程(%s)开始"%(threading.current_thread().name))
time.sleep(2)
#实现线程的功能
print("打印" ,num)
time.sleep(2)
print(a)
print("子线程(%s)结束" % (threading.current_thread().name))

if __name__ =="__main__":
#任何进程默认就会启动一个线程,称为主线程,主线程可以启动新的子线程
#current_thread(): 返回当前线程的实例
print("主线程(%s)启动"%(threading.current_thread().name))

#创建子线程 线程的名称
t = threading.Thread(target=run,name ="runThread",args=(1,))
t.start()
#等待线程结束
t.join()

print("主线程(%s)结束" % (threading.current_thread().name))

3,线程间共享数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import threading
'''

多线程和多进程最大的不同在于,多进程中,同一个变量各自有一份拷贝存在每个进程中,
互不影响。而多线程中,所有变量都有所有线程共享。所以任何一个变量都可以被任意一个线程修改,因此,线程之间共享数据最大
的危险在于多个线程同时修改一个变量,容易把内容改乱了。
'''

num = 0

def run(n):
global num
for i in range(100000):
num = num + n
num = num - n



if __name__ =="__main__":
t1= threading.Thread(target=run,args=(6,))
t2 = threading.Thread(target=run,args =(9,))
t1.start()
t2.start()
t1.join()
t2.join()


print("num = ", num)

'''
线程1 num + 6

线程2 num = num + 9
线程2 num -9

线程1 num = 9

'''

4,线程锁解决数据混乱

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

import threading

'''
两个线程同时工作,一个存钱,一个取钱

可能导致数据异常

思路:加锁


'''

#锁对象
lock = threading.Lock()

num = 0

def run(n):
global num

for i in range(1000000):
'''
# 锁
# 确保了这段代码只能由一个线程从头到尾的完整执行
#阻止了多线程的并发执行,包含锁的某段代码实际上只能以单线程模式执行,所以效率大大的降低了。
#由于可以存在多个锁,不同线程持有不同的锁,并试图获取其他的锁,可能造成死锁,导致多个线程挂起。
只能靠操作系统强制终止
lock.acquire()
try:
num = num + n
num = num - n
finally:
#修改完一定要释放锁
lock.release()
'''
#与上面的代码功能相同, with lock可以自动上锁与解锁
with lock:
num = num + n
num = num - n


if __name__ =="__main__":
t1= threading.Thread(target=run,args=(6,))
t2 = threading.Thread(target=run,args =(9,))
t1.start()
t2.start()
t1.join()
t2.join()


print("num = ", num)

5,ThreadLocal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading

num = 0
#创建一个全局的ThreadLocal对象
#每个线程有独立的存储空间
#每个线程对ThreadLocal对象都可以读写,但是互不影响。
local = threading.local()
list = [num,num]

def run(x,n):
x = x + n
x = x - n

def func(n):
#每个线程都有local.x,就是线程的局部变量
local.x = num
for i in range(100000):
run(local.x,n)
print("%s-%d"%(threading.current_thread().name,local.x ))

if __name__ =="__main__":
t1= threading.Thread(target=func,args=(6,))
t2 = threading.Thread(target=func,args =(9,))
t1.start()
t2.start()
t1.join()
t2.join()


print("num = ", num)

#作用:为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便的访问这些资源。

6,信号量控制线程数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading,time


sem = threading.Semaphore(3)
def run():
while sem:
for i in range(5):
print("%s --= %d"%(threading.current_thread().name,i))
time.sleep(1)

if __name__ =="__main__":

for i in range(5):

threading.Thread(target=run).start()

7,凑够一定数量才能一起执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import threading,time


bar = threading.Barrier(3)
def run():

print("%s --start "%(threading.current_thread().name))
time.sleep(1)
bar.wait()

print("%s --end " % (threading.current_thread().name))
if __name__ =="__main__":

for i in range(5):

threading.Thread(target=run).start()

8,定时线程

1
2
3
4
5
6
7
8
9
10
11
12
import threading

def run():
print("yichen is a good man")


#延时执行线程
t= threading.Timer(5,run)
t.start()
t.join()

print("父线程结束")

9,线程通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading,time



def func():
#事件对象
event = threading.Event()

def run():
for i in range(5):
#阻塞,等待事件的触发
event.wait()
#重置
event.clear()
print("yichen is a good man!! %d"%(i))
t = threading.Thread(target= run).start()
return event


e = func()

#触发事件

for i in range(5):
time.sleep(2)
e.set()

10,生产者与消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import threading,queue,time,random

#生产者
def product(id,q):
while True:
num = random.randint(0,10000)
q.put(num)
print("生产者%d生产了%d数据放入了队列"%(id,num))
time.sleep(3)
#任务完成
q.task_done()




#消费者
def customer(id,q):
while True:
item = q.get()
if item is None:
break

print("消费者%d消费了%d数据"%(id,item))
time.sleep(3)
# 任务完成
q.task_done()


if __name__ =="__main__":
#消息队列
q = queue.Queue()

#启动生产者
for i in range(4):
threading.Thread(target=product,args=(i,q)).start()

#启动消费者
for i in range(3):
threading.Thread(target=customer,args=(i,q)).start()

11,线程调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading,time

cond = threading.Condition()
def run1():
with cond:
for i in range(0,10, 2):
print(threading.current_thread().name, i)
#time.sleep(1)
cond.wait()
cond.notify()
def run2():
with cond:
for i in range(1,10, 2):
print(threading.current_thread().name, i)
#time.sleep(1)
cond.notify()
cond.wait()


threading.Thread(target=run1).start()
threading.Thread(target=run2).start()

4,协程

1,协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53


'''
子程序/函数: 在所有语言中都是层级调用,比如A调用B,在B执行的过程中又可以调用C,C执行完毕返回,
B执行完毕返回,最后是A执行完毕,是通过栈实现的,一个线程就是执行一个子程序,子程序调用总是一个入口,
一次返回,调用的顺序是明确的。

概述:看上去也是子程序,但执行过程中,在子程序的内部可中断,然后转而去执行别的子程序,不是函数调用。

'''
'''
def C():
print("C--start")

print("C--end")

def B():
print("B--start")
C()
print("B--end")
def A():
print("A--start")
B()
print("A--end")

A()
'''
def A():
print(1)
print(2)
print(3)


def B():
print("x")
print("y")
print("z")


'''
1
2
x
y
z
3
执行出这个结果
但是A中没有B的调用
看起来A、B执行过程中有点像线程,但协程的特点在于是一个线程执行

与线程相比,协程的执行效率极高,因为只有一个线程,也不存在同时写变量额冲突,在协程中共享资源部加锁,只需要判断状态

'''

2,协程原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22


'''
Python对协程的支持是通过generator实现的
'''

def run():
print(1)
yield 10
print(2)
yield 20
print(2)
yield 30


#协程的最简单风格,控制函数的阶段执行,节约线程或者进程的切换
#返回值是一个生成器

m = run()
print(next(m))
print(next(m))
print(next(m))

3,数据传输

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22


def run():
#空变量,存储的作用data始终为空
data = ""
r = yield data
print(1,r,data)
r = yield data
print(2,r, data)
r = yield data
print(3,r,data)
r = yield data


m = run()

#启动m
print(m.send(None))
print(m.send("a"))
print(m.send("b"))
print(m.send("c"))
print("********")

评论


:D 一言句子获取中...

加载中,最新评论有1分钟缓存...