隊列是線程安全的嗎,Python多線程——隊列(Queue)

 2023-11-22 阅读 34 评论 0

摘要:一、 Queue作用:主要就是為多線程生產值、消費者之間線程通信提供服務,具有先進先出的數據結構。 1、首先我們組要明白為什么要使用隊列,隊列的性質, 多線程并發編程的重點,是線程之間共享數據的訪問問題和線程之間的通信問題,為了

一、 Queue作用:主要就是為多線程生產值、消費者之間線程通信提供服務,具有先進先出的數據結構。

1、首先我們組要明白為什么要使用隊列,隊列的性質,
  多線程并發編程的重點,是線程之間共享數據的訪問問題和線程之間的通信問題,為了解決線程之間數據共享問題,PYTHON 提供了一個數據類型【隊列】可以用于在多線程并發模式下,安全的訪問數據而不會造成數據共享沖突。正常請求的多線程,如果是消費之和生產者,通過列表實現,多線程會對列表中的數據取值,會出現同時訪問列表數據的情況,這時候就需要對線程進行加鎖或者是線程等待,手動進行解決,過于麻煩,但是隊列會通過先進先出或者先進后出的模式,保證了單個數據不會進行同時被多個線程進行訪問。
1.1 FIFO
 Queue.Queue(maxsize=0)FIFO即First in First Out,先進先出。Queue提供了一個基本的FIFO容器,使用方法很簡單,maxsize是個整數,指明了隊     列中能存放的數據個數的上限。一旦達到上限,插入會導致阻塞,直到隊列中的數據被消費掉。如果maxsize小于或者  等于0,隊列大小沒有限制。
1.2 LIFO
 Queue.LifoQueue(maxsize=0)LIFO即Last in First Out,后進先出。與棧的類似,使用也很簡單,maxsize用法同上priorityclass Queue.PriorityQueue(maxsize=0)構造一個優先隊列。maxsize用法同上。
1.3 基本方法:
 Queue.Queue(maxsize=0) #FIFO, 用來定義隊列的長度,如果maxsize小于1就表示隊列長度無限,Queue.LifoQueue(maxsize=0) #LIFO, 如果maxsize小于1就表示隊列長度無限Queue.qsize() #返回隊列的大小Queue.empty() #如果隊列為空,返回True,反之False ,在線程間通信的過程中,可以通過此來給消費者等待信息Queue.full() # 如果隊列滿了,返回True,反之False,給生產者提醒Queue.get([block[, timeout]]) 讀隊列,timeout等待時間Queue.put(item, [block[, timeout]]) 寫隊列,timeout等待時間Queue.queue.clear() 清空隊列task_done()#意味著之前入隊的一個任務已經完成。由隊列的消費者線程調用。每一個get()調用得到一個任務,接下來		
的task_done()調用告訴隊列該任務已經處理完畢如果當前一個join()正在阻塞,它將在隊列中的所有任務都處理完時恢復
執行(即每一個由put()調用入隊的任務都有一個對應的task_done()調用)。join()#阻塞調用線程,直到隊列中的所有任務被處理掉。只要有數據被加入隊列,未完成的任務數就會增加。當消費者
線程調用task_done((意味著有消費者取得任務并完成任務),未完成的任務數就會減少。當未完成的任務數降到0,
join()解除阻塞。

二、應用案例

2.1 隊列需求一(爬蟲的請求地址)

Python多線程主要是為了提高程序在IO方面的優勢,在爬蟲的過程中顯得尤為重要。正常的爬蟲請求直接封裝多線程就ok,但是爬蟲請求的過程中,對于url的請求需要通過隊列來實現,這是隊列的需求之一。
對于爬蟲的請求地址來說,一般是有規律性可循的,如果是翻頁數據,可以將請求到的url放到隊列中,通過多線程對隊列進行取數據,如果隊列為空,線程判斷自動等待,循環加入隊列url,線程自動請求,以下偽代碼,作為參考:

import threading
from queue import Queueclass ThreadCrawl(threading.Thread):def __init__(self, threadName, idQueue):# 繼承父類的方法super(ThreadCrawl, self).__init__()self.threadName = threadName          # 線程名字def run(self):print('啟動' + self.threadName)while not self.idQueue.empty():try:id = self.idQueue.get(False)  # False 如果隊列為空,拋出異常time.sleep(1)print("~"*300)self.get_con(id)except Exception as e:print('隊列為空。。。。。', e)passprint('#'*300)def get_con(self):  #自己封裝的請求自定義pass
def get_id(m, n):conn = psycopg2.connect(database='postgres', user='postgres', password='123456', host='127.0.0.1', port='5432')cur = conn.cursor()sql1 = 'SELECT doc_id from id LIMIT {} offset {};'.format(m, n)cur.execute(sql1)data = cur.fetchall()conn.commit()return data
def main():n = 60while True:m = 20# m是固定值,一次去20條, n是第幾條開始print('開始采集n的值為', n)if n == 200000:break# id的隊列idQueue = Queue(20)if idQueue.empty():data = get_id(m, n)for i in data:idQueue.put(i[0])# 采集線程的數量crawlList = []for id in range(1, 2):name = '采集線程{}'.format(id)crawlList.append(name)# 存儲采集線程的列表集合threadcrawl = []for threadName in crawlList:thread = ThreadCrawl(threadName, idQueue)thread.start()threadcrawl.append(thread)for thread in threadcrawl:thread.join()n = n + mprint("主線程退出..............")
if __name__ == '__main__':main()

隊列是線程安全的嗎、以上代碼是作者從數據庫中取數據,間隔性取,讓后拼裝到url,進行請求

2.2 隊列需求二(生產者、消費者模型)
import threading
import time
from queue import Queuedef put_id():i = 0while True:i = i + 1print("添加數據", i, id_queue.qsize())time.sleep(0.1)id_queue.put(i)def get_id(m):while True:i = id_queue.get()print("線程", m, '取值', i)if __name__ == "__main__":id_queue = Queue(20)Th1 = threading.Thread(target=put_id, )Th2 = threading.Thread(target=get_id, args=(2, ))Th3 = threading.Thread(target=get_id, args=(3, ))Th5 = threading.Thread(target=get_id, args=(4, ))Th4 = threading.Thread(target=get_id, args=(5, ))Th2.start()Th1.start()Th3.start()Th4.start()Th5.start()

作者:白白_嫩嫩
鏈接:https://www.jianshu.com/p/e30d302ebdeb
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://808629.com/186969.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 86后生记录生活 Inc. 保留所有权利。

底部版权信息