Ceph源码分析
上QQ阅读APP看书,第一时间看更新

第2章
Ceph通用模块

本章介绍Ceph源代码通用库中的一些比较关键而又比较复杂的数据结构。Object和Buffer相关的数据结构是普遍使用的。线程池ThreadPool可以提高消息处理的并发能力。Finisher提供了异步操作时来执行回调函数。Throttle在系统的各个模块各个环节都可以看到,它用来限制系统的请求,避免瞬时大量突发请求对系统的冲击。SafteTimer提供了定时器,为超时和定时任务等提供了相应的机制。理解这些数据结构,能够更好理解后面章节的相关内容。

2.1 Object

对象Object是默认为4MB大小的数据块。一个对象就对应本地文件系统中的一个文件。在代码实现中,有object、sobject、hobject、ghobject等不同的类。

结构object_t对应本地文件系统的一个文件,name就是对象名:

        struct object_t {
          string name;
          ......
        }

sobject_t在object_t之上增加了snapshot信息,用于标识是否是快照对象。数据成员snap为快照对象的对应的快照序号。如果一个对象不是快照对象(也就是head对象),那么snap字段就被设置为CEPH_NOSNAP值。

        struct sobject_t {
          object_t oid;
          snapid_t snap;
          ......
        }

hobject_t是名字应该是hash object的缩写。

        struct hobject_t {
          object_t oid;
          snapid_t snap;
        private:
          uint32_t hash;
          bool max;
          uint32_t nibblewise_key_cache;
          uint32_t hash_reverse_bits;
          ……
        public:
          int64_t pool;
          string  nspace;
        private:
          string key;
          ......
        }

其在sobject_t的基础上增加了一些字段:

❑ int64_t pool:所在的pool的id。

❑ string nspace:nspace一般为空,它用于标识特殊的对象。

❑ string key:对象的特殊标记。

❑ string hash:hash和key不能同时设置,hash值一般设置为就是pg的id值。

ghobject_t在对象hobject_t的基础上,添加了generation字段和shard_id字段,这个用于ErasureCode模式下的PG:

❑ shard_id用于标识对象所在的osd在EC类型的PG中的序号,对应EC来说,每个osd在PG中的序号在数据恢复时非常关键。如果是Replicate类型的PG,那么字段就设置为NO_SHARD(-1),该字段对于replicate是没用。

❑ generation用于记录对象的版本号。当PG为EC时,写操作需要区分写前后两个版本的object,写操作保存对象的上一个版本(generation)的对象,当EC写失败时,可以rollback到上一个版本。

        struct ghobject_t {
          hobject_t hobj;
          gen_t generation;
          shard_id_t shard_id;
          bool max;
        public:
          static const gen_t NO_GEN = UINT64_MAX;
          ......
        }

2.2 Buffer

Buffer就是一个命名空间,在这个命名空间下定义了Buffer相关的数据结构,这些数据结构在Ceph的源代码中广泛使用。下面介绍的buffer::raw类是基础类,其子类完成了Buffer数据空间的分配,buffer::ptr类实现了Buffer内部的一段数据,buffer::list封装了多个数据段。

2.2.1 buffer::raw

类buffer::raw是一个原始的数据Buffer,在其基础之上添加了长度、引用计数和额外的crc校验信息,结构如下:

        class buffer::raw {
          public:
            char *data;      //数据指针
            unsigned len;    //数据长度
            atomic_t nref;   //引用计数
            mutable RWLock crc_lock;   //读写锁,保护crc_map
            map<pair<size_t, size_t>, pair<uint32_t, uint32_t> > crc_map;
            //crc校验信息,第一个pair为数据段的起始和结束(from, to),第二个pair是crc32校验
              码,pair的第一字段为base crc32校验码,第二个字段为加上数据段后计算出的crc32校验码。
        ……
        }

下列类都继承了buffer::raw,实现了data对应内存空间的申请:

❑ 类raw_malloc实现了用malloc函数分配内存空间的功能。

❑ 类class buffer::raw_mmap_pages实现了通过mmap来把内存匿名映射到进程的地址空间。

❑ 类class buffer::raw_posix_aligned调用了函数posix_memalign来申请内存地址对齐的内存空间。

