一、简介
以前我发了一篇文章,拷贝版本循环buffer,是 C 语言实现的循环 buffer,那个版本是拷贝读取数据的,即每次读取数据都要使用 memcpy 将数据拷贝到用户缓存地址,这种拷贝在数据量大的时候比较耗费性能,在性能要求比较高的场景下就不适用了,所以我在拷贝的基础上增加免拷贝的功能,即将要读取的地址返回到用户,用户操作完,刷新读数据指针。
👀
二、设计
循环 buffer,拷贝与免拷贝版本实现原理都是一样的,只是返回数据的方式不同,若不清楚,参考上一篇文章。
正常情况下,比如用户要读取256字节数据(绿色区域),空间中数据足够,直接返回读指针位置的内存地址给用户便可以使用,如下图。
当读指针右边的数据不够时候,即我们要读取256字节(红色区域),此时若直接将读指针位置内存返回给用户,用户是访问不了256字节数据的,因为这是不连续的。
针对上述问题,采用预留空间解决。最开始创建循环buffer时候便预留256字节空间,这个空间留作处理边界情况。
将边界情况的数据拷贝到预留空间。将图中黄色部分128字节拷贝到预留空间。
返回指针。此时我们可以返回粗箭头所指位置内存地址,这样保证用户访问连续256字节数据。
代码设计实现
定义的循环buffer结构体如下:
typedef struct ringbuffer_t{ uint8_t *buffer; //缓冲区 int buffer_size; //缓冲区大小 int reserve_size; //预留空间大小 volatile int rpos; //读指针 volatile int wpos; //写指针 int (*rb_write)(struct ringbuffer_t *rb, uint8_t *buffer, int len);//写数据函数指针 uint8_t* (*rb_read)(struct ringbuffer_t *rb, int len); //读数据函数指针 void (*rb_refreash)(struct ringbuffer_t *rb, int len); //刷新读指针位置 void (*rb_free)(struct ringbuffer_t *rb); //释放内存}ringbuffer_t;
对应函数实现:
#define _min(x, y) ((x) < (y)) ? (x) : (y)
#define smp_mb() __asm__ __volatile__ ('' : : : 'memory') //内存屏障
//往循环buffer写数据static int ringbuffer_write(struct ringbuffer_t *rb, uint8_t *buffer, int len){ int space_len=0, left_len=0;
if(!rb || !buffer || len>rb->reserve_size) return -1;
space_len = ((rb->rpos - rb->wpos)+rb->buffer_size)%rb->buffer_size; //计算长度 if(rb->rpos==rb->wpos) space_len=rb->buffer_size; if(space_len <= len) return -1; //避免读写指针相遇 left_len = rb->buffer_size - rb->wpos; //右边的长度
left_len = _min(len, left_len); //求出最小值
memcpy(rb->buffer+rb->wpos, buffer, left_len); //若是最小值等于len则不执行下面的拷贝 memcpy(rb->buffer, buffer+left_len, len-left_len);//过边界拷贝,将剩下的拷贝到最前面
smp_mb(); rb->wpos = (rb->wpos+len)%rb->buffer_size;//取余
return len;}
//从循环buffer读数据,返回指针static uint8_t* ringbuffer_read(struct ringbuffer_t *rb, int len){ int space_len=0, left_len=0;
if(!rb || len>rb->reserve_size) return NULL; space_len = ((rb->wpos - rb->rpos)+rb->buffer_size)%rb->buffer_size; //求出当前可读取的空间长度 if(space_len <= len) return NULL; //判断加上等于条件为了避免读写指针相遇
left_len = rb->buffer_size - rb->rpos;//求出右边可读长度
if(left_lenbuffer-left_len, rb->buffer+rb->rpos, left_len); return rb->buffer-left_len; }
return rb->buffer + rb->rpos;}
//用户读完数据,用了以后,调用刷新接口刷新读指针static void ringbuffer_refreash(struct ringbuffer_t *rb, int len){ if(!rb || len>rb->reserve_size) return; rb->rpos = (rb->rpos+len)%rb->buffer_size; //刷新指针位置}
//是否内存static void ringbuffer_free(struct ringbuffer_t *rb){ uint8_t* buffer=NULL; if(!rb) return; buffer = rb->buffer-rb->reserve_size; //将指针偏移空间起始地址 free(buffer); free(rb);}
//创建循环bufferringbuffer_t* create_ringbuffer(int size, int reserve_size){ ringbuffer_t* rb=NULL; uint8_t* allbuffer=NULL;
if(size < 0 || reserve_size<0) return NULL; rb = (ringbuffer_t*)malloc(sizeof(ringbuffer_t)); allbuffer = (uint8_t*)malloc(size+reserve_size); rb->buffer = allbuffer+reserve_size; //偏移预留大小 rb->buffer_size = size; rb->reserve_size = reserve_size; rb->rpos = 0; rb->wpos = 0; rb->rb_write = ringbuffer_write; rb->rb_read = ringbuffer_read; rb->rb_refreash = ringbuffer_refreash; rb->rb_free = ringbuffer_free;
return rb;}
👀
三、测试、
使用随机生成一个500M的源数据文件,开启两个线程,一个线程从数据文件中随机长度读取数据,然后往循环buffer里面写数据,一个线程从循环buffer里面随机长度读取出数据,然后写入文件,最后使用Beyond Compare软件对两个文件进行比较,查看数据是否相同。 1. 线程1 从源文件中读数据写入循环buffer:
void* write_data_task(void* args) //线程1 往循环buffer中写数据{ FILE *fp = fopen('../src.dat', 'rb'); if(!fp){ printf('open src.dat error!\n'); is_runing=0; return NULL; } uint8_t buf[1024]={0}; ringbuffer_t *rb = (ringbuffer_t*)args; //获取传入进来的循环buffer参数 int ret = -1; int data_len = 0;
while(!feof(fp)) {
data_len = 512+rand()%512; //获取随机长度写入循环buffer fread(buf, data_len, 1, fp); //根据长度从文件中读出原始数据写入循环buffer do{ ret = rb->rb_write(rb, buf, data_len); //往循环buffer中写数据 }while(ret == -1); //阻塞等待写入成功 } is_runing=0;
fclose(fp); return NULL;}
1. 线程2 从循环buffer中读取数据写入另外一个文件
void* read_data_task(void* args){ #if ENABLE_WRITE_OUT_FILE FILE *fp = fopen('./data/out.dat', 'wb');
if(!fp){ printf('Open out.dat error! \n'); is_runing=0; return NULL; } #endif
ringbuffer_t *rb = (ringbuffer_t*)args; //获取传入进来的循环buffer参数 uint8_t *buf=NULL; long start_time = get_sys_time(); //获取系统时间 int data_len=0; while (is_runing) { data_len = 512+rand()%512; //获取随机长度从循环buffer中读取数据 do{ buf = rb->rb_read(rb, data_len); //从循环buffer中读数据 }while(buf==NULL); //阻塞等待读取数据成功 #if ENABLE_WRITE_OUT_FILE fwrite(buf, data_len, 1, fp); //将从循环buffer中读取的数据写入文件 #endif rb->rb_refreash(rb, data_len); //刷新指针 } long end_time = get_sys_time(); //获取系统时间 int use_time = end_time-start_time; double use_s = ((double)use_time/1000000.0); double rate = ((500*1024*1024*8.0)/use_s)/(1024*1024*1024.0); printf('500M Data Use time=%dus(%.2lfS) rate=%.2lfGbps\n', use_time, use_s, rate);
#if ENABLE_WRITE_OUT_FILE fclose(fp); #endif return NULL;}
main函数:
int main(void){ pthread_t th[2]; ringbuffer_t* rb = create_ringbuffer(50*1024, 2048); srand(time(NULL)); //初始化随机数 is_runing = 1; pthread_create(&th[0], NULL, write_data_task, rb); pthread_create(&th[1], NULL, read_data_task, rb);
pthread_join(th[0], NULL); pthread_join(th[1], NULL); printf('Finish Test Ringbuffer...\n');
return 0;}测试结果:
功能:从循环buffer中读取出的数据写入out.dat(500M), 该文件与源数据文件src.dat完全一致,除了末尾一点没有写完,这是因为循环buffer针对处理流式数据,只要数据流一直来,数据就没有末尾这种情况,即读写的数据就完全一致。500M数据使用工具比对情况如下。
运行速率:相对于拷贝读写版本提高了近10倍,不同电脑有所差异。
👀
四、总结
1.注意使用时候,用户读完,使用完数据要调用刷新接口函数将读指针刷新。本例中使用数据便是写入文件,将读出来的数据写入文件,便可以调用refreash函数刷新读指针了。
2.创建循环buffer需要填写两个参数,第一个是循环buffer空间大小,第二个是预留空间大小,第二个参数填写参考值是一次最大的读取数据长度,预留空间小了会出问题。
ringbuffer_t* create_ringbuffer(int size, int reserve_size)
3.本例中没有使用锁,但是依然使用多线程读写,只是因为没有多线程对同一个临界值同时操作,可以不用加锁,数据依然正常,只是读写指针使用volatile修饰,避免被编译器优化。
拷贝版本与免拷贝版本完整代码已经放到github上面,欢迎自取:https://github.com/young-1-code/data_structure.git,取代码时候点一个star吧~
联系客服