打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
C语言封装disruptor

参考:http://www.oschina.net/p/disruptor
在理解了disruptor的原理之后,然后参考一个开源的disruptor for c(https://github.com/colding/disruptorC)
后写了一个disruptor库,通过该库可以更容易地使用这个框架。


考:http://www.oschina.net/p/disruptor在理解了disruptor的原理之后,然后参考一个开源的disruptor for c(https://github.com/colding/disruptorC)后写了一个disruptor库,通过该库可以更容易地使用这个框架。

头文件disruptor.h:

#ifndef DISRUPTOR_H#define DISRUPTOR_Hstruct disruptor;struct disruptor_config{    unsigned int reader_count;    unsigned int ele_size;    /* element count must be a power of 2 */    unsigned long ele_cnt;};struct disruptor*disruptor_create(struct disruptor_config* cfg);/* return reader id */unsigned intdisruptor_add_reader(struct disruptor* dis);voiddisruptor_del_reader(struct disruptor* dis, unsigned int reader_id);voiddisruptor_read(struct disruptor* dis, unsigned int reader_id, void* dst);voiddisruptor_write(struct disruptor* dis, void* src);voiddisruptor_destroy(struct disruptor* dis);#endif

源文件:

#include <stdlib.h>#include <stdio.h>#include <string.h>#include "disruptor.h"#define MAX_READER_COUNT 128#define INV_POS (unsigned long)-1struct disruptor{    unsigned long write_pos;    unsigned long slowest_read_pos;    unsigned long max_readable_pos;    unsigned long read_pos[MAX_READER_COUNT];    unsigned int reader_cnt;    char* data_buf;    unsigned int data_item_size;    /* data_item_count must be a power of 2 */    unsigned long data_item_count;    unsigned long mask;};struct disruptor*    disruptor_create(struct disruptor_config* cfg){    int i;    long size;    struct disruptor* dis;    if (cfg->reader_count > MAX_READER_COUNT)    {        return NULL;    }    dis = (struct disruptor*)malloc(sizeof(*dis));    if (NULL == dis)    {        return NULL;    }    memset(dis, 0, sizeof(*dis));    size = cfg->ele_cnt * cfg->ele_size;    dis->data_buf = (char*)malloc(size);    if (NULL == dis->data_buf)    {        goto err_exit;    }    dis->data_item_count = cfg->ele_cnt;    dis->mask = cfg->ele_cnt - 1;    dis->data_item_size = cfg->ele_size;    dis->reader_cnt = cfg->reader_count;    for (i = 0; i < dis->reader_cnt; i++)    {        dis->read_pos[i] = INV_POS;    }    return dis;err_exit:    if (dis->data_buf != NULL)    {        free(dis->data_buf);    }    if (dis != NULL)    {        free(dis);    }    return NULL;}/* return reader id */unsigned intdisruptor_add_reader(struct disruptor* dis){    int i;    unsigned long inv_pos = INV_POS;    do     {        for (i = 0; i < dis->reader_cnt; i++)        {            if (__atomic_compare_exchange_n(&dis->read_pos[i],              // type* ptr                &inv_pos,                                                   // type* expected                __atomic_load_n(&dis->slowest_read_pos, __ATOMIC_CONSUME),  // type* desired                1,                                                          // weak                __ATOMIC_RELEASE,                                           // success_memorder                __ATOMIC_RELAXED))                                          // failure_memorder            {                goto out;            }            inv_pos = INV_POS;        }    } while (1);out:    /* set init read_pos 1 if it's 0. */    if (0 == dis->read_pos[i])    {        __atomic_store_n(&dis->read_pos[i], 1, __ATOMIC_RELEASE);    }    return i;}voiddisruptor_del_reader(struct disruptor* dis, unsigned int reader_id){    __atomic_store_n(&dis->read_pos[reader_id], INV_POS, __ATOMIC_RELAXED);}voiddisruptor_read(struct disruptor* dis, unsigned int reader_id, void* dst){    unsigned long cur_pos;    cur_pos = dis->read_pos[reader_id];    while (cur_pos > __atomic_load_n(&dis->max_readable_pos, __ATOMIC_RELAXED))    {        usleep(1);    }    memcpy(dst, dis->data_buf + (cur_pos & dis->mask) * dis->data_item_size, dis->data_item_size);    cur_pos++;    __atomic_store_n(&dis->read_pos[reader_id], cur_pos, __ATOMIC_RELAXED);}unsigned longdisruptor_next_write_pos(struct disruptor* dis){    unsigned int i;    unsigned long slowest_read_pos;    unsigned long pos;    unsigned long next_write_pos;    next_write_pos = 1 + __atomic_fetch_add(&dis->write_pos, 1, __ATOMIC_RELAXED);    do     {        /* get the slowest reader */        slowest_read_pos = INV_POS;        for (i = 0; i < dis->reader_cnt; i++)        {            pos = __atomic_load_n(&dis->read_pos[i], __ATOMIC_RELAXED);            if (pos < slowest_read_pos)            {                slowest_read_pos = pos;            }        }        /* all readers exit, or no reader join in */        if (INV_POS == slowest_read_pos)        {            slowest_read_pos = next_write_pos - (dis->mask & next_write_pos);        }        if ((next_write_pos - slowest_read_pos) <= dis->mask)        {            return next_write_pos;        }        usleep(1);    } while (1);}voiddisruptor_write_done(struct disruptor* dis, unsigned long pos){    unsigned long max_readable_pos = pos - 1;    while (__atomic_load_n(&dis->max_readable_pos, __ATOMIC_RELAXED) != max_readable_pos)    {        usleep(1);    }    __atomic_fetch_add(&dis->max_readable_pos, 1, __ATOMIC_RELEASE);}voiddisruptor_write(struct disruptor* dis, void* src){    unsigned long pos;    pos = disruptor_next_write_pos(dis);    memcpy(dis->data_buf + (pos & dis->mask) * dis->data_item_size, src, dis->data_item_size);    disruptor_write_done(dis, pos);}voiddisruptor_destroy(struct disruptor* dis){    free(dis->data_buf);    free(dis);}

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Ext3文件系统读写过程分析
System.arraycopy源码分析 | 勇点
Linux2.6.30内核的sock结构(3)-1/2 - TCP-IP结构体 - 978...
linux内核sk_buff的结构分析 - 但行好事 莫问前程 - JavaEye技术网站
内核同步技术二
进程(二):进程创建
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服