❑ 类class buffer::raw_hack_aligned是在系统不支持内存对齐申请的情况下自己实现了内存地址的对齐。

❑ 类class buffer::raw_pipe实现了pipe做为Buffer的内存空间。

❑ 类class buffer::raw_char使用了C++的new操作符来申请内存空间。

2.2.2 buffer::ptr

类buffer::ptr就是对于buffer::raw的一个部分数据段。结构如下:

        class CEPH_BUFFER_API ptr {
          raw *_raw;
          unsigned _off, _len;
          ……
        }

ptr是raw里的一个任意的数据段,_off是在_raw里的偏移量,_len是ptr的长度。raw和ptr的示意图如图2-1所示。

图2-1 raw和ptr示意图

2.2.3 buffer::list

类buffer::list是一个使用广泛的类,它是多个buffer::ptr的列表,也就是多个内存数据段的列表。结构如下:

    class CEPH_BUFFER_API list {
      std::list<ptr> _buffers; //所有的ptr
      unsigned _len;            //所有的ptr的数据总长度
      unsigned _memcopy_count; //当调用函数rebuild用来内存对齐时,需要内存拷贝的数据量
      ptr append_buffer;       //当有小的数据就添加到这个buffer里
      mutable iterator last_p; //访问list的迭代器
      ……
    }

buffer::list的重要的操作如下所示。

❑ 添加一个ptr到list的头部:

      void push_front(ptr& bp) {
        if (bp.length() == 0)
          return;
        _buffers.push_front(bp);
        _len += bp.length();
      }

❑ 添加一个raw到list头部中,先构造一个ptr,后添加list中:

      void push_front(raw *r) {
        ptr bp(r);
        push_front(bp);
      }

❑ 判断内存是否以参数align对齐,每一个ptr都必须以align对齐:

      bool buffer::list::is_aligned(unsigned align) const
      {
        for (std::list<ptr>::const_iterator it = _buffers.begin();
          it ! = _buffers.end();
          ++it)
            if (! it->is_aligned(align))
          return false;
          return true;
      }

❑ 添加一个字符到list中,先查看append_buffer是否有足够的空间,如果没有,就新申请一个4KB大小的空间:

      void buffer::list::append(char c)
      {
        // 检查当前的append_buffer是否有足够的空间
        unsigned gap = append_buffer.unused_tail_length();
            if (! gap) {
              // 如果没有空间,就申请一个append_buffer!
            append_buffer = create_aligned(CEPH_BUFFER_APPEND_SIZE,
                                            CEPH_BUFFER_APPEND_SIZE);
              append_buffer.set_length(0);   //到目前为止,没有用到
            }
              append(append_buffer, append_buffer.append(c) - 1, 1);
              // 把该数据段添加到append_buffer中
          }

❑ 内存对齐:有些情况下,需要内存地址对齐,例如当以directIO方式写入数据至磁盘时,需要内存地址按内存页面大小(page)对齐,也即buffer::list的内存地址都需按page对齐。函数rebuild用来完成对齐的功能。其实现的方法也比较简单,检查没有对齐的ptr,申请一块新对齐的内存,把数据拷贝过去,释放内存空间就可以了。

❑ buffer::list还集成了其他额外的一些功能:

● 把数据写入文件或从文件读取数据的功能。

● 计算数据的crc32校验。

2.3 线程池

线程池(ThreadPool)在分布式存储系统的实现中是必不可少的,在Ceph的代码中广泛用到。Ceph中线程池的实现也比较复杂,结构如下:

        class ThreadPool : public md_config_obs_t {
          CephContext *cct;
          string name;          //线程池的名字
          string lockname;     //锁的名字
          Mutex _lock;          //线程互斥的锁,也是工作队列访问互斥的锁
          Cond _cond;           //锁对应的条件变量
          bool _stop;           //线程池是否停止的标志
          int _pause;           //暂时中止线程池的标志
          int _draining;
          Cond _wait_cond;
          int ioprio_class, ioprio_priority;
          vector<WorkQueue_*> work_queues;     //工作队列
          int last_work_queue;                  //最后访问的工作队列
          set<WorkThread*> _threads;            //线程池中的工作线程
          list<WorkThread*> _old_threads;      //等待进joined操作的线程
          int processing;
        }

