二、(LINUX 线程同步) 互斥量、条件变量以及生产者消费者问题

原创转载请注明出处:

接上一篇:

一、(LINUX 线程同步) 引入  http://blog.itpub.net/7728585/viewspace-2137980/

在线程同步中我们经常会使用到mutex互斥量,其作用用于保护一块临界区,避免多线程并发操作对这片临界区带来的数据混乱,
POSIX的互斥量是一种建议锁,因为如果不使用互斥量也可以访问共享数据,但是可能是不安全的。
其原语包含:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
静态初始一个互斥量
int pthread_mutex_destroy(pthread_mutex_t *mutex);
销毁一个互斥量
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
初始化一个互斥量
int pthread_mutex_lock(pthread_mutex_t *mutex);
互斥量加锁操作,在这个函数调用下的临界区只允许一个线程进行访问,如果不能获得锁则堵塞等待
int pthread_mutex_trylock(pthread_mutex_t *mutex);
互斥量加锁操作,在这个函数调用下的临界区只允许一个线程进行访问,如果获得不了锁则放弃
int pthread_mutex_unlock(pthread_mutex_t *mutex);
互斥量解锁操作,一般用于在临界区数据操作完成后解锁

而条件变量cond则代表当某个条件不满足的情况下,本线程应该放弃锁,并且将本线程堵塞。典型的
生产者消费者问题,如果生产者还没来得及生产东西,消费者则不应该进行消费操作,应该放弃锁,将
自己堵塞,直到条件满足被生产者唤醒。原语包含:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
静态初始一个条件变量
int pthread_cond_init(pthread_cond_t *cond,const pthread_condattr_t *cattr);
初始化一个条件变量
int pthread_cond_destroy(pthread_cond_t *cond)
销毁一个条件变量量
int   pthread_cond_wait(pthread_cond_t   *cond,   pthread_mutex_t   *mutex)   
由于某种条件不满足,解锁和他绑定互斥量,本线程堵塞等待条件成熟被其他线程唤醒,如消费者等待生产者生成完成后被唤醒
注意这里就绑定了一个互斥量,也就是说条件变量一般和某个互斥量配套使用,因为单独的条件变量达不到任何堵塞线程的目的
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
和上面一样,只是加入了堵塞的时间
int pthread_cond_signal(pthread_cond_t *cond)
条件满足唤醒由于wait在这个条件变量上某个线程,一般用于生产者唤醒消费者
int pthread_cond_broadcast(pthread_cond_t *cond)
条件满足唤醒由于wait在这个条件变量上全部线程,一般用于生产者唤醒消费者

下面一张自己画的图希望对大家有所帮助。

