刘素云网站脱孝怎样做,广州网站建设制作的公司,wordpress里的发消息给我,深圳网站建设专业的公司文章目录 1、抛砖引玉2、内核无锁队列kfifo2.1 kfifo结构2.2 kfifo分配内存2.3 kfifo初始化2.4 kfifo释放2.5 kfifo入队列2.6 kfifo出队列2.7 kfifo的判空和判满2.8 关于内存屏障 1、抛砖引玉
昨天遇到这样一个问题#xff0c;有多个生产者#xff0c;多个消费者#xff0c… 文章目录 1、抛砖引玉2、内核无锁队列kfifo2.1 kfifo结构2.2 kfifo分配内存2.3 kfifo初始化2.4 kfifo释放2.5 kfifo入队列2.6 kfifo出队列2.7 kfifo的判空和判满2.8 关于内存屏障 1、抛砖引玉
昨天遇到这样一个问题有多个生产者多个消费者一个公共的消息队列生产者向消息队列中写数据消费者从消息队列中读数据因为消息队列是临界资源因此需要加锁 这样做的话锁竞争太严重必定会影响效率有没有一种办法消费者在从消息队列中读取数据时不需要加锁
当然有就是为每个消费者都建立一个自己的消息队列生产者共用一个消息队列。生产者互斥的向消息队列中写数据负载均衡器将数据分发到每个消费者的消息队列中消息消费者再从自己的消息队列中读取数据这样就形成了单读单写这样的消息队列有一个名字叫ringbuffer(环形缓冲区)适用于单生产者单消费者的场景虽然是两个线程但是却不用加锁可以用数组或者链表实现 有人可能会疑问消费者自己的消息队列(ringbuffer)也是临界资源也会被消费者和负载均衡器共同访问难道不需要加锁控制吗
其实在RingBuffer中设置了两个指针head和tail。head指向下一次读的位置tail指向的是下一次写的位置。RingBuffer可用一个数组进行存储。 在进行读操作的时候我们只修改head的值而在写操作的时候我们只修改tail的值。在写操作时我们在写入内容到buffer之后才修改tail的值而在进行读操作的时候我们会读取tail的值并将其赋值给copyTail。赋值操作是原子操作。所以在读到copyTail之后从head到copyTail之间一定是有数据可以读的不会出现数据没有写入就进行读操作的情况。同样的读操作完成之后才会修改head的数值而在写操作之前会读取head的值来判断是否有空间可以用来写数据。所以这时候tail到head-1之间一定是有空间可以写数据的而不会出现一个位置的数据还没有读出就被写操作覆盖的情况。这样就保证了RingBuffer的线程安全性。
理论证明在一个生产者和一个消费者的情况下两者之间的同步无需加锁即可并发访问。
2、内核无锁队列kfifo
在Linux当中单生产者单消费者的应用场景有很多例如每个socket都对应着一个接受缓冲区和发送缓冲区上层应用向发送缓冲区写数据再将数据拷贝到网卡发送给对方接受缓冲区则相反。又比如一个进程A产生数据发给另外一个进程B进程B需要对进程A传的数据进行处理并写入文件如果B没有处理完则A要延迟发送。为了保证进程A减少等待时间可以在A和B之间采用一个缓冲区A每次将数据存放在缓冲区中B每次冲缓冲区中取。
因此Linux中有自己的ringbuffer不过它的名字叫kfifokfifo设计的非常巧妙代码很精简以下为kfifo的相关源码内核版本为4.9.145
2.1 kfifo结构
struct __kfifo {unsigned int in; //数据到来时存放数据的位置unsigned int out; //读取数据的位置unsigned int mask; //mask1表示缓冲区data的容量能容纳多少个元素unsigned int esize; //每个元素的大小void *data; //缓冲区的起始位置
};请注意这里的 inout 均是无符号的整数类型。
2.2 kfifo分配内存
int __kfifo_alloc(struct __kfifo *fifo, unsigned int size,size_t esize, gfp_t gfp_mask)
{/** round down to the next power of 2, since our let the indices* wrap technique works only in this case.*/size roundup_pow_of_two(size);fifo-in 0;fifo-out 0;fifo-esize esize;if (size 2) {fifo-data NULL;fifo-mask 0;return -EINVAL;}fifo-data kmalloc(size * esize, gfp_mask);if (!fifo-data) {fifo-mask 0;return -ENOMEM;}fifo-mask size - 1;return 0;
}在为kfifo分配内存之前需要检测传入的size是否为2的整数次幂roundup_pow_of_two用于计算最接近且大于等于n的2的整数次幂它的定义如下
#define roundup_pow_of_two(n) \
( \__builtin_constant_p(n) ? ( \(n 1) ? 1 : \(1UL (ilog2((n) - 1) 1)) \) : \__roundup_pow_of_two(n) \)它可以等价为以下代码方便理解
unsigned int roundup_pow_of_two(unsigned int n)
{if(n 1) return 1;int i 0;for(; n ! 0; i){n 1;}return 1U i;
}假设n为5二进制位0101在for循环内需要循环3次n向右移3位才为0
最后1向左移3为二进制为1000十进制为8为2的整数次幂并且离5最近且大于5为什么要这么做呢因为kfifo是环形队列它的可读可写位置必然会回到初始位置因此就需要用到取余操作那么 m%n 在CPU看来就等价于 m-n*floor(m/n)其中乘法最终是通过加法和移位操作完成的而除法首先转变为乘法减法又会通过补码转变为加法因此效率就比较低。
但是如果缓冲区的长度为2的整数次幂m%n m (n - 1)只有减法和位运算效率就提高了所以才会将缓冲区的长度设置为2的整数次幂并且将 mask 设置为 size(容量) - 1方便后续进行位运算
计算完size后接着将 in/out 指向的位置初始化为0因为此刻队列还未准备好里面并没有任何数据。
esize 赋值给 fifo-esize 这个是代表了队列中数据的类型的 size比如队列数据类型如果为 int则 esize 等于 4队列数据类型为char则 esize 等于 1
接着调用 kmalloc_array 接口分配一个 esize * size 大小的空间作为缓冲区
最后将 fifo-mask 赋值为 size - 1
分配好队列后实际情况如下所示 2.3 kfifo初始化
这里跟kfifo分配内存有点不同kfifo初始化是使用自己定义的buffer不在需要调用 kmalloc_array 接口申请空间了
int __kfifo_init(struct __kfifo *fifo, void *buffer,unsigned int size, size_t esize)
{size / esize;size roundup_pow_of_two(size);fifo-in 0;fifo-out 0;fifo-esize esize;fifo-data buffer;if (size 2) {fifo-mask 0;return -EINVAL;}fifo-mask size - 1;return 0;
}这里依旧需要检测传入的size是否为2的整数次幂roundup_pow_of_two用于计算最接近且大于等于n的2的整数次幂其他部分都大致类似
2.4 kfifo释放
void __kfifo_free(struct __kfifo *fifo)
{kfree(fifo-data);fifo-in 0;fifo-out 0;fifo-esize 0;fifo-data NULL;fifo-mask 0;
}释放kfifo比较简单直接释放data缓冲区将所有的数据都置为0即可
2.5 kfifo入队列
static inline unsigned int kfifo_unused(struct __kfifo *fifo)
{return (fifo-mask 1) - (fifo-in - fifo-out);
}static void kfifo_copy_in(struct __kfifo *fifo, const void *src,unsigned int len, unsigned int off)
{unsigned int size fifo-mask 1;unsigned int esize fifo-esize;unsigned int l;off fifo-mask;if (esize ! 1) {off * esize;size * esize;len * esize;}l min(len, size - off);memcpy(fifo-data off, src, l);memcpy(fifo-data, src l, len - l);/** make sure that the data in the fifo is up to date before* incrementing the fifo-in index counter*/smp_wmb();
}unsigned int __kfifo_in(struct __kfifo *fifo,const void *buf, unsigned int len)
{unsigned int l;l kfifo_unused(fifo);if (len l)len l;kfifo_copy_in(fifo, buf, len, fifo-in);fifo-in len;return len;
}数据入队列时调用了unsigned int __kfifo_in(struct __kfifo *fifo, const void *buf, unsigned int len)
在函数内部首先调用了kfifo_unused来判断当前还剩多少空间可以使用
前面我们已经提到过fifo-mask 在初始化的时候被赋值成为 size - 1 所以这里 (fifo-mask 1) 就等于申请的时候的 size 值。size 的值代表着总的存储对象的个数而每次在推数据进入 fifo 的时候in 都会增加取出数据的时候out 都会增加。所以计算当前 fifo 中还剩余多少空间就使用了
(fifo-mask 1) - (fifo-in - fifo-out) 注意这里的 in/out 是不断增加的无符号整形
接着会调用函数 static void kfifo_copy_in(struct __kfifo *fifo, const void *src, unsigned int len, unsigned int off)
首先还是通过 fifo-mask 得到了整个 size 的大小。然后是用
off fifo-mask;
展开就是
fifo-in fifo-in fifo-mask;
因为fifo-in是一直在增加的但是缓冲区的容量是不变的所以在写入数据之前就需要找到具体在哪个位置写也就是需要知道在缓冲区中in的偏移量因此就需要取余操作但是呢前面申请的空间已经是2的整数次幂因此这里的位运算就替代了取余操作提高了运算效率
接着判断 esize 的值就是每个元素的占用内存的情况如果不是 1 的话一个字节则需要对 offsizelen 分别乘以 esize。因为在使用 memcpy 时需要以1个字节为单位进行数据拷贝。
举个例子 接着使用
l min(len, size - off);
取得复制数据的长度 len 和 size-off(末尾的剩余空间) 之间的最小值由是环形的缓冲区 所以在此处存在两种情况
① 复制数据的长度len要小于size-off(末尾的剩余空间) ② 复制数据的长度len要大于size-off(末尾的剩余空间) 所以在这个地方先去取一个 len 和 size-off 之间最小的那个值 l即先打算尝试把尾巴上能用的空间先用完如果不够再去用从头部开始的剩余空间这两个 memcpy 用得十分巧妙不需要做额外的判断
针对第一种情况(复制数据的长度len要小于size-off(末尾的剩余空间))
第一条 memcpy将 len 的数据 memcpy 到以 fifo-data 之前用过 kmalloc 分配的内存起始地址加上 off 偏移in 对应的偏移的地方开始copy 进 src 数据。
第二条 memcpylen -l 等于0memcpy 什么都不会做
针对第二种情况(复制数据的长度len要大于size-off(末尾的剩余空间))
第一条 memcpy和前一种情况一样
第二条 memcpylen -l 大于0将剩余的数据拷贝到fifo-data的头部
最终整个环形缓冲区的数据拷贝完成
最后在退出 kfifo_copy_in 后在 __kfifo_in 函数中对 fifo-in 做累加
fifo-in len
做完上述的拷贝后对于上述两种情况最后体现出来的是
① 复制数据的长度len要小于等于size-off(末尾的剩余空间) ② 复制数据的长度len要大于size-off(末尾的剩余空间) 前面谈到的入队列都是out 在前in在后假设in在前out在后呢
复制数据的长度len要小于size-off(in和out之间的剩余空间) 第一条 memcpy将 len 的数据 memcpy 到以 fifo-data加上 off 偏移in 对应的偏移的地方开始copy 进 src 数据。
第二条 memcpylen -l 等于0memcpy 什么都不会做 复制数据的长度len要大于size-off(in和out之间的剩余空间) 第一条 memcpy将 len 的数据 memcpy 到以 fifo-data加上 off 偏移in 对应的偏移的地方开始copy 进 src 数据。
第二条 memcpylen -l 等于0memcpy 什么都不会做 到现在有人可能会有疑问为什么在拷贝数据时为什么没有判断缓冲区是否满了呢
其实这里调用了kfifo_unused函数计算剩余空间如果剩余空间为0虽然依旧会进入kfifo_copy_in函数l min(len, size - off)但这里是取了剩余空间和需要拷贝数据的最小值即为0两个memcpy什么都不会做
注意如果拷贝的数据量大于剩余空间会用数据将剩余的空间填充满返回值就是拷贝了多少字节的数据
2.6 kfifo出队列
出队列和入队列的逻辑是差不多的这里就不多赘述了
调用顺序是__kfifo_out–》__kfifo_out_peek–》kfifo_copy_out
static void kfifo_copy_out(struct __kfifo *fifo, void *dst,unsigned int len, unsigned int off)
{unsigned int size fifo-mask 1;unsigned int esize fifo-esize;unsigned int l;off fifo-mask;if (esize ! 1) {off * esize;size * esize;len * esize;}l min(len, size - off);memcpy(dst, fifo-data off, l);memcpy(dst l, fifo-data, len - l);/** make sure that the data is copied before* incrementing the fifo-out index counter*/smp_wmb();
}unsigned int __kfifo_out_peek(struct __kfifo *fifo,void *buf, unsigned int len)
{unsigned int l;l fifo-in - fifo-out;if (len l)len l;kfifo_copy_out(fifo, buf, len, fifo-out);return len;
}
EXPORT_SYMBOL(__kfifo_out_peek);unsigned int __kfifo_out(struct __kfifo *fifo,void *buf, unsigned int len)
{len __kfifo_out_peek(fifo, buf, len);fifo-out len;return len;
}2.7 kfifo的判空和判满
源代码中并没有判断空和判断满的函数但是对于入队列时有一个计算剩余空间的函数前面也提到过
static inline unsigned int kfifo_unused(struct __kfifo *fifo)
{return (fifo-mask 1) - (fifo-in - fifo-out);
}它的判满主要是看in - out 的值是等于 mask (size - 1)
对于出队列时在__kfifo_out_peek 函数内有l fifo-in - fifo-out当in和out相等时就表示空的也就是empty
关于in/out溢出问题 kfifo中的in和out只会一直增加因为它俩是无符号整数因此最终就会回到0即使到in出现溢出在out之前in-out的值仍然为无符号整数依旧能表示已经使用的buffer的长度这点无需担心。这正是这个机制的精妙之处。
2.8 关于内存屏障
尽管单消费者和单生产者能够对kfifo的进行无锁并发访问但是在源码中入队列和出队列依旧使用了smp_wmb()也就是内存屏障
编译器编译源代码时会将源代码进行优化将源代码的指令进行重排序以适合于CPU的并行执行。然而内核同步必须避免指令重新排序优化屏障避免编译器的重排序优化操作保证编译程序时在优化屏障之前的指令不会在优化屏障之后执行。
软件可通过读写屏障强制内存访问次序。读写屏障像一堵墙所有在设置读写屏障之前发起的内存访问必须先于在设置屏障之后发起的内存访问之前完成确保内存访问按程序的顺序完成。Linux内核提供的内存屏障API函数说明如下表。内存屏障可用于多处理器和单处理器系统如果仅用于多处理器系统就使用smp_xxx函数在单处理器系统上它们什么都不要。
内存屏障含义smp_rmb适用于多处理器的读内存屏障smp_wmb适用于多处理器的写内存屏障smp_mb适用于多处理器的内存屏障
所以在 kfifo_copy_in 和 kfifo_copy_out 的尾部都插入了 smp_wmb() 的写内存屏障的代码
它的作用是确保 fifo-in 和 fifo-out 增加 len 的这个操作在内存屏障之后也就是保证了在 SMP 多处理器下一定是先完成了 fifo 的内存操作然后再进行变量的增加。以免被优化后的混乱访问导致策略失败
不过多个消费者、生产者的并发访问还是需要加锁限制
最后再提一句pthread_mutex中不包含内存屏障而spin_lock中包含内存屏障