类ThreadPool里包函一些比较重要的数据成员:

❑ 工作线程集合_threads。

❑ 等待Join操作的旧线程集合_old_threads。

❑ 工作队列集合,保存所有要处理的任务。一般情况下,一个工作队列对应一个类型的处理任务,一个线程池对应一个工作队列,专门用于处理该类型的任务。如果是后台任务,又不紧急,就可以将多个工作队列放置到一个线程池里,该线程池可以处理不同类型的任务。

线程池的实现主要包括:线程池的启动过程,线程池对应的工作队列的管理,线程池对应的执行函数如何执行任务。下面分别介绍这些实现,然后介绍一些Ceph线程池实现的超时检查功能,最后介绍ShardedThreadpool的实现原理。

2.3.1 线程池的启动

函数ThreadPool::start() 用来启动线程池,其在加锁的情况下,调用函数start_threads,该函数检查当前线程数,如果小于配置的线程池,就创建新的工作线程。

2.3.2 工作队列

工作队列(WorkQueue)定义了线程池要处理的任务。任务类型在模板参数中指定。在构造函数里,就把自己加入到线程池的工作队列集合中:

        template<class T>
        class WorkQueue : public WorkQueue_ {
          ThreadPool *pool;
          WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p) : WorkQueue_
                    (n, ti, sti), pool(p) {
            pool->add_work_queue(this);
          }
          ……
        }

WorkQueue实现了一部分功能:进队列和出队列,以及加锁,并用通过条件变量通知相应的处理线程:

        bool queue(T *item) {
          pool->_lock.Lock();
          bool r = _enqueue(item);
          pool->_cond.SignalOne();
          pool->_lock.Unlock();
          return r;
          }
          void dequeue(T *item) {
            pool->_lock.Lock();
            _dequeue(item);
            pool->_lock.Unlock();
          }
          void clear() {
            pool->_lock.Lock();
            _clear();
            pool->_lock.Unlock();
        }

还有一部分功能,需要使用者自己定义。需要自己定义实现保存任务的容器,添加和删除的方法,以及如何处理任务的方法:

        virtual bool _enqueue(T *) = 0;
            //从提交的任务中去除一个项
        virtual void _dequeue(T *) = 0;
            //去除一个项并返回原始指针
        virtual T *_dequeue() = 0;
        virtual void _process(T *t) { assert(0); }
        virtual void _process(T *t, TPHandle &) {
              _process(t);

2.3.3 线程池的执行函数

函数worker为线程池的执行函数:

        void ThreadPool::worker(WorkThread *wt)

其处理过程如下:

1)首先检查_stop标志,确保线程池没有关闭。

2)调用函数join_old_threads把旧的工作线程释放掉。检查如果线程数量大于配置的数量_num_threads,就把当前线程从线程集合中删除,并加入_old_threads队列中,并退出循环。

3)如果线程池没有暂时中止,并且work_queues不为空,就从last_work_queue开始,遍历每一个工作队列,如果工作队列不为空,就取出一个item,调用工作队列的处理函数做处理。

2.3.4 超时检查

TPHandle是一个有意思的事情。每次线程函数执行时,都会设置一个grace超时时间,当线程执行超过该时间,就认为是unhealthy的状态。当执行时间超过suicide_grace时,OSD就会产生断言而导致自杀,代码如下:

        struct heartbeat_handle_d {
          const std::string name;
          atomic_t timeout, suicide_timeout;
          time_t grace, suicide_grace;
          std::list<heartbeat_handle_d*>::iterator list_item;
        }
        class TPHandle {
          friend class ThreadPool;
          CephContext *cct;
          heartbeat_handle_d *hb;   //心跳
          time_t grace;              //超时
          time_t suicide_grace;     //自杀的超时时间
        }

结构heartbeat_handle_d记录了相关信息,并把该结构添加到HeartbeatMap的系统链表中保存。OSD会有一个定时器,定时检查是否超时。

2.3.5 ShardedThreadPool

这里简单介绍一个ShardedThreadPool。在之前的介绍中,ThreadPool实现的线程池,其每个线程都有机会处理工作队列的任意一个任务。这就会导致一个问题,如果任务之间有互斥性,那么正在处理该任务的两个线程有一个必须等待另一个处理完成后才能处理,从而导致线程的阻塞,性能下降。

