系统运转中枢,本文介绍了一个基于环形队列的消息队列实现方法,给出了它的数据结构、主要操作流程和核心代码。
环形队列
环行队列是一种首尾相连的队列数据结构,遵循先进先出原则,如下图所示:
ring buffer
示意图
在环形队列中用一组连续地址的存储单元依次存放从队列头到队列尾的元素,通过两个指针
read_pos 和 write_pos 分别指向读取位置和写入位置。
初始化队列时,令 read_pos = write_pos = 0,每当写入一个新元素时,
write_pos 增 1;每当读取一个元素时,read_pos 增 1 。若队列已满,不能往队列写入数据;若队列为空,则不能读取数据。判断对列是否为满的的方法是看 (write_pos + 1)% QUEUE_SIZE == read_pos
是否成立,判断队列是否为空的方法是看 write_pos == read_pos 是否成立。
鉴于多个线程同时访问环形队列,需要考虑线程之间的互斥和同步问题,拟采用锁控制多个线程互斥访问环形队列,使用信号量控制线程之间的同步。
一段时间内只能有一个线程获得锁,当它持有锁时,其它线程要访问环形队列必须等待,直到前者释放锁。由此,锁可以保证多个线程互斥的访问环形队列。
线程从队列对数据前首先判断信号量是否大于 1
,若是,则从队列读数据;否则,进入等待状态,直到信号量大于 1 为止;线程往队列写入一个数据后,会将信号量增 1
,若有线程在等待,则会被唤醒。由此,信号量实现了多线程同步访问环形队列。
流程图
下图是环形缓冲区的初始化、读数据、写数据的主要流程。
ring buffer
流程图
初始化时为环形队列分配内存空间,并完成锁和信号量的初始化;
若往环形队列写数据,首先要获得锁, 若锁已被占用,则进入等待状态,否
则进一步去判断环形队列是否已满。若满了,则释放锁并返回;若队列未满,将 数据写入 write_pos 位置,write_pos 增 1,释放锁并将信号量增
1,表示 已写入一个数据;
若从环形队列读数据,首先判断信号量是否大于 1 ,若不是,则等待,否则
去获取锁,若锁已被占用,则等待,否则从 read_pos 位置读取数据,将 read_pos 增 1 ,释放锁,读取完毕。
数据结构
环形队列的数据结构如下所示:
typedef _MSG {
int message
void* param
} MSG
typedef _MSGQUE {
pthread_mutex_t lock
sem_t wait
MSG* msg
int size
int read_ops
int write_ops
} MSGQUEUE
环形队列包括如下数据:
lock:互斥锁;
wait:信号量
msg:指向数据区的指针;
size:环形队列数据最大个数;
read_ops:读取位置;
write_ops:写入位置。
队列初始化
初始化主要完成三个任务:
为环形队列分配内存;
初始化互斥锁,用 pthread_mutex_init
完成;
初始化信号量,用 sem_init
完成。
/* Create message queue */
_msg_queue = malloc (sizeof (MSGQUEUE))
/* init lock and sem */
pthread_mutex_init (&_msg_queue->lock, NULL)
sem_init (&_msg_queue->wait, 0, 0)
/* allocate message memory */
_msg_queue ->msg = malloc (sizeof(MSG) * nr_msg)
_msg_queue ->size = nr_msg
写操作
如上面的流程图介绍,写操作主要包括如下几步: - 获取锁;
判断队列是否已满;
若没满,将数据写入 write_pos 处,将 write_pos 增 1,并判断
write_pos 是否越界;
释放锁,并将信号量增 1。
/* lock the message queue */
pthread_mutex_lock (_msg_queue->lock)
/* check if the queue is full. */
if ((_msg_queue->write_pos + 1)% _msg_queue->size == _msg_queue->read_pos) {
/* Message queue is full. */
pthread_mutex_unlock (_msg_queue->lock)
return
}
/* write a data to write_pos. */
_msg_queue ->msg [write_pos] = *msg
write_pos ++
/* check if write_pos if overflow. */
if (_msg_queue->write_pos >= _msg_queue->size)
_msg_queue->write_pos = 0
/* release lock */
pthread_mutex_unlock (_msg_queue->lock)
sem_post (_msg_queue->wait)
读操作
同理,读操作分如下几个步骤:
检查信号量;
获取锁;
判断队列是否为空;
若不为空,则读取 read_ops 处的数据,将 read_ops 增 1,并判断
read_pos 是否越界;
并释放锁。
sem_wait (_msg_queue->wait)
/* lock the message queue */
pthread_mutex_lock (_msg_queue->lock)
/* check if queue is empty */
if (_msg_queue->read_pos != _msg_queue->write_pos) {
msg = _msg_queue->msg + _msg_queue->read_pos
/* read a data and check if read_pos is overflow */
_msg_queue->read_pos ++
if (_msg_queue->read_pos >= _msg_queue->size)
_msg_queue->read_pos = 0
return
}
/* release lock*/
pthread_mutex_unlock (_msg_queue->lock)
你就一个信号量,而且两个线程都是先sem_wait,你的两个线程都会卡在sem_wait(&sem1)你可以把其中给一个线程的sem_wait跟sem_post换个位置试一下
编译时加上参数:-lpthread要看报错的阶段,是在编译还是链接阶段.
如果编译时函数没有找到,那是头文件的问题,如果链接时未定义引用,那是c库的问题.
如果你的头文件都正常包含了,那可能你的c库没有使能semaphore的支持.
欢迎分享,转载请注明来源:夏雨云
评论列表(0条)