多线程
Python · 原创 · Python 多线程 · 大约 4 分钟 · 约 1282 字
多核并行等待
#! coding=utf-8
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor ,wait
print(cpu_count())

def play(pari):
    """Demo worker: sleep 2 seconds, echo the argument, return a fixed dict."""
    time.sleep(2)
    print(pari)
    return {'url': 'url'}
def doneFunc(obj):
    """Completion callback: print the finished future's result, then 'done'."""
    res = obj.result()  # value returned by the worker function
    print(res)
    print('done')
import time

if __name__ == '__main__':
    start = time.time()
    futures = {}
    with ProcessPoolExecutor() as p:
        for i in range(30):
            # BUG FIX: add_done_callback() returns None, so the original
            # `job = p.submit(...).add_done_callback(...)` stored None as
            # every key of `futures`.  Keep the Future object itself.
            job = p.submit(play, i)
            job.add_done_callback(doneFunc)
            futures[job] = i
        # The with-block's __exit__ already performs shutdown(wait=True),
        # so no explicit p.shutdown(True) is needed.
    end = time.time()
    print("耗时:", end - start)
    print(123)
多cpu并行处理
#! coding=utf-8
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor
print(cpu_count())

def play(pari):
    """Demo worker: block for 2 seconds, then print the received value."""
    time.sleep(2)
    print(pari)
import time

if __name__ == '__main__':
    start = time.time()
    # BUG FIX: Executor.map() returns a lazy iterator and the original
    # never consumed it nor shut the pool down before stopping the clock,
    # so "耗时" measured only task submission, not execution.  Consume the
    # results inside a with-block so every task finishes first.
    with ProcessPoolExecutor() as pool:
        result = list(pool.map(play, range(1, 1000)))
    end = time.time()
    print("耗时:", end - start)
    print(123)
异步模块
asyncio
提示
Asyncio 和其他 Python 程序一样是单线程的:它只有一个主线程,但可以并发执行多个不同的任务。`loop.run_in_executor(executor, get_url, url)` 用于把阻塞调用交给线程池执行。
import asyncio
import time
# 定义第1个协程,协程就是将要具体完成的任务,该任务耗时3秒,完成后显示任务完成
async def to_do_something(i):
    """Coroutine simulating an *i*-second task; logs its start and finish."""
    print('第{}个任务:任务启动...'.format(i))
    # await suspends this task so the event loop can run other tasks meanwhile
    await asyncio.sleep(i)
    print('第{}个任务:任务完成!'.format(i))
# 定义第2个协程,用于通知任务进行状态
async def mission_running():
    """Coroutine that simply reports that work is in progress."""
    print('任务正在执行...')
start = time.time()

async def _main():
    """Run the three demo coroutines concurrently and wait for all."""
    await asyncio.gather(
        to_do_something(1),
        to_do_something(2),
        mission_running(),
    )

# FIX: asyncio.run() replaces the deprecated get_event_loop() /
# ensure_future() / run_until_complete() pattern (get_event_loop is
# deprecated outside a running loop since Python 3.10).
asyncio.run(_main())

end = time.time()
print(end - start)
配合线程池使用
# 使用多线程:在协程中集成阻塞io
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse
def get_url(url):
    """Fetch *url* with a raw blocking socket and print the response body.

    Demonstrates plain-socket HTTP; meant to be run inside a thread pool
    so the blocking connect/recv calls overlap across threads.
    """
    parts = urlparse(url)
    host = parts.netloc
    path = parts.path or "/"
    # FIX: use a context manager so the socket is closed even when
    # connect/send/recv raises (the original leaked it on any error).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((host, 80))  # blocking, but releases the GIL while waiting
        client.send(
            "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n"
            .format(path, host).encode("utf8"))
        chunks = []
        while True:
            d = client.recv(1024)
            if not d:
                break
            chunks.append(d)
    data = b"".join(chunks).decode("utf8")
    # FIX: split(...)[1] raised IndexError when no header/body separator
    # arrived; split with maxsplit=1 also keeps a body that itself
    # contains blank lines intact.
    sections = data.split("\r\n\r\n", 1)
    html_data = sections[1] if len(sections) > 1 else ""
    print(html_data)