最后是我写的一个,用一个链表栈结构,来模拟一个生产者生产数据,N个消费者并发进行消费的
一个生产者消费者程序。生产者一次生产一批数据,消费者一次只消费一个数据。
当生产者一次生产10个数据的时候,输出是这样的。
p:thread:140034536785664 prod 1 data is:data A:0 data B:0
p:thread:140034536785664 prod 2 data is:data A:1 data B:1
p:thread:140034536785664 prod 3 data is:data A:2 data B:2
p:thread:140034536785664 prod 4 data is:data A:3 data B:3
p:thread:140034536785664 prod 5 data is:data A:4 data B:4
p:thread:140034536785664 prod 6 data is:data A:5 data B:5
p:thread:140034536785664 prod 7 data is:data A:6 data B:6
p:thread:140034536785664 prod 8 data is:data A:7 data B:7
p:thread:140034536785664 prod 9 data is:data A:8 data B:8
p:thread:140034536785664 prod 10 data is:data A:9 data B:9
c:thread:140034520000256 cost 10 data is:data A:9 data B:9
c:thread:140034520000256 cost 9 data is:data A:8 data B:8
c:thread:140034520000256 cost 8 data is:data A:7 data B:7
c:thread:140034520000256 cost 7 data is:data A:6 data B:6
c:thread:140034520000256 cost 6 data is:data A:5 data B:5
c:thread:140034520000256 cost 5 data is:data A:4 data B:4
c:thread:140034520000256 cost 4 data is:data A:3 data B:3
c:thread:140034520000256 cost 3 data is:data A:2 data B:2
c:thread:140034520000256 cost 2 data is:data A:1 data B:1
c:thread:140034520000256 cost 1 data is:data A:0 data B:0
也可以看到他确实是一个后进先出的栈模型
这个时候并没有观察到多个线程竞争的问题,如果我将生产者定义为一次生产10000个数据,就可以
看到多个线程交替使用生产者数据的情况。
c:thread:140270541522688 cost 9959 data is:data A:9958 data B:9958 c:thread:140270667347712 cost 9952 data is:data A:9951 data B:9951
我们可以看到不同的线程ID的线程在竞争这把锁
下面是代码,临界区应该尽量选择小,我这里临界区选择较大,比如链表的创建
c:thread:140270692525824 cost 9962 data is:data A:9961 data B:9961
c:thread:140270692525824 cost 9961 data is:data A:9960 data B:9960
c:thread:140270692525824 cost 9960 data is:data A:9959 data B:9959
c:thread:140270541522688 cost 9959 data is:data A:9958 data B:9958
c:thread:140270541522688 cost 9958 data is:data A:9957 data B:9957
c:thread:140270541522688 cost 9957 data is:data A:9956 data B:9956
c:thread:140270541522688 cost 9956 data is:data A:9955 data B:9955
c:thread:140270541522688 cost 9955 data is:data A:9954 data B:9954
c:thread:140270541522688 cost 9954 data is:data A:9953 data B:9953
c:thread:140270541522688 cost 9953 data is:data A:9952 data B:9952
c:thread:140270667347712 cost 9952 data is:data A:9951 data B:9951
其实不需要放到临界区中,还比如内存的delete也不需要放到临界区,处于测试
目的已经达到,就不在纠结这个问题了:

点击(此处)折叠或打开

  1. 头文件:
  2. /*************************************************************************
  3.   > File Name: chain.h
  4.   > Author: gaopeng QQ:22389860 all right reserved
  5.   > Mail: gaopp_200217@163.com
  6.   > Created Time: Sun 04 Jun 2017 06:27:38 AM CST
  7.  ************************************************************************/
  8. #include<iostream>
  9. #include<stdio.h>
  10. #include <stdlib.h>
  11. #include <unistd.h>
  12. #define MAX_TID 10
  13. using namespace std;
  14. class t1data
  15. {
  16.         private:
  17.                 int a;
  18.                 int b;
  19.         public:
  20.                 t1data(){}
  21.                 t1data(int i)
  22.                 {
  23.                         this->a = i;
  24.                         this->b = i;
  25.                 }
  26.                 virtual void prin(void)
  27.                 {
  28.                         cout<<"data A:"<<a<<" data B:"<<b;
  29.                 }
  30. };
  31. typedef struct queue_s 
  32. {
  33.         t1data data;
  34.         queue_s *next,*priv;
  35.         queue_s(int i)
  36.         {
  37.                 data = t1data(i);
  38.                 next = NULL;
  39.                 priv = NULL;
  40.         }
  41. } QUE_S,*QUE_S_P ;
  42. typedef struct queue_h
  43. {
  44.         QUE_S_P head_p;
  45.         QUE_S_P last_p;
  46.         unsigned int len;
  47.         pthread_mutex_t pmut;
  48.         pthread_cond_t pcon;
  49.         queue_h()
  50.         {
  51.                 head_p=NULL;
  52.                 last_p=NULL;
  53.                 len = 0;
  54.         }
  55. } QUE_H;
  56. int debug_return(const int ret)
  57. {
  58.         if(ret != 0)
  59.         {
  60.                 strerror(ret);
  61.                 exit(-1);
  62.         }
  63.         return 0;
  64. }

