打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
ceph源码分析之读写操作流程(2)

上一篇介绍了ceph存储在上两层的消息逻辑,这一篇主要介绍一下读写操作在底两层的流程。下图是上一篇消息流程的一个总结。


ceph中,读写操作由于分布式存储的原因,故走了不同流程。

对于读操作而言:

1.客户端直接计算出存储数据所属于的主osd,直接给主osd上发送消息。

2.osd收到消息后,可以调用Filestore直接读取处在底层文件系统中的主pg里面的内容然后返回给客户端。具体调用函数在ReplicatedPG::do_osd_ops中实现。


读操作代码流程如图:


如我们之前说的,当确定读操作为主osd的消息时(CEPH_MSG_OSD_OP类型),会调用到ReplicatePG::do_osd_op函数,该函数对类型做进一步判断,当发现为读类型(CEPH_OSD_OP_READ)时,会调用FileStore中的函数对磁盘上数据进行读。

  1. int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)  
  2. {  
  3.     ……  
  4.     switch (op.op) {  
  5.         ……  
  6.         case CEPH_OSD_OP_READ:  
  7.             ++ctx->num_read;  
  8.         {  
  9.             // read into a buffer  
  10.             bufferlist bl;  
  11.             int r = osd->store->read(coll, soid, op.extent.offset, op.extent.length, bl);  
  12.             // 调用FileStore::read从底层文件系统读取  
  13.             ……  
  14.         }  
  15.         case CEPH_OSD_OP_WRITE:  
  16.             ++ctx->num_write;  
  17.         {  
  18.             ……//写操作只是做准备工作,并不实际的写  
  19.         }  
  20.         ……  
  21.     }  
  22. }  


FileStore::read函数是底层具体的实现,会通过调用系统函数如::open,::pread,::close等函数来完成具体的操作。

  1. int FileStore::read(  
  2.   coll_t cid,  
  3.   const ghobject_t& oid,  
  4.   uint64_t offset,  
  5.   size_t len,  
  6.   bufferlist& bl,  
  7.   bool allow_eio)  
  8. {  
  9.     ……  
  10.     int r = lfn_open(cid, oid, false, &fd);  
  11.     ……  
  12.     got = safe_pread(**fd, bptr.c_str(), len, offset);  
  13.     //FileStore::safe_pread中调用了::pread  
  14.     ……  
  15.     lfn_close(fd);  
  16.     ……  
  17. }  


而对于写操作而言,由于要保证数据写入的同步性就会复杂很多:

1.首先客户端会将数据发送给主osd

2.osd同样要先进行写操作预处理,完成后它要发送写消息给其他的从osd,让他们对副本pg进行更改,

3.osd通过FileJournal完成写操作到Journal中后发送消息告诉主osd说完成,进入5

4.当主osd收到所有的从osd完成写操作的消息后,会通过FileJournal完成自身的写操作到Journal中。完成后会通知客户端,已经完成了写操作。  

5.osd,从osd的线程开始工作调用FilestoreJournal中的数据写入到底层文件系统中。


在介绍写操作的流程前,需要先介绍一下ceph中的callback函数。

Context类定义在src/include文件中,该类是一个回调函数类的抽象类,继承它的类只要在子类实现它的finish函数,在finish函数调用自己需要回调的函数,就可以完成回调。

  1. class Context {  
  2.     Context(const Context& other);  
  3.     const Context& operator=(const Context& other);  
  4. protected:  
  5.     virtual void finish(int r) = 0;  
  6. public:  
  7.     Context() {}  
  8.     virtual ~Context() {}         
  9.     // we want a virtual destructor!!!  
  10.     virtual void complete(int r) {  
  11.     finish(r);  
  12.     delete this;  
  13.     }  
  14. };  