if __name__ == "__main__":
    import time
    start_time = time.time()

    async def _fetch_all(urls):
        """Run every blocking get_url() call on the thread pool concurrently."""
        loop = asyncio.get_running_loop()
        # FIX: the original never shut the executor down; the with-block
        # joins its worker threads.  asyncio.get_running_loop() replaces
        # the deprecated module-level get_event_loop().
        with ThreadPoolExecutor(3) as executor:
            tasks = [loop.run_in_executor(executor, get_url, u) for u in urls]
            await asyncio.gather(*tasks)

    urls = ["http://shop.projectsedu.com/goods/{}/".format(i) for i in range(20)]
    asyncio.run(_fetch_all(urls))
    print("last time:{}".format(time.time()-start_time))
多线程
等待子线程
def get_progress():
    """Demo child task: block for 10 seconds, then report completion."""
    import time
    time.sleep(10)
    print('完成子线程')
def get_progress1():
    """Demo child task: block for 5 seconds, then report completion."""
    import time
    time.sleep(5)
    print('完成子线程')
if __name__ == '__main__':
    import multiprocessing as mp
    # BUG FIX: the original passed target=get_progress() — CALLING the
    # function in the parent (blocking it for 10 s) and handing Process
    # its None return value.  Pass the function object instead.
    p = mp.Process(target=get_progress, name='test')
    # NOTE(review): the original also set daemon=True, which would kill
    # the child the moment the parent exits — the opposite of this
    # "wait for child processes" demo.  join() below does the waiting.
    p.start()
    p1 = mp.Process(target=get_progress1, name='test')
    p1.start()
    p.join()
    p1.join()
    print('完成')
通信
# multiprocessing.Queue 与 queue.Queue 用法一致
msg_format = 'pid:{} {} {}'

def getter(q: multiprocessing.Queue):
    """Consumer: loop forever, printing every value taken from *q*."""
    while True:
        item = q.get()  # blocks until the producer puts something
        print('\t\t' + msg_format.format(
            multiprocessing.current_process().pid, 'get', item))
def setter(q: multiprocessing.Queue):
    """Producer: put 0..2 onto *q*, logging each put, then close our end."""
    pid = multiprocessing.current_process().pid
    for value in range(3):
        q.put(value)
        print(msg_format.format(pid, 'set', value))
    # close() signals that this process will put no more data on the queue
    q.close()
if __name__ == '__main__':
    q = multiprocessing.Queue()
    consumer = multiprocessing.Process(target=getter, args=(q,))
    producer = multiprocessing.Process(target=setter, args=(q,))
    consumer.start()
    producer.start()
    pid = os.getpid()
    # Monitor loop: report child liveness every 5 seconds.  The consumer
    # never exits, so this demo runs until interrupted.
    while True:
        print('main thread {} . getprocess alive : {} , setprocess alive : {}'.format(
            pid, consumer.is_alive(), producer.is_alive()
        ))
        time.sleep(5)
JoinableQueue (应用于进程)用法与queue.Queue(线程安全)一致
import sys,multiprocessing,os,time
"""
JoinableQueue 与 queue.Queue的用法一样
只是一个用于进程一个用于线程
"""
def getter(q: multiprocessing.JoinableQueue):
    """Consumer: take one item per second from *q* and acknowledge it."""
    while True:
        time.sleep(1)
        item = q.get()
        print('pid:', os.getpid(), ',get :', item)
        # Without task_done(), the producer's q.join() would block forever.
        q.task_done()
if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    worker = multiprocessing.Process(target=getter, args=(q,))
    worker.start()
    for value in range(5):
        q.put(value)
    # Blocks until every item put above has been task_done()'d by the worker.
    q.join()
    while True:
        print('main threading , children:', multiprocessing.active_children())
        time.sleep(1)
Pipe 管道
import sys,multiprocessing,os,time
"""
使用Pipe 2个进程互相发送数据.
recv 阻塞 , 直到有数据到来.
close 关闭管道
"""
def sender(pipe: multiprocessing.Pipe):
    """Send one dict through *pipe*, then close our end of the connection."""
    pipe.send({'1': 1, '2': 2})
    print('sender 发送完成!')
    pipe.close()