主文件:

点击(此处)折叠或打开

  1. /*************************************************************************
  2.   > File Name: main.cpp
  3.   > Author: gaopeng QQ:22389860 all right reserved
  4.   > Mail: gaopp_200217@163.com
  5.   > Created Time: Sun 04 Jun 2017 07:18:28 AM CST
  6.  ************************************************************************/
  7. #include<iostream>
  8. #include<stdio.h>
  9. #include <pthread.h>
  10. #include<string.h>
  11. #include"chain.h"
  12. #define MAXDATA 10000
  13. //双向链表栈结构模型
  14. using namespace std;
  15. static int COUNT = 0;
  16. void* pro(void* arg)
  17. {
  18.         QUE_H* my_head;
  19.         my_head = (QUE_H*)arg;
  20.         QUE_S_P sp =NULL;
  21.         int ret = 0;
  22.         while(1)
  23.         {
  24.                 ret = pthread_mutex_lock(&my_head->pmut);
  25.                 debug_return(ret);
  26.                 if(my_head->head_p != NULL)//如果元素没有消费完放弃MUTEX,进入下次循环
  27.                 {
  28.                         //cout<<"pro head != NULL unlock\n";
  29.                         ret = pthread_mutex_unlock(&my_head->pmut);
  30.                         debug_return(ret);
  31.                         continue;
  32.                 }
  33.                 //如果消费完进行生产10000个元素
  34.                 for(;COUNT<MAXDATA;COUNT++)//生产10000个元素
  35.                 {
  36.                         if(my_head->head_p == NULL)
  37.                         {
  38.                                 int tm = 0;
  39.                                 QUE_S_P sp = new QUE_S(COUNT);
  40.                                 my_head->head_p = sp;
  41.                                 my_head->last_p = sp;
  42.                                 cout<<"p:thread:"<<pthread_self()<<" prod "<<++my_head->len<<" data is:";
  43.                                 (my_head->last_p->data).prin();
  44.                                 cout<<"\n";
  45.                                 sp = NULL;
  46.                         }
  47.                         else
  48.                         {
  49.                                 QUE_S_P sp = new QUE_S(COUNT);
  50.                                 my_head->last_p->next = sp;
  51.                                 sp->priv = my_head->last_p;
  52.                                 my_head->last_p = sp;
  53.                                 cout<<"p:thread:"<<pthread_self()<<" prod "<<++my_head->len<<" data is:";
  54.                                 my_head->last_p->data.prin();
  55.                                 cout<<"\n";
  56.                                 sp = NULL;
  57.                         }
  58.                 }
  59.                 //cout<<"pro unlock:\n";
  60.                 ret = pthread_mutex_unlock(&my_head->pmut);
  61.                 debug_return(ret);
  62.                 //cout<<"pro signal:\n";
  63.                 ret = pthread_cond_signal(&my_head->pcon);
  64.                 debug_return(ret);
  65.         }
  66. }
  67. void* cus(void* arg)
  68. {
  69.         int ret = 0;
  70.         QUE_H* my_head;
  71.         my_head = (QUE_H*)arg;
  72.         QUE_S_P tmp=NULL;
  73.         while(1)//消费方式为一个消费线程只消费一个元素,来模拟多消费线程竞争
  74.         {
  75.                 ret = pthread_mutex_lock(&my_head->pmut);
  76.                 debug_return(ret);
  77.                 while(my_head->head_p == NULL) //如果已经消费完放弃锁,等到条件及有元素供消费
  78.                 {
  79.                         //cout<<"cus cond wait\n";
  80.                         ret = pthread_cond_wait(&my_head->pcon,&my_head->pmut);
  81.                         debug_return(ret);
  82.                 }
  83.                 if(my_head->len == 1)//消费如果只剩下一个元素处理
  84.                 {
  85.                         cout<<"c:thread:"<<pthread_self()<<" cost "<<my_head->len--<<" data is:";
  86.                         my_head->last_p->data.prin();
  87.                         delete my_head->last_p;
  88.                         my_head->head_p = NULL;
  89.                         my_head->last_p = NULL;
  90.                         COUNT--;
  91.                 }
  92.                 else//否则处理如下
  93.                 {
  94.                         cout<<"c:thread:"<<pthread_self()<<" cost "<<my_head->len--<<" data is:";
  95.                         my_head->last_p->data.prin();
  96.                         cout<<"\n";
  97.                         tmp = my_head->last_p->priv;
  98.                         delete my_head->last_p;
  99.                         my_head->last_p= tmp;
  100.                         tmp=NULL;
  101.                         COUNT--;
  102.                 }
  103.                 ret = pthread_mutex_unlock(&my_head->pmut);
  104.                 debug_return(ret);
  105.         }
  106. }
  107. int main(void)
  108. {
  109.         QUE_H my_head;
  110.         int ret = 0;
  111.         pthread_t tid[MAX_TID];
  112.         int tid_num = 0;
  113.         int i = 0;
  114.         ret = pthread_mutex_init(&my_head.pmut,NULL);
  115.         debug_return(ret);
  116.         ret = pthread_cond_init(&my_head.pcon,NULL);
  117.         debug_return(ret);
  118.         //一个生产者
  119.         ret = pthread_create(tid+tid_num,NULL,pro,(void*)&my_head);
  120.         debug_return(ret);
  121.         tid_num++;
  122.         //n个消费者
  123.         ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
  124.         debug_return(ret);
  125.         tid_num++;
  126.         ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
  127.         debug_return(ret);
  128.         tid_num++;
  129.         ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
  130.         debug_return(ret);
  131.         tid_num++;
  132.         ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
  133.         debug_return(ret);
  134.         tid_num++;
  135.         ret = pthread_create(tid+tid_num,NULL,cus,(void*)&my_head);
  136.         debug_return(ret);
  137.     //堵塞回收
  138.         for(i = 0;i<=tid_num;i++)
  139.         {
  140.                 ret = pthread_join( *(tid+i) , NULL );
  141.                 debug_return(ret);
  142.         }
  143.         ret=pthread_mutex_destroy(&my_head.pmut);
  144.         debug_return(ret);
  145.         ret=pthread_cond_destroy(&my_head.pcon);
  146.         debug_return(ret);
  147. }

