目錄
一、POSIX信號量
1. 什么是信號量
2. 信號量的基本原理
二、與信號量相關的操作
1. 初始化信號量
2. 銷毀信號量
3. 等待信號量
4. 發布信號量
三、基于環形隊列的生產者消費者模型
1. 空間資源和數據資源
2. 生產者和消費者申請和釋放資源?
四、模擬實現基于環形隊列的生產者消費者模型
1. 單生產者與單消費者?
2. 多生產者與多消費者
UNIX/LINUX?????????在之前的博客中,我們利用加鎖解鎖保證了每次只有一個線程進入臨界資源,但是臨界資源很多也很大,如果每次只允許一個線程進入臨界資源往往會使效率很低。但是將臨界資源劃分為多個獨立的區域,劃分為多少個區域就可以讓多少個線程進入。信號量可以理解為一個計數器,它是用來描述臨界資源的有效個數;
????????但是這樣就同時帶來了一個問題 ---> 如果劃分為了5個區域,但是同時進入了10個線程該怎么辦?所以這一點可以通過信號量解決。但如何保證信號量是原子性的呢?
P操作:我們將申請信號量稱為P操作,申請信號量的本質就是申請獲得臨界資源中某塊資源的使用權限,當申請成功時臨界資源中資源的數目應該減一,因此P操作的本質就是讓計數器減一。
V操作:我們將釋放信號量稱為V操作,釋放信號量的本質就是歸還臨界資源中某塊資源的使用權限,當釋放成功時臨界資源中資源的數目就應該加一,因此V操作的本質就是讓計數器加一。
- 結合上圖和PV操作的理解,我們可以看出,當多個執行流申請信號量時,信號量本質上就是臨界資源,對信號量的PV操作表面看似是++和--操作,但是我們知道++和--不是原子性操作,所有我們就要保證PV操作是原子性操作,結合圖中右側的偽代碼,可以看出都對PV操作進行的加鎖和解鎖的操作,這樣的目的是為了保證申請和釋放信號量時是原子性;
- 當執行流申請信號量時,可能此時信號量為0,說明信號量描述的臨界資源被申請完了,那么這個執行流就要掛起等待,在信號量等待隊列中等待,直到有信號量釋放被喚醒。
- 信號量的本質是計數器,但不意味著只有計數器,信號量還包括一個等待隊列。
??要使用信號量就需要創建一個 sem_t 類型的變量
#include <semaphore.h>//頭文件
sem_t sem1;
在使用信號量前,需要對這個變量進行初始化,使用的函數是?sem_init?
int sem_init(sem_t *sem, int pshared, unsigned int value);
參數說明:
- sem:需要初始化的信號量。
- pshared:一般給0,傳入0值表示線程間共享,傳入非零值表示進程間共享。
- value:信號量的初始值(計數器的初始值)。
返回值說明:
- 初始化信號量成功返回0,失敗返回-1。
信號量使用完畢需要用 sem_destory 進行銷毀?
int sem_destroy(sem_t *sem);
參數說明:
- sem:需要銷毀的信號量。
返回值說明:
- 銷毀信號量成功返回0,失敗返回-1。
POSIX信號量中的P操作對應的接口是 sem_waitm?
int sem_wait(sem_t* sem);
信號量做減1操作
?POSIX信號量中的V操作對應的接口是 sem_post?
int sem_post(sem_t* sem);
信號量做加1操作
????????基于阻塞隊列的生產者與消費者模型存在一個很大的問題就是他們其實是在串行運行的,并沒有并行運行,這就導致他們的效率不是很高,而使用環形隊列則可以解決這個問題。
????????這樣的模型為什么可以實現并行操作呢?舉例來說,當消費者和生產者啟動時,由于隊列中全部為空,所以即便消費者先運行它也會因為沒有數據而被掛起,所以生產者就會先運行生產數據。一旦產生了數據,數據的信號量增加,于是消費者拿到信號進行消費,一旦所有空格都存放了數據,那么生產者就會掛起,當消費者消費完一個數據,然后歸還空格,于是生產者又會拿到信號啟動生產。這樣,只要隊列中同時有空格和數據,生產者和消費者就能同時運行。?
生產者關注的是空間資源,消費者關注的是數據資源?
- 只要環形隊列中有空間,生產者就可以進行生產
- 只要環形隊列中有數據,消費者就可以消費數據
????????我們假設空間資源為 block_sem , 數據資源為 data_sem。在對這兩個信號量進行初始化的時候,我們將?block_sem 初始化為環形隊列的容量,將?data_sem?初始化為0;(假設環形隊列中沒有任何數據)
1. 生產者申請空間資源,釋放數據資源:
- 如果block_sem不為0,表明環形隊列中有空間資源,生產者申請block_sem成功,那么對應的操作就是P(block_sem),向空間內加入數據;然后釋放數據資源,即V(data_sem),此時隊列中多了1塊空間,那么data_sem就要 –1;
- 如果block_sem為0,那么生產者申請信號量失敗,此時生產者就要掛起等待,等待有新的空間資源
2.消費者申請數據資源,釋放空間資源:
- 如果data_sem不為0,表明環形隊列中有數據,消費者申請data_sem成功,對應的操作時P(data_sem),從環形隊列中取出數據;然后釋放空間資源,即V(block_sem),此時空間資源就多了一個,那么block_sem既要 +1;
- 如果data_sem為0,消費者申請data_sem失敗,此時消費者掛起等待,等待新的數據資源。
注意點:
- 如果生產者生產的快,消費者消費的慢,當生產者在生成的過程中遇到了消費者并超過了消費者,那么再生產的數據就會覆蓋掉,是絕對不允許的,此時生產者就要掛起等待。
- 如果消費者消費的快,生產者生產的慢,當消費者在消費的過程中遇到了生成者并超過了生產者,那么再消費的數據就有可能是緩存中的廢棄數據,是絕對不允許的,此時消費者就要掛起等待。
歸根結底,還是環形隊列判空判滿的一個問題:
????????上圖中,雖然肉眼可見左為空,右為滿,但程序不一定能判斷出來;所以生產者在生成的時候和消費者在消費的時候,我們要對其下標進行一個合理的控制,確保生產者和消費者之間不會存在沖突。環形隊列在之前的博客中有講到過,這里簡單的提一下,判空:生產者和消費指向同一個位置;判滿:生產者和消費者之間要預留一個空間;具體操作就是模運算;我們可以看看下面的模擬實現。
我們采用SLT中的vector來實現環形隊列
ring_queue.hpp如下:
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h> using namespace std; namespace ns_ring_queue
{ const int g_cap_default = 10; //假設環形隊列能存放10個數據 template <class T> class RingQueue { private: vector<T> ring_queue_; //環形隊列 int cap_; //環形隊列的容量上限 sem_t blank_sem_; //生產者關心空位置資源(信號量) sem_t data_sem_; //消費者關心數據(信號量)int c_step_; //記錄消費者的下標 int p_step_; //記錄生產者的下標 public: RingQueue(int cap = g_cap_default):ring_queue_(cap), cap_(cap) { sem_init(&blank_sem_, 0, cap); //初始化空間資源(信號量) sem_init(&data_sem_, 0, 0); //初始化數據資源(信號量)c_step_ = p_step_ = 0; } ~RingQueue() { sem_destroy(&blank_sem_); sem_destroy(&data_sem_); } public: void Push(const T& in)//生產接口 { sem_wait(&blank_sem_);//p空位置(申請空間信號量)ring_queue_[p_step_] = in; //向環形隊列中放數據sem_post(&data_sem_);//v數據 (發布數據信號量)p_step_++; p_step_ %= cap_; } void Pop(T* out)//消費接口 { sem_wait(&data_sem_);//p數據 (申請數據信號量)*out = ring_queue_[c_step_]; //從環形隊列中取數據sem_post(&blank_sem_);//v空位置 (發布空間信號量)c_step_++; c_step_ %= cap_; } };
}
?ring_cp.cc如下:
#include "ring_queue.hpp"
#include <pthread.h>
#include <time.h>
#include <unistd.h>
using namespace ns_ring_queue; void* consumer(void* args)
{ RingQueue<int>* rq = (RingQueue<int>*)args; while(true) { sleep(1); int data = 0; rq->Pop(&data); printf("消費的數據是:%d\n", data); } } void* producter(void* args)
{ RingQueue<int>* rq = (RingQueue<int>*)args; while(true) { sleep(1);int data = rand() % 20 + 1; printf("生產的數據是:%d\n", data); rq->Push(data); } } int main()
{ srand((long long)time(nullptr)); RingQueue<int>* rq = new RingQueue<int>(); pthread_t c, p; pthread_create(&c, nullptr, consumer,(void*)rq); pthread_create(&p, nullptr, producter,(void*)rq); pthread_join(c, nullptr); pthread_join(p, nullptr); return 0;
}
????????我們將生產者和消費者全部先休眠1秒后再生產和消費數據,運行發現,生產者生產一個數據,消費者消費一個數據
????????我們還是以上面的代碼為例,我們不在進行單純的放數據和拿數據,我們讓生產者生產出一批計算任務然后讓消費者去計算?
ring_queue.hpp如下:
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
using namespace std; namespace ns_ring_queue
{ const int g_cap_default = 10; template <class T> class RingQueue { private: vector<T> ring_queue_; int cap_; //生產者關心空位置資源 sem_t blank_sem_; //消費者關心數據 sem_t data_sem_; int c_step_; int p_step_; pthread_mutex_t c_mtx_; pthread_mutex_t p_mtx_; public: RingQueue(int cap = g_cap_default):ring_queue_(cap), cap_(cap) { sem_init(&blank_sem_, 0, cap); sem_init(&data_sem_, 0, 0); c_step_ = p_step_ = 0; pthread_mutex_init(&c_mtx_, nullptr); pthread_mutex_init(&p_mtx_, nullptr); } ~RingQueue() { sem_destroy(&blank_sem_); sem_destroy(&data_sem_); pthread_mutex_destroy(&c_mtx_); pthread_mutex_destroy(&p_mtx_); } public: void Push(const T& in) { //生產接口 sem_wait(&blank_sem_);//p空位置 pthread_mutex_lock(&p_mtx_); ring_queue_[p_step_] = in; p_step_++; p_step_ %= cap_; pthread_mutex_unlock(&p_mtx_); sem_post(&data_sem_);//v數據 } void Pop(T* out) { //消費接口 sem_wait(&data_sem_);//p數據 pthread_mutex_lock(&c_mtx_); *out = ring_queue_[c_step_]; c_step_++; c_step_ %= cap_; pthread_mutex_unlock(&c_mtx_); sem_post(&blank_sem_);//v空位置 } };
}
????????對于多生產者和多消費者來說,我們要保證他們各自之間要滿足互斥,就必須加鎖,那么這把鎖是在信號量之前加還是之后加呢?
首先明確一點,信號量也是原子性的(上文已經提到過了)
1.在獲取信號量之前進行加鎖
? ? ? ? 在這種情況下,也就意味著只有一個執行流能夠競爭到鎖,然后申請信號量,那么我們對這個臨界資源進行劃分的意義何在呢,和之前的單生產者單消費者沒有太大的區別,顯然沒有太大的價值;
2.在獲取信號量之后進行加鎖
? ? ? ? 在這種情況下,當多個執行流訪問臨界資源的時候,他們都要去申請信號量,但是只會有一個執行流競爭鎖成功,等到這個執行流執行完畢后,下一個執行流就不需要再去申請信號量然后競爭鎖,因為它是拿著信號量被掛起的。
????????總的來說,在獲取信號量之后進行加鎖,確保了每個執行流都能預定到相應的部分臨界資源,相比第一種做法效率高一些;
ring_cp.cc如下:
#include "ring_queue.hpp"
#include "Task.hpp"
#include <time.h>
#include <string>
#include <unistd.h>
using namespace ns_ring_queue;
using namespace ns_task;
void* consumer(void* args)
{ RingQueue<Task>* rq = (RingQueue<Task>*)args; while(true) { sleep(1); Task t; rq->Pop(&t); t(); } } void* producter(void* args)
{ RingQueue<Task>* rq = (RingQueue<Task>*)args; const string ops = "+-*/%"; while(true) { sleep(1); int x = rand() % 20 + 1; int y = rand() % 10 + 1; char op = ops[rand() % ops.size()]; Task t(x, y, op); printf("我生產的數據是:%d %c %d =? 我是:%lu\n",x ,op ,y, pthread_self()); rq->Push(t); } } int main()
{ srand((long long)time(nullptr)); RingQueue<Task>* rq = new RingQueue<Task>(); pthread_t c0, c1, c2, c3, p0, p1, p2; pthread_create(&c0, nullptr, consumer,(void*)rq); pthread_create(&c1, nullptr, consumer,(void*)rq); pthread_create(&c2, nullptr, consumer,(void*)rq); pthread_create(&c3, nullptr, consumer,(void*)rq); pthread_create(&p0, nullptr, producter,(void*)rq); pthread_create(&p1, nullptr, producter,(void*)rq); pthread_create(&p2, nullptr, producter,(void*)rq); pthread_join(c0, nullptr); pthread_join(c1, nullptr); pthread_join(c2, nullptr); pthread_join(c3, nullptr); pthread_join(p0, nullptr); pthread_join(p1, nullptr); pthread_join(p2, nullptr); return 0;
}
Task.hpp如下:
#pragma once
#include <iostream>
#include <pthread.h>
using namespace std; namespace ns_task
{ class Task { private: int x_; int y_; char op_;//用來表示:+ 、- 、* 、/ 、% public: Task(){} Task(int x, int y, char op):x_(x), y_(y), op_(op){} string show() { string message = to_string(x_); message += op_; message += to_string(y_); message += "=?"; return message; } int Run() { int res = 0; switch(op_) { case '+': res = x_ + y_; break; case '-': res = x_ - y_; break; case '*': res = x_ * y_; break; case '/': res = x_ / y_; break; case '%': res = x_ % y_; break; default: cout << "bug" << endl; break; } printf("當前任務正在被:%lu處理,處理結果為:%d %c %d = %d\n",pthread_self(), x_, op_, y_, res); return res; } int operator()() { return Run(); } ~Task(){} };
}
運行結果如下:
?
?
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态