《Ceph源码分析》——第2章,第3节线程池

2.3 线程池
线程池(ThreadPool)在分布式存储系统的实现中是必不可少的,在Ceph的代码中广泛用到。Ceph中线程池的实现也比较复杂,结构如下:

class ThreadPool : public md_config_obs_t {
  CephContext *cct;
  string name;         //线程池的名字
  string lockname;     //锁的名字
  Mutex _lock;         //线程互斥的锁,也是工作队列访问互斥的锁
  Cond _cond;          //锁对应的条件变量
  bool _stop;          //线程池是否停止的标志
  int _pause;          //暂时中止线程池的标志
  int _draining;
  Cond _wait_cond;
  int ioprio_class, ioprio_priority;

  vector<WorkQueue_*> work_queues;     //工作队列
  int last_work_queue;                 //最后访问的工作队列

  set<WorkThread*> _threads;           //线程池中的工作线程
  list<WorkThread*> _old_threads;      //等待进joined操作的线程
  int processing;
}

类ThreadPool里包函一些比较重要的数据成员:
工作线程集合_threads。
等待Join操作的旧线程集合_old_threads。
工作队列集合,保存所有要处理的任务。一般情况下,一个工作队列对应一个类型的处理任务,一个线程池对应一个工作队列,专门用于处理该类型的任务。如果是后台任务,又不紧急,就可以将多个工作队列放置到一个线程池里,该线程池可以处理不同类型的任务。
线程池的实现主要包括:线程池的启动过程,线程池对应的工作队列的管理,线程池对应的执行函数如何执行任务。下面分别介绍这些实现,然后介绍一些Ceph线程池实现的超时检查功能,最后介绍ShardedThreadpool的实现原理。

2.3.1 线程池的启动
函数ThreadPool::start() 用来启动线程池,其在加锁的情况下,调用函数start_threads,该函数检查当前线程数,如果小于配置的线程池,就创建新的工作线程。

2.3.2 工作队列
工作队列(WorkQueue)定义了线程池要处理的任务。任务类型在模板参数中指定。在构造函数里,就把自己加入到线程池的工作队列集合中:

template<class T>
class WorkQueue : public WorkQueue_ {
  ThreadPool *pool;
  WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p) : WorkQueue_
           (n, ti, sti), pool(p) {
    pool->add_work_queue(this);
  }
  ……
}

WorkQueue实现了一部分功能:进队列和出队列,以及加锁,并用通过条件变量通知相应的处理线程:

bool queue(T *item) {
  pool->_lock.Lock();
  bool r = _enqueue(item);
  pool->_cond.SignalOne();
  pool->_lock.Unlock();
  return r;
  }
  void dequeue(T *item) {
    pool->_lock.Lock();
    _dequeue(item);
    pool->_lock.Unlock();
  }
  void clear() {
    pool->_lock.Lock();
    _clear();
    pool->_lock.Unlock();
}

还有一部分功能,需要使用者自己定义。需要自己定义实现保存任务的容器,添加和删除的方法,以及如何处理任务的方法:

virtual bool _enqueue(T *) = 0;
    //从提交的任务中去除一个项
virtual void _dequeue(T *) = 0;
    //去除一个项并返回原始指针
virtual T *_dequeue() = 0;
virtual void _process(T *t) { assert(0); }
virtual void _process(T *t, TPHandle &) {
      _process(t);

2.3.3 线程池的执行函数
函数worker为线程池的执行函数:
void ThreadPool::worker(WorkThread *wt)
其处理过程如下:
1)首先检查_stop标志,确保线程池没有关闭。
2)调用函数join_old_threads把旧的工作线程释放掉。检查如果线程数量大于配置的数量_num_threads,就把当前线程从线程集合中删除,并加入_old_threads队列中,并退出循环。
3)如果线程池没有暂时中止,并且work_queues不为空,就从last_work_queue开始,遍历每一个工作队列,如果工作队列不为空,就取出一个item,调用工作队列的处理函数做处理。