作者微信:

               

时间: 2024-09-19 23:57:50

二、(LINUX 线程同步) 互斥量、条件变量以及生产者消费者问题的相关文章

并发编程(一): POSIX 使用互斥量和条件变量实现生产者/消费者问题

    boost的mutex,condition_variable非常好用.但是在Linux上,boost实际上做的是对pthread_mutex_t和pthread_cond_t的一系列的封装.因此通过对原生态的POSIX 的mutex,cond的生成者,消费者的实现,我们可以再次体会boost带给我们的便利. 1. 什么是互斥量        互斥量从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量上的锁.对互斥量进行加锁以后,任何其他试图再次对互斥量加锁的线程将

Linux线程同步之递归锁

概述 最常见的进程/线程的同步方法有互斥锁(或称互斥量Mutex),读写锁(rdlock),条件变量(cond),信号量(Semophore)等.在Windows系统中,临界区(Critical Section)和事件对象(Event)也是常用的同步方法. 简单的说,互斥锁保护了一个临界区,在这个临界区中,一次最多只能进入一个线程.如果有多个进程在同一个临界区内活动,就有可能产生竞态条件(race condition)导致错误. 读写锁从广义的逻辑上讲,也可以认为是一种共享版的互斥锁.如果对一个

Linux线程同步之条件变量

