Linux 多线程编程(二)2019-08-10

Linux 多线程编程(二)2019-08-10,第1张

三种专门用于线程同步的机制:POSIX信号量,互斥量和条件变量.

在Linux上信号量API有两组,一组是System V IPC信号量,即PV操作,另外就是POSIX信号量,POSIX信号量的名字都是以sem_开头.

phshared参数指定信号量的类型,若其值为0,就表示这个信号量是当前进程的局部信号量,否则该信号量可以在多个进程之间共享.value值指定信号量的初始值,一般与下面的sem_wait函数相对应.

其中比较重要的函数sem_wait函数会以原子操作的方式将信号量的值减一,如果信号量的值为零,则sem_wait将会阻塞,信号量的值可以在sem_init函数中的value初始化sem_trywait函数是sem_wait的非阻塞版本sem_post函数将以原子的操作对信号量加一,当信号量的值大于0时,其他正在调用sem_wait等待信号量的线程将被唤醒.

这些函数成功时返回0,失败则返回-1并设置errno.

生产者消费者模型:

生产者对应一个信号量:sem_t producer

消费者对应一个信号量:sem_t customer

sem_init(&producer,2)----生产者拥有资源,可以工作

sem_init(&customer,0)----消费者没有资源,阻塞

在访问公共资源前对互斥量设置(加锁),确保同一时间只有一个线程访问数据,在访问完成后再释放(解锁)互斥量.

互斥锁的运行方式:串行访问共享资源

信号量的运行方式:并行访问共享资源

互斥量用pthread_mutex_t数据类型表示,在使用互斥量之前,必须使用pthread_mutex_init函数对它进行初始化,注意,使用完毕后需调用pthread_mutex_destroy.

pthread_mutex_init用于初始化互斥锁,mutexattr用于指定互斥锁的属性,若为NULL,则表示默认属性。除了用这个函数初始化互斥所外,还可以用如下方式初始化:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER。

pthread_mutex_destroy用于销毁互斥锁,以释放占用的内核资源,销毁一个已经加锁的互斥锁将导致不可预期的后果。

pthread_mutex_lock以原子操作给一个互斥锁加锁。如果目标互斥锁已经被加锁,则pthread_mutex_lock则被阻塞,直到该互斥锁占有者把它给解锁.

pthread_mutex_trylock和pthread_mutex_lock类似,不过它始终立即返回,而不论被操作的互斥锁是否加锁,是pthread_mutex_lock的非阻塞版本.当目标互斥锁未被加锁时,pthread_mutex_trylock进行加锁操作;否则将返回EBUSY错误码。注意:这里讨论的pthread_mutex_lock和pthread_mutex_trylock是针对普通锁而言的,对于其他类型的锁,这两个加锁函数会有不同的行为.

pthread_mutex_unlock以原子操作方式给一个互斥锁进行解锁操作。如果此时有其他线程正在等待这个互斥锁,则这些线程中的一个将获得它.

三个打印机轮流打印:

输出结果:

如果说互斥锁是用于同步线程对共享数据的访问的话,那么条件变量就是用于在线程之间同步共享数据的值.条件变量提供了一种线程之间通信的机制:当某个共享数据达到某个值时,唤醒等待这个共享数据的线程.

条件变量会在条件不满足的情况下阻塞线程.且条件变量和互斥量一起使用,允许线程以无竞争的方式等待特定的条件发生.

其中pthread_cond_broadcast函数以广播的形式唤醒所有等待目标条件变量的线程,pthread_cond_signal函数用于唤醒一个等待目标条件变量线程.但有时候我们可能需要唤醒一个固定的线程,可以通过间接的方法实现:定义一个能够唯一标识目标线程的全局变量,在唤醒等待条件变量的线程前先设置该变量为目标线程,然后采用广播的方式唤醒所有等待的线程,这些线程被唤醒之后都检查该变量以判断是否是自己.

采用条件变量+互斥锁实现生产者消费者模型:

运行结果:

阻塞队列+生产者消费者

运行结果:

这个问题需要的知识主要包括:

1 多进程间进行通信;

2 使用同步信号量(semaphore)和互斥信号量(mutex)进行数据保护。

参考代码如下,可以参照注释辅助理解:

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <pthread.h>

#include <semaphore.h>

#define N 2   // 消费者或者生产者的数目

#define M 10 // 缓冲数目

int in = 0   // 生产者放置产品的位置

int out = 0 // 消费者取产品的位置

int buff[M] = {0} // 缓冲初始化为0, 开始时没有产品

sem_t empty_sem // 同步信号量, 当满了时阻止生产者放产品

sem_t full_sem   // 同步信号量, 当没产品时阻止消费者消费

pthread_mutex_t mutex // 互斥信号量, 一次只有一个线程访问缓冲

int product_id = 0   //生产者id

int prochase_id = 0 //消费者id

/* 打印缓冲情况 */

void print()

{

int i

for(i = 0 i < M i++)

   printf("%d ", buff[i])

printf("\n")

}

/* 生产者方法 */ 

void *product()

{

int id = ++product_id

while(1)

{

   // 用sleep的数量可以调节生产和消费的速度,便于观察

   sleep(1)

   //sleep(1)

  

   sem_wait(&empty_sem)

   pthread_mutex_lock(&mutex)

  

   in = in % M

   printf("product%d in %d. like: \t", id, in)

  

   buff[in] = 1  

   print()  

   ++in

  

   pthread_mutex_unlock(&mutex)

   sem_post(&full_sem)  

}

}

/* 消费者方法 */

void *prochase()