2.3.4 超时检查
TPHandle是一个有意思的事情。每次线程函数执行时,都会设置一个grace超时时间,当线程执行超过该时间,就认为是unhealthy的状态。当执行时间超过suicide_grace时,OSD就会产生断言而导致自杀,代码如下:

struct heartbeat_handle_d {
  const std::string name;
  atomic_t timeout, suicide_timeout;
  time_t grace, suicide_grace;
  std::list<heartbeat_handle_d*>::iterator list_item;
}

class TPHandle {
  friend class ThreadPool;
  CephContext *cct;
  heartbeat_handle_d *hb;   //心跳
  time_t grace;             //超时
  time_t suicide_grace;     //自杀的超时时间
}

结构heartbeat_handle_d记录了相关信息,并把该结构添加到HeartbeatMap的系统链表中保存。OSD会有一个定时器,定时检查是否超时。

2.3.5 ShardedThreadPool
这里简单介绍一个ShardedThreadPool。在之前的介绍中,ThreadPool实现的线程池,其每个线程都有机会处理工作队列的任意一个任务。这就会导致一个问题,如果任务之间有互斥性,那么正在处理该任务的两个线程有一个必须等待另一个处理完成后才能处理,从而导致线程的阻塞,性能下降。
例2-1 如表2-1所示,线程Thread1和Thread2分别正在处理Job1和Job2。
由于Job1和Job2的关联性,二者不能并发执行,只能顺序执行,二者之间用一个互斥锁来控制。如果Thread1先获得互斥锁就先执行,Thread2必须等待,直到Thread1执行完Job1后释放了该互斥锁,Thread2获得该互斥锁后才能执行Job2。显然,这种任务的调度方式应对这种不能完全并行的任务是有缺陷的。实际上Thread2可以去执行其他任务,比如Job5。Job1和Job2既然是顺序的,就都可以交给Thread1执行。
表2-1 ThreadPool的处理模型示列

因此,引入了Sharded ThreadPool进行管理。ShardedThreadPool对上述的任务调度方式做了改进,其在线程的执行函数里,添加了表示线程的thread_index:
void shardedthreadpool_worker(uint32_t thread_index);
具体如何实现Shard方式,还需要使用者自己去实现。其基本的思想就是:每个线程对应一个任务队列,所有需要顺序执行的任务都放在同一个线程的任务队列里,全部由该线程执行。

时间: 2024-10-01 13:27:34

《Ceph源码分析》——第2章,第3节线程池的相关文章

《Ceph源码分析》——导读

目 录序言第1章 Ceph整体架构1.1 Ceph的发展历程1.2 Ceph的设计目标1.3 Ceph基本架构图1.4 Ceph客户端接口 1.4.1 RBD 1.4.2 CephFS1.4.3 RadosGW 1.5 RADOS 1.5.1 Monitor 1.5.2 对象存储1.5.3 pool和PG的概念1.5.4 对象寻址过程1.5.5 数据读写过程1.5.6 数据均衡1.5.7 Peering 1.5.8 Recovery和Backfill 1.5.9 纠删码1.5.10 快照和克隆1

《Ceph源码分析》——第2章,第1节Object

第2章Ceph通用模块本章介绍Ceph源代码通用库中的一些比较关键而又比较复杂的数据结构.Object和Buffer相关的数据结构是普遍使用的.线程池ThreadPool可以提高消息处理的并发能力.Finisher提供了异步操作时来执行回调函数.Throttle在系统的各个模块各个环节都可以看到,它用来限制系统的请求,避免瞬时大量突发请求对系统的冲击.SafteTimer提供了定时器,为超时和定时任务等提供了相应的机制.理解这些数据结构,能够更好理解后面章节的相关内容. 2.1 Object对象

《Ceph源码分析》——第3章,第1节Ceph网络通信框架