条件变量是线程可用的另一种同步机制.条件变量给多个线程提供了一个会合的场所.条件本身是由互斥量保护的.线程在改变 条件状态前必须首先锁住互斥量. 条件变量的初始化 pthread_cond_init 去除初始化 pthread_cond_destroy 等待 pthread_cond_wait 满足条件给向进程发送信号 pthread_cond_signal 下面程序展示了利用条件变量等待另外两个线程满足条件时,第三个进程继续向前执行 #include <stdio.h> #include &

Linux多线程使用互斥量同步线程_Linux

本文将会给出互斥量的详细解说,并用一个互斥量解决上一篇文章中,要使用两个信号量才能解决的只有子线程结束了对输入的处理和统计后,主线程才能继续执行的问题. 一.什么是互斥量 互斥量是另一种用于多线程中的同步访问方法,它允许程序锁住某个对象,使得每次只能有一个线程访问它.为了控制对关键代码的访问,必须在进入这段代码之前锁住一个互斥量,然后在完成操作之后解锁. 二.互斥量的函数的使用 它们的定义与使用信号量的函数非常相似,它们的定义如下: #include <pthread.h> int pthre

四种进程或线程同步互斥的控制方法

进程|控制 很想整理一下自己对进程线程同步互斥的理解.正巧周六一个刚刚回到学校的同学请客吃饭.在吃饭的过程中,有两个同学,为了一个问题争论的面红耳赤.一个认为.Net下的进程线程控制模型更加合理.一个认为Java下的线程池策略比.Net的好.大家的话题一下转到了进程线程同步互斥的控制问题上.回到家,想了想就写了这个东东.  现在流行的进程线程同步互斥的控制机制,其实是由最原始最基本的4种方法实现的.由这4种方法组合优化就有了.Net和Java下灵活多变的,编程简便的线程进程控制手段.  这4种方

linux C++ 多线程使用pthread_cond 条件变量

1. 背景 多线程中经常需要使用到锁(pthread_mutex_t)来完成多个线程之间的互斥操作. 但是互斥锁有一个明显到缺点: 只有两种状态,锁定和非锁定. 而条件变量则通过允许线程阻塞并等待另一个线程发送唤醒信号的方法弥补了互斥锁的不足,它常和互斥锁一起使用.   2. 条件变量涉及到的主要函数 2.1 pthread_cond_wait 线程阻塞在条件变量    int pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mute

同步-互斥量 临界区 信号量 条件变量 效率对比

问题描述 互斥量 临界区 信号量 条件变量 效率对比 windows 下,如下四种同步方式中,互斥量 临界区 信号量 条件变量, 效率对比,求指导,谢谢 解决方案 据我自己在windows下的测试,信号量比条件变量高效,大概是其6倍 解决方案二: 条件变量效率应该最高,但是可能不是那么线程安全 临界区效率第二好 解决方案三: 临界区(Critical Section)(同一个进程内,实现互斥) 保证在某一时刻只有一个线程能访问数据的简便办法.在任意时刻只允许一个线程对共享资源进行访问.如果有多个

linux内核同步之每CPU变量、原子操作、内存屏障、自旋锁【转】

转自:http://blog.csdn.net/goodluckwhh/article/details/9005585 版权声明:本文为博主原创文章,未经博主允许不得转载.   目录(?)[-] 一每CPU变量 二原子操作 三优化和内存屏障 四自旋锁 自旋锁 自旋锁的数据结构和宏函数 读写自旋锁 读写自旋锁的相关函数   linux内核中的各种"任务"都能看到内核地址空间,因而它们之间也需要同步和互斥.linux内核支持的同步/互斥手段包括:   技术 功能 作用范围 每CPU变量 为

C#中的线程(二)线程同步

Keywords:C# 线程Source:http://www.albahari.com/threading/Author: Joe AlbahariTranslator: Swanky WuPublished: http://www.cnblogs.com/txw1958/Download:http://www.albahari.info/threading/threading.pdf    第二部分:线程同步基础   同步要领 下面的表格列展了.NET对协调或同步线程动作的可用的工具: 简易