Finisher类是在src/common中定义的一个专门查看操作是否结束的一个类。在这个类里面拥有一个线程finisher_thread和一个类型为Context指针的队列finisher_queue。当一个操作线程完成自己的操作后,会将Context类型对象送入队列。此时finisher_thread线程循环监视着自己的finisher_queue队列,当发现了有新进入的Context时,会调用这个Context::complete函数,这个函数则会调用到Context子类自己实现的finish函数。来处理操作完成后的后续工作。

  1. class Finisher {  
  2.     CephContext *cct;  
  3.     ……  
  4.     vector<Context*> finisher_queue;  
  5.     ……  
  6.     void *finisher_thread_entry();  
  7.     struct FinisherThread : public Thread {  
  8.         Finisher *fin;      
  9.         FinisherThread(Finisher *f) : fin(f) {}  
  10.         void* entry() { return (void*)fin->finisher_thread_entry(); }  
  11.     } finisher_thread;  
  12.     ……  
  13. }  
  14.   
  15. void *Finisher::finisher_thread_entry()  
  16. {  
  17.     ……  
  18.     while(!finisher_stop){  
  19.         while(!finisher_queue.empty()){  
  20.             ……  
  21.             vector<Context*> ls  
  22.             ls.swap(finisher_queue);  
  23.             for (vector<Context*>::iterator p = ls.begin();  
  24.               p != ls.end();  
  25.               ++p) {  
  26.                 if (*p) {  
  27.                     //这里面调用Context子类实现的finish函数  
  28.                     (*p)->complete(0);  
  29.                 }  
  30.             }  
  31.         }  
  32.     }  
  33. }  


在写操作中涉及了多个线程和消息队列的协同工作,需要注意的是一个类拥有一个Finisher成员时,以为着它同时获得了一个队列和一个执行线程。

OSD中处理读写操作是线程池和消息队列(有很多,其他暂时不讨论):

  1. ThreadPool  op_tp;    
  2. ThreadPool::WorkQueueVal<pair<PGRef,OpRequestRef>, PGRef>  &op_wq;  

FileJournal中拥有的线程和消息队列:

  1. Write  write_thread;    
  2. deque<write_item> writeq;   

其父类Journal中拥有线程和消息队列(引用自之前说的JournalingObjectStore类中):

  1. Finisher        finisher_thread;  

FileStore中拥有的线程和消息队列:

  1. ThreadPool  op_tp;    
  2. OpWQ        op_wq;//Filestore中实现,继承自ThreadPool::WorkQueue<OpSequencer>    
  3. Finisher    ondisk_finisher;    
  4. Finisher    op_finisher;    

前一章说过在前两层,OSD根据不同的消息类型,选择了主OSD处理和从OSD的处理,以下介绍的写流程,就是在收到具体写操作以后,本地OSD开始的工作。

写的逻辑流程图如图:


从图中我们可以看到写操作分为以下几步:
1.OSD::op_tp线程从OSD::op_wq中拿出来操作如本文开始的图上描述,具体代码流是


    ReplicatePG::apply_repop中创建回调类C_OSD_OpCommitC_OSD_OpApplied

    FileStore::queue_transactions中创建了回调类C_JournaledAhead

2.FileJournal::write_thread线程从FileJournal::writeq中拿出来操作,主要就是写数据到具体的journal中,具体代码流:


3.Journal::Finisher.finisher_thread线程从Journal::Finisher.finish_queue中拿出来操作,通过调用C_JournalAhead留下的回调函数FileStore:_journaled_ahead,该线程开始工作两件事:首先入底层FileStore::op_wq通知开始写,再入FileStore::ondisk_finisher.finisher_queue通知可以返回。具体代码流:


4.FileStore::ondisk_finisher.finisher_thread线程从FileStore::ondisk_finisher.finisher_queue中拿出来操作,通过调用C_OSD_OpCommit留下来的回调函数ReplicatePG::op_commit,通知客户端写操作成功


5.FileStore::op_tp线程池从FileStore::op_wq中拿出操作(此处的OP_WQ继承了父类ThreadPool::WorkQueue重写了_process_process_finish等函数,所以不同于OSD::op_wq,它有自己的工作流程),首先调用FileStore::_do_op,完成后调用FileStore::_finish_op


6. FileStore::op_finisher.finisher_thread线程从FileStore::op_finisher.finisher_queue中拿出来操作,通过调用C_OSD_OpApplied留下来的回调函数ReplicatePG::op_applied,通知数据可读。


本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
滴滴Ceph分布式存储系统优化之锁优化
ceph安装教程
OSD Config Reference
CRUSH算法介绍
ceph的CRUSH数据分布算法介绍 | Way Forever
ceph工作原理和安装
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服