第3章Ceph网络通信本章介绍Ceph网络通信模块,这是客户端和服务器通信的底层模块,用来在客户端和服务器之间接收和发送请求.其实现功能比较清晰,是一个相对较独立的模块,理解起来比较容易,所以首先介绍它. 3.1 Ceph网络通信框架一个分布式存储系统需要一个稳定的底层网络通信模块,用于各节点之间的互联互通.对于一个网络通信系统,要求如下:高性能.性能评价的两个指标:带宽和延迟.稳定可靠.数据不丢包,在网络中断时,实现重连等异常处理.网络通信模块的实现在源代码src/msg的目录下,其首先定义了

《Ceph源码分析》——第1章,第5节RADOS

1.5 RADOS RADOS是Ceph存储系统的基石,是一个可扩展的.稳定的.自我管理的.自我修复的对象存储系统,是Ceph存储系统的核心.它完成了一个存储系统的核心功能,包括:Monitor模块为整个存储集群提供全局的配置和系统信息:通过CRUSH算法实现对象的寻址过程:完成对象的读写以及其他数据功能:提供了数据均衡功能:通过Peering过程完成一个PG内存达成数据一致性的过程:提供数据自动恢复的功能:提供克隆和快照功能:实现了对象分层存储的功能:实现了数据一致性检查工具Scrub.下面分

《Ceph源码分析》——第2章,第7节本章小结

2.7 本章小结 本章介绍了src/common目录下的一些公共库中比较常见的类的实现.BufferList在数据读写.序列化中使用比较多,它的各种不同成员函数的使用方法需要读者自己进一步了解.对于ShardedThreadPool,本章只介绍了实现的原理,具体实现在不同的场景会有不同,需要读者面对具体的代码自己去分析.

《Ceph源码分析》——第2章,第2节Buffer

2.2 BufferBuffer就是一个命名空间,在这个命名空间下定义了Buffer相关的数据结构, 这些数据结构在Ceph的源代码中广泛使用.下面介绍的buffer::raw类是基础类,其子类完成了Buffer数据空间的分配,buffer::ptr类实现了Buffer内部的一段数据,buffer::list封装了多个数据段. 2.2.1 buffer::raw类buffer::raw是一个原始的数据Buffer,在其基础之上添加了长度.引用计数和额外的crc校验信息,结构如下:`class b

《Ceph源码分析》——第1章,第2节Ceph的设计目标

1.2 Ceph的设计目标Ceph的设计目标是采用商用硬件(Commodity Hardware)来构建大规模的.具有高可用性.高可扩展性.高性能的分布式存储系统.商用硬件一般指标准的x86服务器,相对于专用硬件,性能和可靠性较差,但由于价格相对低廉,可以通过集群优势来发挥高性能,通过软件的设计解决高可用性和可扩展性.标准化的硬件可以极大地方便管理,且集群的灵活性可以应对多种应用场景.系统的高可用性指的是系统某个部件失效后,系统依然可以提供正常服务的能力.一般用设备部件和数据的冗余来提高可用性.

《Ceph源码分析》——第3章,第2节Simple实现

3.2 Simple实现Simple在Ceph里实现比较早,目前也比较稳定,是在生产环境中使用的网络通信模块.如其名字所示,实现相对比较简单.下面具体分析一下,Simple如何实现Ceph网络通信框架的各个模块. 3.2.1 SimpleMessager类SimpleMessager实现了Messager接口.class SimpleMessenger : public SimplePolicyMessenger { Accepter accepter; //用于接受客户端的链接请求 Dispa

《Ceph源码分析》——第3章,第3节本章小结

3.3 本章小结本章介绍了Ceph的网络通信模块的框架,及目前生产环境中使用的Simple实现.它对每个链接都会有一个发送线程和接收线程用来处理发送和接收.实现的难点还在于网络链接出现错误时的各种错误处理.