Administrator
发布于 2026-05-17 / 1 阅读
0

python多线程

1 多进程 ===== ``` # 多进程, import os import time from multiprocessing import Process # 启动时必须在 if __name__ 判断下,windows 必须,其他 无限制 # ================================================= # def func(args): # print("子进程:",os.getpid()) # print("子进程的父进程:",os.getppid()) # time.sleep(10) # print("子进程结束") # if __name__ =="__main__": # p = Process(target=func,args=(1,)) # 注册 并传入元祖 元祖有一个参数要加逗号 # # p是进程对象 # p.start() # 开启子进程 # print("主进程:", os.getpid()) # print("主进程的父进程:",os.getppid()) # cmd 或者是 pycharm # 生命周期 # 主进程长:自己执行完结束 # 子进程长:等待子进程结束 # ================================================= # 多进程中的方法 # join # def fun(arg1,arg2): # print('*'*arg1) # # time.sleep(5) # print('*'*arg2) # if __name__ == "__main__": # p = Process(target=fun,args=(10,20)) # p.start() # # p.join() # 感知子进程结束 # # time.sleep(1) # print("all is stop") # # print("最后的语句") # os.walk(r"目录") # 返回 文件夹中文件名字 # ================================================= # def fun(): # print("xxx") # if __name__ == "__main__": # for i in range(10): # p = Process(target=fun) # p.start() # p.join() # 停止for循环 进程结束后继续 # print("for") # print("主进程") # ================================================= # 第二种方法 # class MyProcess(Process): # def __init__(self,args): # super().__init__() # 若要传递参数,需要调用父类init # # def run(self): # print("子进程",self.__dict__) # print(self.pid) # if __name__ == "__main__": # print("主进程:",os.getpid()) # p1 = MyProcess() # p1.start() # ================================================= # 进程之间数据是隔离,命名空间不通 # def fun(): # global n # n= 0 # print("pid:%s" %os.getpid(),n) # if __name__ == "__main__": # n=100 # p = Process(target=fun) # p.start() # p.join() # print(n) # -->100 # ================================================= # 多进程tcp连接 # import socket # # 客户端 # sk = socket.socket() # sk.connect(("127.0.0.1",8080)) # sk.send('N好'.encode("utf8")) # msg = sk.recv(1024).decode("utf8") # print(msg) # sk.close() # # # 服务端 # def server(conn): # ret= "你好".encode("utf8") # conn.send(ret) # msg = conn.recv(1024).decode("utf8") # print(msg) # conn.close() # # sk = socket.socket() # sk.bind(("127.0.0.1",8080)) # sk.listen() # if __name__ == "__main__": # while True: # conn, addr = sk.accept() # p = Process(target=server,args=(conn,)) # p.start() # ================================================= # 守护进程 # 默认情况 父进程 等待子进程结束 # p.daemon = True 在start前,设置为守护进程,守护进程随父进程(代码执行完毕)结束 # 若父进程在等待 子进程(非守护进程时) ,若父进程代码完毕,守护进程应该结束 # p.is_alive() 判断进程是否存活 # p.terminate() 终止进程 # ================================================= # 锁 # 未加锁实例: # 火车票 import json import time from multiprocessing import Process # def show(i): # with open('ticket') as f: # dic = json.load(f) # print('余票: %s'%dic['ticket']) ​ def buy_ticket(i): with open('ticket') as f: dic = json.load(f) time.sleep(0.1) if dic['ticket'] > 0 : dic['ticket'] -= 1 print('\033[32m%s买到票了\033[0m'%i) else: print('\033[31m%s没买到票\033[0m'%i) time.sleep(0.1) with open('ticket','w') as f: json.dump(dic,f) if __name__ == '__main__': # for i in range(10): # p = Process(target=show,args=(i,)) # p.start() for i in range(10): p = Process(target=buy_ticket, args=(i)) p.start() # ================================================= # 锁 # 加锁实例 # 火车票 import json import time from multiprocessing import Process from multiprocessing import Lock ​ # def show(i): # with open('ticket') as f: # dic = json.load(f) # print('余票: %s'%dic['ticket']) ​ def buy_ticket(i,lock): lock.acquire() #拿钥匙进门 with open('ticket') as f: dic = json.load(f) time.sleep(0.1) if dic['ticket'] > 0 : dic['ticket'] -= 1 print('\033[32m%s买到票了\033[0m'%i) else: print('\033[31m%s没买到票\033[0m'%i) time.sleep(0.1) with open('ticket','w') as f: json.dump(dic,f) lock.release() # 还钥匙 ​ if __name__ == '__main__': # for i in range(10): # p = Process(target=show,args=(i,)) # p.start() lock = Lock() for i in range(10): p = Process(target=buy_ticket, args=(i,lock)) p.start() # ================================================= # ================================================= # ================================================= ``` 2 信号量_事件 ======== ``` # 多进程中的组件 # 一个资源 同一时间 被n个人访问 import time import random from multiprocessing import Process,Event # ============================== # 未用信号量 # def ktv(i): # print('%s走进ktv'%i) # time.sleep(random.randint(1,5)) # print('%s走出ktv'%i) # if __name__ == '__main__' : # for i in range(20): # p = Process(target=ktv,args=(i)) # p.start() # ============================== from multiprocessing import Semaphore ​ # sem = Semaphore(4) # sem.acquire() # print('拿到第一把钥匙') # sem.acquire() # print('拿到第二把钥匙') # sem.acquire() # print('拿到第三把钥匙') # sem.acquire() # print('拿到第四把钥匙') # sem.acquire() # print('拿到第五把钥匙') # def ktv(i,sem): # sem.acquire() #获取钥匙 # print('%s走进ktv'%i) # time.sleep(random.randint(1,5)) # print('%s走出ktv'%i) # sem.release() # 释放钥匙 # # # if __name__ == '__main__' : # sem = Semaphore(4) # for i in range(20): # p = Process(target=ktv,args=(i,sem)) # p.start() ​ # ============================== # 事件 # 信号是控制进程阻塞与否 # 事件创建后,默认是阻塞状态 # e = Event() # 创建事件 # e.is_set() # False 默认阻塞 # print("xx") # 可打印 e.set() 设置为True e.clear() 设置为False # e.wait() # print("xx") # 阻塞 # 遇到wait()会判断is_set() 为False 阻塞 # ============================== # 红绿灯事件 def cars(e,i): if not e.is_set(): print("car%i在等待" % i) e.wait() print("car%i通过" % i) ​ ​ def light(e): while True: if e.is_set(): e.clear() print("\033[31m红灯\033[0m") else: e.set() print("\033[32m绿灯\033[0m") time.sleep(2) if __name__ == "__main__": e = Event() p =Process(target=light,args=(e,)) p.start() for i in range(1,21): car = Process(target=cars,args=(e,i)) car.start() time.sleep(random.random()) # ============================== # ============================== # ============================== # ============================== # def test(e): # e.set() # print("xxx") # if __name__=="__main__": # e = Event() # print(e.is_set()) # p = Process(target=test,args=(e,)) # p.start() # e.wait() # print("注") ​ ``` 3 进程通信_队列管道 =========== ``` # IPC 内部进程通信,不能使用普通queue from multiprocessing import Queue,Process # =============================== # q = Queue(5) # 队列大小 # q.put(1) # q.put(1) # q.full() # 若队列满了,阻塞等待 # q.get() # q.empty() # 若为空,阻塞等待有数据 后取值 # q.get() # q.get_nowait() # 用于跳过等待,需要用try # =============================== # def produce(q): # q.put('hello') # def consume(q): # print(q.get()) # if __name__ =="__main__": # q = Queue() # p = Process(target=produce,args=(q,)) # p.start() # p2 = Process(target=consume, args=(q,)) # p2.start() # =============================== # 生产者消费者模型 # 若生产者,生产有数量,消费者,不停消费,最后消费进程会处于等待状态 # 可在主进程后边join生产进程,消费进程判断为空,但不准确 # 需要在队列put(None) 子进程判断,由于数据之间不能共享,需要put 消费数量的None # =============================== from multiprocessing import JoinableQueue # consume : # .... # q.task_done() # produce : # ... # q.join() # =============================== # 循环通知,致使进程结束 # JoinableQueue # 生产者生产,不停不停消费,若q为空 一直等待, # 生产者完毕后,会join等待消费值消费完毕,因为是同一个q,一个生产者完毕后,其他还没有完毕q会处于,他会处于阻塞 # 等待 消费者 全部消费完毕,q.join()会感知,因此 生产进程会结束,主进程最后join生产进程,生产结束 # 主进程就结束,身为守护进程的子进程也结束 # import time # import random # from multiprocessing import Process,JoinableQueue # def consumer(q,name): # while True: # food = q.get() # print('\033[31m%s消费了%s\033[0m' % (name,food)) # time.sleep(random.randint(1,3)) # q.task_done() # count - 1 # # def producer(name,food,q): # for i in range(4): # time.sleep(random.randint(1,3)) # f = '%s生产了%s%s'%(name,food,i) # print(f) # q.put(f) # q.join() # 阻塞 直到一个队列中的所有数据 全部被处理完毕 # # if __name__ == '__main__': # q = JoinableQueue(20) # p1 = Process(target=producer,args=('Egon','包子',q)) # p2 = Process(target=producer, args=('wusir','泔水', q)) # c1 = Process(target=consumer, args=(q,'alex')) # c2 = Process(target=consumer, args=(q,'jinboss')) # p1.start() # p2.start() # c1.daemon = True # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束 # c2.daemon = True # c1.start() # c2.start() # p1.join() # p2.join() # 感知一个进程的结束 ​ # 在消费者这一端: # 每次获取一个数据 # 处理一个数据 # 发送一个记号 : 标志一个数据被处理成功 ​ # 在生产者这一端: # 每一次生产一个数据, # 且每一次生产的数据都放在队列中 # 在队列中刻上一个记号 # 当生产者全部生产完毕之后, # join信号 : 已经停止生产数据了 # 且要等待之前被刻上的记号都被消费完 # 当数据都被处理完时,join阻塞结束 ​ # consumer 中把所有的任务消耗完 # producer 端 的 join感知到,停止阻塞 # 所有的producer进程结束 # 主进程中的p.join结束 # 主进程中代码结束 # 守护进程(消费者的进程)结束 # =============================== # 管道 双向通信工具 from multiprocessing import Pipe # conn,conn2 = Pipe() # conn.send("123456") # 不用字节 # print(conn2.recv()) # 不用指定大小 ​ def fun(conn): conn.send("hello") if __name__=="__main__": conn1,conn2 = Pipe() Process(target=fun,args=(conn1,)).start() print(conn1.recv()) # 管道返回2个连接 # conn1, conn2 # P 发送 接受 # p2 接受 发送 # 若只发数据,可关闭一端,若取数据的时候,对面已关闭,则报错,看根据此 终止程序 from multiprocessing import Pipe,Process # def func(conn1,conn2): # conn2.close() # while True: # try : # msg = conn1.recv() # print(msg) # except EOFError: # conn1.close() # break # # if __name__ == '__main__': # conn1, conn2 = Pipe() # Process(target=func,args = (conn1,conn2)).start() # conn1.close() # for i in range(20): # conn2.send('吃了么') # conn2.close() # =============================== # from multiprocessing import Lock,Pipe,Process # def producer(con,pro,name,food): # con.close() # for i in range(100): # f = '%s生产%s%s'%(name,food,i) # print(f) # pro.send(f) # pro.send(None) # pro.send(None) # pro.send(None) # pro.close() # # def consumer(con,pro,name,lock): # pro.close() # while True: # lock.acquire() # 不安全主要是recv的时候,因此两端加锁即可 # food = con.recv() # lock.release() # if food is None: # con.close() # break # print('%s吃了%s' % (name, food)) # if __name__ == '__main__': # con,pro = Pipe() # lock= Lock() # p = Process(target=producer,args=(con,pro,'egon','泔水')) # c1 = Process(target=consumer, args=(con, pro, 'alex',lock)) # c2 = Process(target=consumer, args=(con, pro, 'bossjin',lock)) # c3 = Process(target=consumer, args=(con, pro, 'wusir',lock)) # c1.start() # c2.start() # c3.start() # p.start() # con.close() # pro.close() ​ # from multiprocessing import Process,Pipe,Lock # # def consumer(produce, consume,name,lock): # produce.close() # while True: # lock.acquire() # baozi=consume.recv() # lock.release() # if baozi: # print('%s 收到包子:%s' %(name,baozi)) # else: # consume.close() # break # # def producer(produce, consume,n): # consume.close() # for i in range(n): # produce.send(i) # produce.send(None) # produce.send(None) # produce.close() # # if __name__ == '__main__': # produce,consume=Pipe() # lock = Lock() # c1=Process(target=consumer,args=(produce,consume,'c1',lock)) # c2=Process(target=consumer,args=(produce,consume,'c2',lock)) # p1=Process(target=producer,args=(produce,consume,30)) # c1.start() # c2.start() # p1.start() # produce.close() # consume.close() ​ # pipe 数据不安全性 # IPC # 加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象 ​ # 队列 进程之间数据安全的 # 管道 + 锁 ``` 3_进程通信__数据共享 ============ ``` from multiprocessing import Manager,Process,Lock # 进程之间不能传递数据,通过方法的参数可以传过去,但修改后,无反应,不知道原因 def main(dict,lock): lock.acquire() dict['count']-=1 lock.release() if __name__ == "__main__": m = Manager() l = Lock() dict = m.dict({"count":100}) p_list=[] for i in range(50): p = Process(target=main,args=(dict,l)) p.start() p_list.append(p) for i in p_list:i.join() print(dict['count']) ``` 4_进程池 ===== ``` # 进程池 # 上一个例子中50个进程很慢 # 寄存器 堆栈 文件 # 操作系统调度,cup切换 # 高级线程池有数量限定 ,有最低,任务变多的时候 逐步加到最高限制 import os,time from multiprocessing import Pool,Manager # ================================== # def func2(i): # print(os.getpid(),os.getppid()) # i+1 # def func(list): # list[1]['set'].add(os.getpid()) # print(len(list[1]['set'])) # # 一般超过5个使用pool # if __name__ == "__main__": # pid = Manager() # dict1 = pid.dict({"set":set()}) # pool = Pool(5) # # 执行不同的任务,map 自带join方法 # #pool.map(func2,range(100)) # pool.map(func, [[i,dict1]for i in range(100)]) # print(len(dict1['set'])) # """ # 等于 # for i in range(100): # p = Process(target=func,args=(i,)) # p.start() # """ # ================================== # def fun(n): # print("start fun%s" %n,os.getpid()) # time.sleep(1) # print("end fun%s" % n, os.getpid()) # if __name__ == "__main__": # p = Pool() # 默认cup核心数量 # for i in range(10): # #p.apply(fun,args=(i,)) # 同步提交的 # p.apply_async(fun,args=(i,)) # 异步提交,真的异步,因此需要join # p.close() # 不再接受新的任务 # p.join() # 感知进程池中任务结束 保持 主进程 与子进程同步 ​ # ================================== # import socket # from multiprocessing import Pool # # def func(conn): # conn.send(b'hello') # print(conn.recv(1024).decode('utf-8')) # conn.close() # # if __name__ == '__main__': # p = Pool(5) # sk = socket.socket() # sk.bind(('127.0.0.1',8080)) # sk.listen() # while True: # conn, addr = sk.accept() # p.apply_async(func,args=(conn,)) # sk.close() # import socket # # sk = socket.socket() # sk.connect(('127.0.0.1', 8080)) # # ret = sk.recv(1024).decode('utf-8') # print(ret) # msg = input('>>>').encode('utf-8') # sk.send(msg) # sk.close() # ================================== # 进程池的返回值 # p = Pool() # p.map(funcname,iterable) 默认异步的执行任务,且自带close和join # p.apply 同步调用的 # p.apply_async 异步调用 和主进程完全异步 需要手动close 和 join # from multiprocessing import Pool # def func(i): # return i*i # # if __name__ == '__main__': # p = Pool(5) # for i in range(10): # res = p.apply(func,args=(i,)) # apply的结果就是func的返回值 # print(res) --> 直接就是返回值 # ================================== # import time # from multiprocessing import Pool # def func(i): # time.sleep(0.5) # return i*i # # if __name__ == '__main__': # p = Pool(5) # res_l = [] # for i in range(10): # res = p.apply_async(func,args=(i,)) # apply的结果就是func的返回值 # res_l.append(res) # 若在for 中直接获取res.get()会在成阻塞,程序变同步执行 # for res in res_l:print(res.get())# 等着 func的计算结果 # 调用res.get时返回 # ================================== # import time # from multiprocessing import Pool # def func(i): # time.sleep(0.5) # return i*i # # if __name__ == '__main__': # p = Pool(5) # ret = p.map(func,range(100)) # print(ret) # -> 直接返回全部,列表返回 # 自带join,close 最后一起返回 # ==================================== # 回调函数 , 回调的函数在主进程调用 # 对于子进程中再起子进程问题,还不知道 # 每个进程的回调函数 交给主进程顺序执行 import os from multiprocessing import Pool,Process def func2(nn): print('in func2',os.getpid()) print(nn) def func3(n): print('in func3', os.getpid()) return n*n def func1(n): print('in func1',os.getpid()) p = Pool(5) p.apply_async(func3, args=(10,), callback=func2) p.close() p.join() return n*n if __name__ == '__main__': print('主进程 :',os.getpid()) p = Pool(5) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join() # =================================================== import requests from urllib.request import urlopen from multiprocessing import Pool # 200 网页正常的返回 # 404 网页找不到 # 502 504 # 场景:callback 耗时段,远小于网络延时,此时使用,在主进程运行, def get(url): response = requests.get(url) if response.status_code == 200: return url, response.content.decode('utf-8') ​ ​ def get_urllib(url): ret = urlopen(url) return ret.read().decode('utf-8') ​ ​ def call_back(args): url, content = args print(url, len(content)) ​ ​ if __name__ == '__main__': url_lst = [ 'https://www.cnblogs.com/', 'http://www.baidu.com', 'https://www.sogou.com/', 'http://www.sohu.com/', ] p = Pool(5) for url in url_lst: p.apply_async(get, args=(url,), callback=call_back) # callback 中的参数为 get函数的返回值 p.close() p.join() ``` 4_线程 ==== #### 两者之间应该有对应关系1:1 1:n ### linux 中的nptl 1对1 线程 ``` # 同一进程的线程间的数据共享的,共享的 共享的 # 可通过直接访问全局变量 global,还需要进程同步 # 创建,切换,撤销 相比进程 消耗小,轻量级 # 进程:资源分配单位,每个进程 至少一个线程 # 线程:cup调度单位 # thread 基本模块,避免使用,可能与threading 冲突 # threading thread的高级版本 # Queue 多线程之间共享数据的数据结构 # 与进程类似,好多方法相同 import time from threading import Thread import threading # def func(n): # time.sleep(1) # print(n) # t = Thread(target=func,args=(12,)) # t.daemon = True # 成为"守护线程" # t.start() # print("主线程") # 默认情况等待子线程结束 # =================================== # class MyThread(Thread): # def __init__(self,name): # super().__init__() # self.name = name # def run(self): # # time.sleep(1) # print(self.name) # MyThread("thread1").start() # ================================ # GIL 锁的是线程,同一时间 只有一个线程 ,cpython解释器的问题,jpython 就不会 # 对于io密集型 没什么区别,只要io时会切换即可 # 但对于多核cup python 同时只能运行一个cup ,其他语言的会运行多个,因此... # 即不能通过物理核心数增加速度,不能实现(并行) # ============================================ # 多线程socket 可以input # import socket # from threading import Thread # def chat(conn): # conn.send(b'hello') # msg = conn.recv(1024).decode('utf-8') # print(msg) # conn.close() # sk = socket.socket() # sk.bind(('127.0.0.1',8080)) # sk.listen() # while True: # conn,addr = sk.accept() # Thread(target=chat,args = (conn,)).start() # sk.close() # # import socket # sk = socket.socket() # sk.connect(('127.0.0.1',8080)) # msg = sk.recv(1024) # print(msg) # inp = input('>>> ').encode('utf-8') # sk.send(inp) # sk.close() # ========================= # print(threading.current_thread()) # 当前线程 # print(threading.active_count()) # 全部线程,包括主线程 # print(threading.enumerate()) # 列表返回全部线程对象 # ========================================== # 守护线程 # import time # from threading import Thread # def func1(): # while True: # print('*'*10) # time.sleep(1) # def func2(): # print('in func2') # time.sleep(5) # # t = Thread(target=func1,) # t.daemon = True # t.start() # t2 = Thread(target=func2,) # t2.start() # t2.join() # print('主线程') ​ # (守护进程)随着(主进程代码)的执行结束而结束 # 守护(线程)会在主线程结束之后等待(其他非守护子线程)的结束才结束 ​ # 主进程在执行完自己的代码之后不会立即结束 而是等待子进程结束之后 回收子进程的资源 # import time # from multiprocessing import Process # def func(): # time.sleep(5) # # if __name__ == '__main__': # Process(target=func).start() # ========================================= # 线程锁 ,与gil无关 import time from threading import Lock,Thread # Lock 互斥锁 # def func(lock): # global n # lock.acquire() # temp = n # time.sleep(0.2) # n = temp - 1 # lock.release() # # n = 10 # t_lst = [] # lock = Lock() # for i in range(10): # t = Thread(target=func,args=(lock,)) # t.start() # t_lst.append(t) ​ # for t in t_lst: t.join() # print(n) ​ # 科学家吃面 还会死锁 ​ # noodle_lock = Lock() # fork_lock = Lock() # def eat1(name): # noodle_lock.acquire() # print('%s拿到面条啦'%name) # fork_lock.acquire() # print('%s拿到叉子了'%name) # print('%s吃面'%name) # fork_lock.release() # noodle_lock.release() # # def eat2(name): # fork_lock.acquire() # print('%s拿到叉子了'%name) # time.sleep(1) # noodle_lock.acquire() # print('%s拿到面条啦'%name) # print('%s吃面'%name) # noodle_lock.release() # fork_lock.release() # # Thread(target=eat1,args=('alex',)).start() # Thread(target=eat2,args=('Egon',)).start() # Thread(target=eat1,args=('bossjin',)).start() # Thread(target=eat2,args=('nezha',)).start() # =============================================== from threading import RLock # 递归锁 fork_lock = noodle_lock = RLock() # 一个钥匙串上的两把钥匙,同一个lock 在一个线程中可又多次acquire # 传给其他线程时 不能被acquire # def eat1(name): # print(name) # noodle_lock.acquire() # 一把钥匙 # print('%s拿到面条啦'%name) # fork_lock.acquire() # print('%s拿到叉子了'%name) # print('%s吃面'%name) # fork_lock.release() # noodle_lock.release() # # def eat2(name): # print(name) # fork_lock.acquire() # print('%s拿到叉子了'%name) # time.sleep(1) # noodle_lock.acquire() # print('%s拿到面条啦'%name) # print('%s吃面'%name) # noodle_lock.release() # fork_lock.release() # Thread(target=eat1,args=('alex',)).start() # Thread(target=eat2,args=('Egon',)).start() # Thread(target=eat1,args=('bossjin',)).start() # Thread(target=eat2,args=('nezha',)).start() # ================================================= ``` 5_线程_信号量_事件_条件_定时器_列队_线程池 ========================= ``` import time from threading import Semaphore,Thread # ==================================== # def func(sem,a,b): # sem.acquire() # time.sleep(1) # print(a+b) # sem.release() # sem = Semaphore(4) # for i in range(10): # t = Thread(target=func,args=(sem,i,i+5)) # t.start() # ==================================== # 事件被创建的时候 # False状态 # wait() 阻塞 # True状态 # wait() 非阻塞 # clear 设置状态为False # set 设置状态为True # 数据库 - 文件夹 # 文件夹里有好多excel表格 # 1.能够更方便的对数据进行增删改查 # 2.安全访问的机制 # 起两个线程 # 第一个线程 : 连接数据库 # 等待一个信号 告诉我我们之间的网络是通的 # 连接数据库 # 第二个线程 : 检测与数据库之间的网络是否连通 # time.sleep(0,2) 2 # 将事件的状态设置为True # import time # import random # from threading import Thread,Event # def connect_db(e): # count = 0 # while count < 3: # e.wait(0.5) # 状态为False的时候,我只等待1s就结束 # if e.is_set() == True: # print('连接数据库') # break # else: # count += 1 # print('第%s次连接失败'%count) # else: # raise TimeoutError('数据库连接超时') # def check_web(e): # time.sleep(random.randint(0,3)) # e.set() # e = Event() # t1 = Thread(target=connect_db,args=(e,)) # t2 = Thread(target=check_web,args=(e,)) # t1.start() # t2.start() # ==================================== # 条件 复杂的锁 # 条件 from threading import Condition # 条件 # 锁 # acquire release # 一个条件被创建之初 默认有一个(False)状态 # False状态 会影响wait一直处于等待状态 # notify(int数据类型) 造钥匙 # from threading import Thread,Condition # def func(con,i): # con.acquire() # con.wait() # 等钥匙 # print('在第%s个循环里'%i) # con.release() # con = Condition() # for i in range(10): # Thread(target=func,args = (con,i)).start() # while True: # num = int(input('>>>')) # con.acquire() # con.notify(num) # 造钥匙 # con.release() # ==================================== #定时器 # import time # from threading import Timer # def func(): # print('时间同步') #1-3 # while True: # t = Timer(5,func).start() # 非阻塞的 ,异步的 ,会把所有的5s在一起 # time.sleep(5) # 睡5s 每5s进行意思时间同步 # ==================================== # 加锁 麻烦 所以使用队列 #线程通信 # queue # import queue #直接导入普通queue 是线程安全的 # q = queue.Queue() # 队列 先进先出 # q.put() # q.get() # q.put_nowait() # q.get_nowait() # q = queue.LifoQueue() # 栈 先进后出 # q.put(1) # q.put(2) # q.put(3) # print(q.get()) # print(q.get()) # q = queue.PriorityQueue() # 优先级队列 # q.put((1,'a')) # q.put((10,'b')) # q.put((30,'c')) # q.put((1,'d')) # q.put((1,'f')) # print(q.get()) # 元祖中的元素按顺序比较,数字越小优先级大,祖父按照ascii越小优先级越大 # ==================================== # 线程池 import time # 以前没有线程池 from concurrent.futures import ThreadPoolExecutor # ProcessPoolExecutor 该模块下还有一个进程池,与multi 功能相同 # submit(fn,*args,**kwargs) 异步提交任务 # map(fun,*iterables,timeout=None,chunksize - 1) 循环的submit # shutdown(wait=True) # 等于原来的 close join 合并 # result(time=None) 取得结果 # add_done_callback(fn) 回调函数 def func(n): time.sleep(2) print(n) return n*n def call_back(m): print('结果是 %s'%m.result()) # 若使用进程池 只换ThreadPoolExecutor->ProcessPoolExecutor tpool = ThreadPoolExecutor(max_workers=5) # 默认 不要超过cpu个数*5 for i in range(20): tpool.submit(func,i).add_done_callback(call_back) tpool.shutdown() # tpool.map(func,range(20)) # 拿不到返回值 ​ # t_lst = [] # for i in range(20): # t = tpool.submit(func,i) # t_lst.append(t) # tpool.shutdown() # close+join # # print('主线程') # for t in t_lst:print('***',t.result()) # 拿返回值 ``` 6_协程 ==== ``` # 进程 多个进程,操作系统负责 # 线程 不能同一时间多个cup 其他语言可以,但不影响高io # 开启线程 创建线程 寄存器 堆栈 # 关闭一个线程 # 协程 # 本质是一个线程 # 能够在多个任务间切换,不需要寄存器,堆栈切换 # 任务之间切换时间开销 远小于线程 # 计算任务之间切换消耗也很大,一般都是遇到io的时候切换 # 进程(cup数+1)+线程(cup数*5)+协程(500) = 50000 # 适合爬虫 # 实现并发的手段 # import time # 实现在 con,pro之间来回切换 # def consumer(): # while True: # x = yield # time.sleep(1) # print('处理了数据 :',x) # # def producer(): # c = consumer() # next(c) # for i in range(10): # time.sleep(1) # print('生产了数据 :',i) # c.send(i) # # producer() # ============================================= # 真正的协程模块就是使用greenlet完成的切换 from greenlet import greenlet # def eat(): # print('eating start') # g2.switch() # print('eating end') # g2.switch() # # def play(): # print('playing start') # g1.switch() # print('playing end') # g1 = greenlet(eat) # 必须先有g1 ,g2 函数中才能使用g # g2 = greenlet(play) # 不会自动切换 # g1.switch() # ====================================== # 不能感知time.sleep(1) # 可以感知gevent.sleep(1),在第一行引入 如下from... # 后边的time 都会经过特殊处理,time.sleep() 就可以被识别 # from gevent import monkey;monkey.patch_all() # import time # import gevent # import threading # def eat(): # DummyThread-1 虚拟的线程 # print(threading.current_thread().getName()) # print(threading.current_thread()) # print('eating start') # time.sleep(1) # print('eating end') # # def play(): # DummyThread-2 虚拟的线程 # print(threading.current_thread().getName()) # print(threading.current_thread()) # print('playing start') # time.sleep(1) # print('playing end') # # g1 = gevent.spawn(eat) # 注册进入,会自动切换,不是操作系统调度 # g2 = gevent.spawn(play) # gevent 负责协程的调度 通过封装的greenlet switch # g1.join() gevent 是完全异步的 join等待协程结束 # g2.join() # 进程和线程的任务切换右操作系统完成 # 协程任务之间的切换由程序(代码)完成,只有遇到协程模块能识别的IO操作,(时间片等不识别)的时候,程序才会进行任务切换,实现并发的效果 # ======================================== # 同步 和 异步 # from gevent import monkey;monkey.patch_all() # 放最前面 # import time # import gevent # def task(n): # time.sleep(1) # print(n) # def sync(): # 同步 # for i in range(10): # task(i) # def async(): # 异步 # g_lst = [] # for i in range(10): # g = gevent.spawn(task,i) # g_lst.append(g) # gevent.joinall(g_lst) #两种方法都可 # for g in g_lst:g.join() # ====================================== # 协程 : 能够在一个线程中实现并发效果的概念 # 能够规避一些任务中的IO操作 # 在任务的执行过程中,检测到IO就切换到其他任务 ​ # 多线程 被弱化了 # 协程 在一个线程上 提高CPU 的利用率 # 协程相比于多线程的优势 切换的效率更快 # ========================================== # 爬虫的例子 # 请求过程中的IO等待 # from gevent import monkey;monkey.patch_all() # import gevent # from urllib.request import urlopen # 内置的模块 # urlopen html时有个格式的 reguests 无格式 # def get_url(url): # response = urlopen(url) # content = response.read().decode('utf-8') # return len(content) # # g1 = gevent.spawn(get_url,'http://www.baidu.com') # g2 = gevent.spawn(get_url,'http://www.sogou.com') # g3 = gevent.spawn(get_url,'http://www.taobao.com') # g4 = gevent.spawn(get_url,'http://www.hao123.com') # g5 = gevent.spawn(get_url,'http://www.cnblogs.com') # gevent.joinall([g1,g2,g3,g4,g5]) # print(g1.value) # print(g2.value) # print(g3.value) # print(g4.value) # print(g5.value) ​ # ret = get_url('http://www.baidu.com') # print(ret) # ====================================== from gevent import monkey;monkey.patch_all() import socket import gevent def talk(conn): conn.send(b'hello') print(conn.recv(1024).decode('utf-8')) conn.close() ​ sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True: conn,addr = sk.accept() gevent.spawn(talk,conn) sk.close() ​ import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) print(sk.recv(1024)) msg = input('>>>').encode('utf-8') sk.send(msg) sk.close() ``` 7_io模型 ====== 阻塞模型 ---- 非阻塞模型 ----- io多路复用 ------ ``` # 同步 提交一个任务之后要等待这个任务执行完毕 # 异步 只管提交任务,不等待这个任务执行完毕就可以做其他事情 # 阻塞 recv recvfrom accept # 非阻塞 # 阻塞 线程 运行状态 --> 阻塞状态 --> 就绪 # 非阻塞 # IO多路复用 # select机制 Windows linux 都是操作系统轮询每一个被监听的项,看是否有读操作 # poll机制 linux 它可以监听的对象比select机制可以监听的数量多 # 随着监听项的增多,导致效率降低 # epoll机制 linux 更高级,绑定回调函数, # ================================= # 以前的都是阻塞io # ================================= # 非阻塞io实例 # import socket # sk = socket.socket() # sk.bind(('127.0.0.1',9000)) # sk.setblocking(False) # 设置不阻塞 # sk.listen() # conn_l = [] # del_conn = [] # while True: # try: # conn,addr = sk.accept() #不阻塞,但是没人连我会报错 # print('建立连接了:',addr) # conn_l.append(conn) # except BlockingIOError: # for con in conn_l: # try: # msg = con.recv(1024) # 非阻塞,如果没有数据就报错 # if msg == b'': # 若客户端关闭 会发送空消息 # del_conn.append(con) # continue # print(msg) # con.send(b'byebye') # except BlockingIOError:pass # for con in del_conn: # con.close() # conn_l.remove(con) # del_conn.clear() # # while True : 10000 500 501 # # import time # import socket # import threading # def func(): # sk = socket.socket() # sk.connect(('127.0.0.1',9000)) # sk.send(b'hello') # time.sleep(1) # print(sk.recv(1024)) # sk.close() # # for i in range(2): # threading.Thread(target=func).start() # ================================= # io 多路复用, 监听列表的循环 变为有操作系统执行 import select import socket sk = socket.socket() sk.bind(('127.0.0.1',8000)) sk.setblocking(False) sk.listen() read_lst = [sk] # 监听列表 while True: # [sk,conn] # 等待读列表,写列表,修改列表 都必传 # 返回元祖中3个列表,对应三个list,一般只用第一个 # r_lst里面就是sk对象 r_lst,w_lst,x_lst = select.select(read_lst,[],[]) for i in r_lst: if i is sk: conn,addr = i.accept() read_lst.append(conn) else: ret = i.recv(1024) if ret == b'': i.close() read_lst.remove(i) continue print(ret) i.send(b'goodbye!') import time import socket import threading def func(): sk = socket.socket() sk.connect(('127.0.0.1', 8000)) sk.send(b'hello') time.sleep(3) print(sk.recv(1024)) sk.close() for i in range(20): threading.Thread(target=func).start() # ================================= import selectors # 选择合适的多路复用机制 from socket import * def accept(sk,mask): conn,addr=sk.accept() sel.register(conn,selectors.EVENT_READ,read) def read(conn,mask): try: data=conn.recv(1024) if not data: print('closing',conn) sel.unregister(conn) conn.close() return conn.send(data.upper()+b'_SB') except Exception: print('closing', conn) sel.unregister(conn) conn.close() sk=socket() sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) sk.bind(('127.0.0.1',8088)) sk.listen(5) sk.setblocking(False) #设置socket的接口为非阻塞 sel = selectors.DefaultSelector() # 自动选择一个适合我的IO多路复用的机制 sel.register(sk,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个sk对象,并且绑定了一个回调函数accept # 说白了就是 如果有人请求连接sk,就调用accrpt方法 while True: events=sel.select() #检测所有的sk,conn,是否有完成wait data阶段 for sel_obj,mask in events: # [conn] callback=sel_obj.data #callback=read callback(sel_obj.fileobj,mask) #read(conn,1) ``` pymysql ======= ``` import pymysql # 连接 conn = pymysql.connect( host="106.15.39.74", port=3306, database="test", user="root", password="dzf123,.", charset="utf8" # 没有"-" 没有 ) cursor = conn.cursor() sql = "select*from student" name = "dzf" password = "123456" sql = "select * from student where name = %s and password = %s" ret = cursor.execute(sql,[name,password]) # 自己拼接需要加引号,使用防注入sql不用加引号,参数不能少,多 #print(cursor.lastrowid) # 获取刚插入数据的id 应该就是主键 自增的那个,与名字无关 print(ret) # 返回受影响行数 ret = cursor.fetchall() # 元祖 大元组里边小元祖 print(ret,"a") ret = cursor.fetchone() # 取一条数据 print(ret,"a") ret = cursor.fetchone() print(ret,"a") # 直接返回一条元素,格式是 小元祖,或只有list中的一个小字典,外边没有元祖或list # 若连续fetchone() 第一次第一条,第二次第二条,一次向下取 # 若取完后 再次 fetchone() 取不到 # -->(('dzf','1234'),('dzf','1234')) # 在执行语句前 修改cursor格式 cursor.fetchmany(3) # 在cursor位置接下取3条,大元组中小元祖 # 移动光标 cursor.scroll(1,mode="absolute") # 绝对移动 移到1位置,从2开始 , cursor.scroll(1,mode="relative") # 相对移动 原来在3 位置,从4 开始读,现在 移动到4 从5开始读 # 向上移可以使用负的 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 指定为字典格式 """ [ {'id':1,'name':'dzf'}, {'id':1,'name':'dzf'}, ] """ cursor.close() conn.close() # ==================================== # 插入数据还是用cursor.execute(),注意提交后conn.commit() # 若多语句,可能错误,conn.rollback() # sql2 = "insert into student (name,password) values(%s, %s)" # ret = cursor.execute(sql2,['123','123']) # conn.commit() # 或insert into student (name,password) values(%(name)s, %(pwd)s) # 下边传入字典excute(sql,{"name":xxx..}) # ==================================== # 批量执行 data = (['12','12'],['23','32'],['32','23']) # 格式必须固定 cursor.executemany(sql,data) # 内部的for循环 # try 防止异常,要回滚, 会取消以前正确的插入语句 # ================================= # 删除,同理,也要提交 # ================================ # 修改 记得提交 ```