def recver(pipe: multiprocessing.Pipe):
    """Wait 3 seconds, then receive and print a single object from *pipe*."""
    time.sleep(3)
    print('recver ready!')
    obj = pipe.recv()  # blocks until the peer has sent something
    print('recver recv:', obj)
    pipe.close()
if __name__ == '__main__':
    con1, con2 = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=(con1,), name='sender')
    p2 = multiprocessing.Process(target=recver, args=(con2,), name='recver')
    # Start order is irrelevant: send() buffers until recv() runs.
    p2.start()
    p1.start()
    while True:
        time.sleep(3)
        print( ' active process :' , multiprocessing.active_children())
Pool.map、apply、imap 都是同步函数
"""
map , apply 都是同步函数,区别是apply 等到返回再执行下个进程
map 是等到所有进程返回
"""
def sub(x):
    """Worker: sleep 2 s, log which process handled *x*, and return it."""
    time.sleep(2)
    print('sub process :{} value:{}'.format(
        multiprocessing.current_process().pid, x))
    return x
if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    # apply() is synchronous: each call blocks until its worker returns,
    # so these four tasks run strictly one after another.
    for i in range(4):
        print('main :', pool.apply(sub, args=(i,)))
    while True:
        time.sleep(1)
        print('mian threading , active :', multiprocessing.active_children())
map_async : 异步函数
import sys,multiprocessing,os,time,threading
"""
map_async 异步函数,一次返回所有结果,即要等待所有结果
可选:callback函数 , 如果成功则调用,否则调用error_callback
"""
def sub(x):
    """Worker: sleep 2 s, log the pid/tid handling *x*, and return it.

    FIX: threading.currentThread() is a deprecated camelCase alias
    (since Python 3.10); threading.current_thread() is the supported name.
    """
    time.sleep(2)
    print('sub process pid :{} , tid:{} value:{}'.format(
        multiprocessing.current_process().pid,
        threading.current_thread().ident,
        x))
    return x
def call_back(v):
    """map_async success callback: log the pid/tid it fires in and the value.

    FIX: threading.current_thread() replaces the deprecated currentThread().
    """
    print('pid:', os.getpid(), ',tid:', threading.current_thread().ident, ',v:', v)
if __name__ == '__main__':
    # FIX: threading.current_thread() replaces the deprecated currentThread().
    print('main pid:', os.getpid(), ',tid:', threading.current_thread().ident)
    pool = multiprocessing.Pool(4)
    # map_async returns immediately; `callback` fires once, in the parent,
    # with the complete list of results.
    results = pool.map_async(sub, range(10), callback=call_back)
    pool.close()
    pool.join()  # wait for every worker before reading the results
    print(results.get())
apply_async
"""
apply_async :
唯一需要注意的是:
for i in range(10):
res = pool.apply_async(sub,args=(i,),callback=call_back)
res.get()
不要这样使用,否则又同步执行了
"""
def sub(x):
    """Worker: sleep 2 s, log the pid/tid handling *x*, and return it.

    FIX: threading.currentThread() is a deprecated camelCase alias
    (since Python 3.10); threading.current_thread() is the supported name.
    """
    time.sleep(2)
    print('sub process pid :{} , tid:{} value:{}'.format(
        multiprocessing.current_process().pid,
        threading.current_thread().ident,
        x))
    return x
def call_back(v):
    """apply_async success callback: log the pid/tid it fires in and the value.

    FIX: threading.current_thread() replaces the deprecated currentThread().
    """
    print('----> callback pid:', os.getpid(), ',tid:', threading.current_thread().ident, ',v:', v)
if __name__ == '__main__':
    # FIX: threading.current_thread() replaces the deprecated currentThread().
    print('main pid:', os.getpid(), ',tid:', threading.current_thread().ident)
    pool = multiprocessing.Pool(4)
    # Submit all ten tasks first and only then wait — calling .get() on
    # each result inside the loop would serialize execution (see the note
    # in the module docstring above this block).
    results = [pool.apply_async(sub, args=(i,), callback=call_back)
               for i in range(10)]
    pool.close()
    pool.join()