{

int id = ++prochase_id

while(1)

{

   // 用sleep的数量可以调节生产和消费的速度,便于观察

   sleep(1)

//sleep(1)

  

   sem_wait(&full_sem)

   pthread_mutex_lock(&mutex)

  

   out = out % M

   printf("prochase%d in %d. like: \t", id, out)

  

   buff[out] = 0

   print()

   ++out

  

   pthread_mutex_unlock(&mutex)

   sem_post(&empty_sem)

}

}

int main()

{

pthread_t id1[N]

pthread_t id2[N]

int i

int ret[N]

// 初始化同步信号量

int ini1 = sem_init(&empty_sem, 0, M) 

int ini2 = sem_init(&full_sem, 0, 0)  

if(ini1 && ini2 != 0)

{

   printf("sem init failed \n")

   exit(1)

//初始化互斥信号量 

int ini3 = pthread_mutex_init(&mutex, NULL)

if(ini3 != 0)

{

   printf("mutex init failed \n")

   exit(1)

// 创建N个生产者线程

for(i = 0 i < N i++)

{

   ret[i] = pthread_create(&id1[i], NULL, product, (void *)(&i))

   if(ret[i] != 0)

   {

    printf("product%d creation failed \n", i)

    exit(1)

   }

}

//创建N个消费者线程

for(i = 0 i < N i++)

{

   ret[i] = pthread_create(&id2[i], NULL, prochase, NULL)

   if(ret[i] != 0)

   {

    printf("prochase%d creation failed \n", i)

    exit(1)

   }

}

//销毁线程

for(i = 0 i < N i++)

{

   pthread_join(id1[i],NULL)

   pthread_join(id2[i],NULL)

}

exit(0) 

}

在Linux下编译的时候,要在编译命令中加入选项-lpthread以包含多线程支持。比如存储的C文件为demo.c,要生成的可执行文件为demo。可以使用命令:

gcc demo.c -o demo -lpthread

程序中为便于观察,使用了sleep(1)来暂停运行,所以查看输出的时候可以看到,输出是每秒打印一次的。

#include<stdio.h>  

#include<stdlib.h>  

#include<unistd.h>  

#include<semaphore.h>  

#include<pthread.h>  

  

#define PRODUCER 10//生产者数量  

#define CONSUMER 8//消费者数量  

#define BUFFER 20//缓冲区数量  

  

sem_t empty,full//同步信号量  

pthread_mutex_t mutex//互斥信号量  

int buffer[BUFFER] //缓冲区  

  

int producer_id=0,consumer_id=0//生产者消费者ID  

int index_in=0,index_out=0//生产者 消费者 存放 消费的位置  

  

void print()//输出缓冲区  

{  

   int i  

   printf("Buffer:\n")  

   for(i=0i<20i++)  

   {  

      printf("___")  

   }  

   printf("\n")  

   for(i=0i<20i++)  

      printf("|%d|",buffer[i])  

   printf("\n")  

   for(i=0i<20i++)  

   {  

      printf("———")  

   }   

   printf("\n")  

}  

void *Producer()//生产者函数  

{  

   int ID=++producer_id  

  

   while(1)  

   {  

     sleep(3)  

     sem_wait(&empty)  

     pthread_mutex_lock(&mutex)  

     index_in=index_in%BUFFER  

  

     printf("Producer %d in %d.\n",ID,index_in)  

     buffer[index_in]=1//缓冲区置0  

     print()//输出缓冲区情况  

     index_in++  

     pthread_mutex_unlock(&mutex)  

     sem_post(&full)  

   }  

}  

void *Consumer()//消费者函数  

{  

   int ID=++consumer_id  

  

   while(1)  

   {  

     sleep(3)  

     sem_wait(&full)  

     pthread_mutex_lock(&mutex)  

     index_out=index_out%BUFFER  

  

     printf("\033[0134mConsumer %d in %d\033[0m\n",ID,index_out)  

     buffer[index_out]=0//缓冲区置0  

     print()//输出缓冲区情况  

     index_out++  

     pthread_mutex_unlock(&mutex)  

     sem_post(&empty)  

   }  

}  

  

int main()  

{  

   //freopen("text.txt","w",stdout)  

   int rthread[18],i  

   pthread_t producer[PRODUCER]//生产者  

   pthread_t consumer[CONSUMER]//消费者  

  

   int sinit1=sem_init(&empty,0,BUFFER)//初始化同步信号量  

   int sinit2=sem_init(&full,0,0)  

   int minit =pthread_mutex_init(&mutex,NULL)//初始化互斥信号量  

   if(sinit1 && sinit2)  

   {  

     printf("sem initialize failed /n")  

     exit(1)  

   }  

   if(minit)  

   {  

     printf("sem initialize failed /n")  

     exit(1)  

   }  

   for(i=0i<PRODUCERi++)//创建生产者线程  

   {  

      rthread[i]=pthread_create(&producer[i], NULL, Producer, NULL)  

      if(rthread[i])  

      {  

          printf("producer %d create failed /n", i)  

          exit(1)  

      }  

   }  

   for(i=0i<CONSUMERi++)//创建消费者线程  

   {  

      rthread[i]=pthread_create(&consumer[i], NULL, Consumer,NULL)  

      if(rthread[i])  

      {  

          printf("consumer %d create failed /n", i)  

          exit(1)  

      }  

   }  

   for(i=0i<PRODUCERi++)//销毁生产者线程  

   {  

      pthread_join(producer[i],NULL)  

   }  

   for(i=0i<CONSUMERi++)//销毁生产者线程  

   {  

      pthread_join(consumer[i],NULL)  

   }  

   exit(0)  

}


欢迎分享,转载请注明来源:夏雨云

原文地址:https://www.xiayuyun.com/zonghe/478027.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-06-08
下一篇2023-06-08

发表评论

登录后才能评论

评论列表(0条)

    保存