例2-1 如表2-1所示,线程Thread1和Thread2分别正在处理Job1和Job2。

表2-1 ThreadPool的处理模型示列

由于Job1和Job2的关联性,二者不能并发执行,只能顺序执行,二者之间用一个互斥锁来控制。如果Thread1先获得互斥锁就先执行,Thread2必须等待,直到Thread1执行完Job1后释放了该互斥锁,Thread2获得该互斥锁后才能执行Job2。显然,这种任务的调度方式应对这种不能完全并行的任务是有缺陷的。实际上Thread2可以去执行其他任务,比如Job5。Job1和Job2既然是顺序的,就都可以交给Thread1执行。

因此,引入了Sharded ThreadPool进行管理。ShardedThreadPool对上述的任务调度方式做了改进,其在线程的执行函数里,添加了表示线程的thread_index:

        void shardedthreadpool_worker(uint32_t thread_index);

具体如何实现Shard方式,还需要使用者自己去实现。其基本的思想就是:每个线程对应一个任务队列,所有需要顺序执行的任务都放在同一个线程的任务队列里,全部由该线程执行。

2.4 Finisher

类Finisher用来完成回调函数Context的执行,其内部有一个FinisherThread线程来用于执行Context回调函数:

        class Finisher {
          ……
          vector<Context*> finisher_queue;
                        // 需要执行的Contex,成功返回值为0
          list<pair<Context*, int> > finisher_queue_rval;  、
                        // 需要执行的Context,返回值为int类型的有效值
          ……
        }

2.5 Throttle

类Throttle用来限制消费的资源数量(也常称为槽位“slot”),当请求的slot数量达到max值时,请求就会被阻塞,直到有新的槽位释放出来,代码如下:

        class Throttle {
          CephContext *cct;
          const std::string name;
          PerfCounters *logger;
          ceph::atomic_t count, max;
          // count:当前占用的slot的数量
          // max:sloct数量的最大值
          Mutex lock;         //等待的锁
          list<Cond*> cond;  //等待的条件变量
          ……
        }

函数get用于获取数量为c个slot,参数c默认为1,参数m默认为0,如果m不为默认的0值,就用m值重新设置slot的max值。如果成功获取数量为c个slot,就返回true,否则就阻塞等待。例如:

        bool Throttle::get(int64_t c, int64_t m)

函数get_or_fail当获取不到数量为c个slot时,就直接返回false,不阻塞等待:

        bool Throttle::get_or_fail(int64_t c)

函数put用于释放数量为c个slot资源:

        int64_t Throttle::put(int64_t c)

2.6 SafeTimer

类SafeTimer实现了定时器的功能,代码如下:

        class SafeTimer
        {
          CephContext *cct;
          Mutex& lock;
          Cond cond;
          bool safe_callbacks;      //是否是safe_callbacks
          SafeTimerThread *thread;  //定时器执行线程
          std::multimap<utime_t, Context*> schedule;
                                      //目标时间和定时任务执行函数Context
          std::map<Context*, std::multimap<utime_t, Context*>::iterator> events;
                                      //定时任务<-->定时任务在shedule中的位置映射
          bool stopping;             //是否停止
        }

添加定时任务的命令如下:

        void SafeTimer::add_event_at(utime_t when, Context *callback)

取消定时任务的命令如下:

        bool cancel_event(Context *callback);

定时任务的执行如下:

        void SafeTimer::timer_thread()

本函数一次检查scheduler中的任务是否到期,其循环检查任务是否到期执行。任务在schedule中是按照时间升序排列的。首先检查,如果第一任务没有到时间,后面的任务就不用检查了,直接终止循环。如果第一任务到了定时时间,就调用callback函数执行,如果是safe_callbacks,就必须在获取lock的情况下执行Callback任务。

2.7 本章小结

本章介绍了src/common目录下的一些公共库中比较常见的类的实现。BufferList在数据读写、序列化中使用比较多,它的各种不同成员函数的使用方法需要读者自己进一步了解。对于ShardedThreadPool,本章只介绍了实现的原理,具体实现在不同的场景会有不同,需要读者面对具体的代码自己去分析。