MSIPO技术圈 首页 IT技术 查看内容

<Linux> 线程池

2024-03-29

目录

前言:

一、线程池概念

(一)池化技术

(二)优点

(三)应用场景

二、线程池的实现

(一)线程池_V1(朴素版)

(二)线程池_V2(封装版)

(三)线程池_V3(优化版)

三、单例模式

(一)什么是单例模式

(二)单例模式特点

(三)单例模式的简单实现

1. 饿汉模式

2. 懒汉模式

3. 懒汉模式(线程安全版)

(四)线程池_V4(最终版)

四、线程周边问题

(一)STL线程安全问题

(二)智能指针线程安全问题

(三)其他常见锁概念

五、读者写者模型


前言:

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

一、线程池概念

(一)池化技术

所谓的 线程池 就是 提前创建一批线程,当任务来临时,线程直接从任务队列中获取任务执行,可以提高整体效率;同时一批线程会被合理维护,从而避免频繁地创建和销毁对象所带来的开销。

像这种把未来会高频使用到,并且创建较为麻烦的资源提前申请好的技术称为 池化技术池化技术 可以极大地提高性能,最典型的就是 线程池,常用于各种涉及网络连接相关的服务中,比如 MySQL 连接池、HTTP 连接池、Redis 连接池 等。

除了线程池外还有内存池,比如 STL 中的容器在进行空间申请时,都是直接从 空间配置器 allocator 中获取的,并非直接使用系统调用来申请空间。

池化技术 的本质:空间换时间

池化技术 就好比你把钱从银行提前取出一部分放在支付宝中,可以随时使用,十分方便和高效,总不至于需要用钱时还得跑到银行排队取钱。

(二)优点

线程池 的优点在于 高效、方便

  1. 线程在使用前就已经创建好了,使用时直接将任务交给线程完成
  2. 线程会被合理调度,确保 任务与线程 间能做到负载均衡

线程池 中的线程数量不是越多越好,因为线程增多会导致调度变复杂,具体创建多少线程取决于具体业务场景,比如 处理器内核、剩余内存、网络中的 socket 数量等。

线程池 还可以配合 「生产者消费者模型」 一起使用,做到 解耦与提高效率。

  • 可以把 任务队列 换成 「生产者消费者模型」

(三)应用场景

线程池 有以下几种应用场景:

  1. 存在大量且短小的任务请求:比如 Web 服务器中的网页请求,使用 线程池 就非常合适,因为网页点击量众多,并且大多都没有长时间连接访问。
  2. 对性能要求苛刻,力求快速响应需求:比如游戏服务器,要求对玩家的操作做出快速响应。
  3. 突发大量请求,但不至于使服务器产生过多的线程:短时间内,在服务器创建大量线程会使得内存达到极限,造成出错,可以使用 线程池 规避问题。
  4. 定时任务调度:对于需要定时执行的任务,线程池也能发挥重要作用。比如,定期发送邮件、更新数据等任务可以通过线程池来调度和执行。

二、线程池的实现

(一)线程池_V1(朴素版)

朴素版:实现最基本的线程池功能,直接使用系统提供的接口。

所谓朴素版就是不加任何优化设计,只实现 线程池 最基础的功能,便于理解 线程池

创建 ThreadPool_v1.hpp 头文件

将 线程池 实现为一个类,提供接口供外部调用

首先要明白 线程池 的两大核心:一批线程 与 任务队列,客户端发出请求,新增任务,线程获取任务,执行任务,因此 ThreadPool_v1.hpp 的大体框架如下:

  • 一批线程,通过容器管理
  • 任务队列,存储就绪的任务
  • 互斥锁
  • 条件变量

互斥锁 的作用是 保证多个线程并发访问任务队列时的线程安全,而 条件变量 可以在 任务队列 为空时,让一批线程进入等待状态,也就是线程同步。

#pragma once

#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <unistd.h>
#include <pthread.h>

#define THREAD_NUM 5

template<class T>
class MyThreadPool
{
public:
    MyThreadPool(int num = THREAD_NUM)
        :_threads(num), _num(num)
    {
        // 初始化互斥锁和条件变量
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
    
    ~MyThreadPool()
    {
        // 销毁互斥锁和条件变量
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_cond);
    }

    void init()
    {
        // 其他信息初始化...
    }

    void start()
    {
        // 启动线程池...
    }

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
        // 任务处理...
    }
private:
    std::vector<pthread_t> _threads;
    int _num; // 线程数量
    std::queue<T> _tasks; // 利用STL自动扩容的特性,无须担心容量
    pthread_mutex_t _mtx;
    pthread_cond_t _cond;
};

注意:

  • 需要提前给 vector 扩容,避免后面使用时发生越界访问。
  • 提供给线程的回调函数需要设置为静态,否则线程调不动(参数不匹配)。

填补函数体

初始化线程池 init()

当前场景只需要初始化 互斥锁 和 条件变量,在 构造函数 中完成就行了,所以这里的 init() 函数不需要补充

启动线程池 start()

启动 线程池 需要先创建出一批线程,这里直接循环创建即可

    void start()
    {
        // 启动线程池
        for(int i = 0; i < _num; i++)
            pthread_create(&_threads[i], nullptr, threadRoutine, this);
        
    }

这里进行简单打印,打印当前线程的线程 ID 就行了,并且直接 detach,主线程无需等待次线程运行结束

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
        // 任务处理
        // 避免等待线程,直接分离
        pthread_detach(pthread_self());

        while(true)
        {
            sleep(1);

            // std::cout << " thread running..." << pthread_self()  << std::endl;
            printf("thread running... %lu\n", (unsigned long)pthread_self());
            printf("================================\n");
        }
    }

创建 main.cc 源文件,测试线程池的代码

#include "ThreadPool.hpp"
#include <memory>

int main()
{
    std::unique_ptr<MyThreadPool<int>> ptr(new MyThreadPool<int>());

    ptr->init();
    int cnt = 3;
    while(cnt--)
    {
        sleep(1);
        ptr->start();
    }

    return 0;
}

编译并运行代码,可以看到 确实创建了一批线程,当主线程退出后,其他次线程也就跟着终止了 


线程池 还需要提供一个重要的接口 pushTask(),将用户需要执行的业务装载至 任务队列 中,等待线程执行

    // 装载任务
    void pushTask(const T &task)
    {
        // 本质上就是在生产商品,需要加锁保护
        pthread_mutex_lock(&_mtx);
        _tasks.push(task);

        // 唤醒消费者消费
        pthread_cond_signal(&_cond);
        pthread_mutex_unlock(&_mtx);
    }

装载任务的本质就是在生产任务,相当于用户充当生产者,通过这个接口将任务生产至任务队列中,而线程充当消费者,从任务队列中获取任务并消费。

所以线程的回调函数需要从 任务队列 中获取任务,进行消费

  1. 检测是否有任务
  2. 有 -> 消费
  3. 没有 -> 等待

修改回调函数

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
        // 任务处理
        // 避免等待线程,直接分离
        pthread_detach(pthread_self());

        while(true)
        {
            // sleep(1);

            // 任务队列是临界资源,需要保护
            pthread_mutex_lock(&_mtx);

            // 等待条件满足
            while(_tasks.empty())
                pthread_cond_wait(&_cond, &_mtx);

            T task = _tasks.front();
            _tasks.pop();
            // task(); // 进行消费,稍后实现

            pthread_mutex_unlock(&_mtx);
        }
    }

注意: 判断任务队列是否为空需要使用 while,确保在多线程环境中不会出现问题。

因为 任务队列、互斥锁、条件变量 是类内成员,而这里的 threadRoutine() 函数是一个静态函数,并没有 this 指针以访问类内成员,可以采取传递 this 指针的方式解决问题

    void start()
    {  
        // 启动线程池
        for(int i = 0; i < _num; i++)
            pthread_create(&_threads[i], nullptr, threadRoutine, this);
    }

threadRoutine() 函数需要将参数 void* 转化为所在类对象的指针,并通过该指针访问类内成员

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
        // 任务处理
        // 避免等待线程,直接分离
        pthread_detach(pthread_self());

        auto ptr = static_cast<MyThreadPool<T>*>(args);
        
        while(true)
        {
            sleep(1);

            // 任务队列是临界资源,需要保护
            pthread_mutex_lock(&ptr->_mtx);

            // 等待条件满足
            while(ptr->_tasks.empty())
                pthread_cond_wait(&ptr->_cond, &ptr->_mtx);

            T task = ptr->_tasks.front();
            ptr->_tasks.pop();
            // task(); // 进行消费

            pthread_mutex_unlock(&ptr->_mtx);
        }
    }

为了使得提高代码的可阅读性及可拓展性,这里将会封装一批接口,供函数调用

加解锁

    void lockQueue()
    {
        pthread_mutex_lock(&_mtx);
    }
    void unlockQueue()
    {
        pthread_mutex_unlock(&_mtx);
    }

等待和唤醒

    void threadWait()
    {
        pthread_cond_wait(&_cond, &_mtx);
    }
    void threadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

判空、获取任务

    void isEmpty()
    {
        return _tasks.empty();
    }

    T popTask()
    {
        T task = _tasks.front();
        _tasks.pop();
        return task;
    }

修改装载任务 pushTask()

    // 装载任务
    void pushTask(const T &task)
    {
        // 本质上就是在生产商品,需要加锁保护
        lockQueue();
        _tasks.push(task);

        // 唤醒消费者消费
        threadWakeup();
        threadWait();
    }

以及 消费者 threadRountine()

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
        // 任务处理
        // 避免等待线程,直接分离
        pthread_detach(pthread_self());

        auto ptr = static_cast<MyThreadPool<T>*>(args);
        
        while(true)
        {
            sleep(1);

            // 任务队列是临界资源,需要保护
            ptr->lockQueue();

            // 等待条件满足
            while(ptr->isEmpty())
                ptr->threadWait();

            T task = ptr->popTask();

            pthread_mutex_unlock(&ptr->_mtx);

            // 进行消费,可以不用加锁,因为一个商品只会被一个线程消费
            // task(); 
        }
    }

细节: 轮到线程执行任务时,不需要加锁,这就好比你买桶泡面回家,是不必担心别人会和你争抢,可以慢慢消费;同样的,你也不应该占用锁资源,主动让出锁资源以提高整体效率。

task() 表示执行任务,这里实际是一个 operator()() 的重载,详见 <Linux> 生产者消费者模型 中关于 Task.hpp 的设计,因为我们这里也需要使用任务,所以可以直接把之前写的代码拷贝过来

#pragma once

#include <string>

template<class T>
class Task
{
public:
    Task(T x = 0, T y = 0, char op = '+')
        :_x(x), _y(y), _op(op), _result(0), _err(0)
    {}

    // 重载运算操作
    void operator()()
    {
        // 加减乘除
        switch(_op)
        {
            case '+': 
                _result = _x + _y;
            break;
            case '-': 
                _result = _x - _y;
            break;
            case '*': 
                _result = _x * _y;
            break;
            case '/': 
                _result = _x / _y;
            break;
            case '%':
                if(_y == 0) _err = -2;
                else _result = _x % _y;
            break;
            default:
                _err = -3;
            break;
        }
    }

    // 获取计算结果
    std::string getResult()
    {
        std::string ret = std::to_string(_x) + " " + _op
                        + " " + std::to_string(_y);

        if(_err)
        {
            ret += " error ";

            // 判断是 / 错误还是 % 错误
            if(_err == -1) ret += " [-1]  / 0 引发了错误";
            if(_err == -2) ret += " [-2]  % 0 引发了错误";
            else ret += " [-3]  不合法的操作符,只能为 [+-*/%]";
        }
        else
        {
            ret += " = " + std::to_string(_result);
        }
        return ret;
    }
private:
    T _x;
    T _y;
    char _op; // 运算符
    T _result; // 结果
    int _err; // 错误标识
};

轮到 Main.cc 进行操作了,逻辑很简单:创建线程池对象,初始化线程池,启动线程池,装载任务,等待运行结果

#include "ThreadPool.hpp"
#include <memory>

int main()
{
    std::unique_ptr<MyThreadPool<int>> ptr(new MyThreadPool<int>());

    ptr->init();
    ptr->start();

    while(true)
    {
        // 输入 操作数 操作数 操作符
        int x = 0, y = 0;
        char op = '+';
        std::cout << "输入x: ";
        std::cin >> x;
        std::cout << "输入y: ";
        std::cin >> y;
        std::cout << "输入op: ";
        std::cin >> op;

        // 构建任务对象
        Task task(x, y, op);
        
        // 装载任务
        ptr->pushTask(task);
    }
    return 0;
}

现在还有最后一个问题:如何获取计算结果?可以在 线程 执行完任务后,直接显示计算结果,也可以通过传入回调函数的方式,获取计算结果,前者非常简单,只需要在 threadRoutine() 中加入这行代码即可:

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
            // ...
            
            // 显示计算结果
            std::cout << task.getResult() << std::endl;
    }

除此之外,我们也可以通过 回调函数 的方式获取计算结果

目标:给线程传入一个回调函数,线程执行完 任务后,将任务传给回调函数,回调函数结合业务逻辑,灵活处理结果。

这里我们选择使用回调函数打印结果,在主函数中很容易就可以写出这个回调函数

// 回调函数
void CallBack(Task<int> &task)
{
    // 获取计算结果后打印
    std::string ret = task.getResult();
    std::cout << "计算结果为:" << ret << std::endl;;
}

为了能让 线程 在执行任务后能回调,需要将这个函数对象作为参数,传递给 ThreadPool 对象

int main()
{
    std::unique_ptr<MyThreadPool<Task<int>>> ptr(new MyThreadPool<Task<int>>(CallBack));

    // ...
}

当然,这边传递了一个对象,那边就得接收此对象,为了存储该函数对象,ThreadPool 新增一个类成员:_func,函数对象类型为 void (T&)

#include <functional>

#define THREAD_NUM 5

template<class T>
class MyThreadPool
{
    using func_t = std::function<void(T&)>;

public:
    MyThreadPool(func_t func, int num = THREAD_NUM)
        :_threads(num), _num(num), _func(func)
    {
        // 初始化互斥锁和条件变量
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

private:
    func_t _func;
};

修改完成后,创建 ThreadPool 对象时,支持传入一个类型为 void(T&) 的函数对象

获取函数对象后,需要让 线程 在执行完任务后进行回调,但又因为这玩意是一个类内成员,同样需要借助外部传入的 this 指针进行访问,这里直接封装成一个接口,顺便进行调用

private: 
   func_t callBack(T &task)
    {
        _func(task);
    }

最后补充任务处理与 threadRoutine

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
            // ...

            // 进行消费,可以不用加锁,因为一个商品只会被一个线程消费
            task();
            ptr->callBack(task);
        }
    }

程序结果正常,不必在意打印问题,因为屏幕也是被多线程并发访问的资源,没加锁保护,导致出现问题。

(二)线程池_V2(封装版)

引入自己封装实现的线程库 Thread.hpp,支持对线程做出更多操作

#pragma once

#include <iostream>
#include <string>
#include <cstdlib>
#include <pthread.h>

using namespace std;

class Thread
{
public:
    // 状态表
    typedef enum
    {
        NEW = 0,
        RUNNING,
        EXITED
    }ThreadStatus;
    typedef void (*func_t)(void *);
public:
    Thread(int num, func_t func, void *args)
        :_tid(0), _func(func), _status(NEW), _args(args)
    {
        // 根据ID写入名字
        char name[128];
        snprintf(name, sizeof(name), "thread-%d", num);
        _name = name;
    }
    
    ~Thread()
    {}
    
    // 获取线程ID
    pthread_t getID() 
    { 
        if (_status == RUNNING)
            return _tid;
        else
            return 0;
     }

    // 获取线程名
    string getName() { return _name; }

    // 获取线程状态
    int getStatus() { return _status; }

    // 启动线程
    void run()
    {
        int n = pthread_create(&_tid, nullptr, runHelper, this/*需考虑*/);
        if(n != 0)
        {
            cerr << "create thread fail" << endl;
            exit(1);
        }
        _status = RUNNING;// 线程跑起来状态为运行中
    }

    // // 回调函数
    // static void *runHelper(void *args)
    // {
    //     Thread *ts = static_cast<Thread*>(args);

    //     ts->_func(ts->_args); 
    //     // return nullptr;
    // }

    static void *runHelper(void *args)
    {
        Thread *ts = (Thread*)args; // 就拿到了当前对象
        // _func(_args);
        (*ts)();
        return nullptr;
    }
    void operator ()() //仿函数
    {
        if(_func != nullptr) _func(_args);
    }

    // 线程等待
    void join()
    {
        int n = pthread_join(_tid, nullptr);
        if(n != 0)
        {
            cerr << "join thread fail" << endl;
            exit(1);
        }
        _status = EXITED;// 线程等待成功后状态为退出
    }
private:
    pthread_t _tid; // 线程ID
    func_t _func; // 线程回调函数
    ThreadStatus _status; // 线程状态
    void *_args; // 回调函数的参数,可以设置成模板
    string _name; // 线程名
};

不再直接使用原生线程库,转而使用自己封装的线程库

#pragma once
#include "Task.hpp"
#include "Thread.hpp"

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <functional>

#define THREAD_NUM 5

template<class T>
class ThreadPool
{
    using func_t = std::function<void(T&)>; // 包装器
public:
    void lockQueue()
    {
        pthread_mutex_lock(&_mtx);
    }
    void unlockQueue()
    {
        pthread_mutex_unlock(&_mtx);
    }

    void threadWait()
    {
        pthread_cond_wait(&_cond, &_mtx);
    }
    void threadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

    bool isEmpty()
    {
        return _tasks.empty();
    }

    T popTask()
    {
        T task = _tasks.front();
        _tasks.pop();
        return task;
    }

    func_t callBack(T &task)
    {
        _func(task);
    }
public:
    ThreadPool(func_t func, int num = THREAD_NUM)
        : _num(num), _func(func)
    {
        // 初始化互斥锁和条件变量
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        // 等待线程退出
        for(auto &t : _threads)
            t.join();

        // 销毁互斥锁和条件变量
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_cond);
    }

    void init()
    {
        for(int i = 0; i < _num; i++)
            _threads.push_back(Thread(i, threadRoutine, this));
    }

    void start()
    {
        // 启动线程
        for(int i = 0; i < _num; i++)
            _threads[i].run();
    }

    // 装载任务
    void pushTask(const T &task)
    {
        // 本质上就是在生产商品,需要加锁保护
        lockQueue();
        _tasks.push(task);

        // 唤醒消费者消费
        threadWakeup();
        unlockQueue();
    }

    // 给线程的回调函数
    static void threadRoutine(void *args)
    {
        // 任务处理
        // 避免等待线程,直接分离
        pthread_detach(pthread_self());

        ThreadPool<T> *ptr = static_cast<ThreadPool<T> *>(args);

        while(true)
        {
            // 任务队列是临界资源,需要保护
            ptr->lockQueue();

            // 等待条件满足
            while(ptr->isEmpty())
                ptr->threadWait();

            T task = ptr->popTask();

            ptr->unlockQueue();

            // 进行消费,可以不用加锁,因为一个商品只会被一个线程消费
            task();
            ptr->callBack(task);
        }
    }
private:
    func_t _func;
    std::vector<Thread> _threads;
    int _num; // 线程数量
    std::queue<T> _tasks; // 利用STL自动扩容的特性,无须担心容量
    pthread_mutex_t _mtx;
    pthread_cond_t _cond;
};

涉及修改的内容:

  • _threads 类型由 vector<pthread_t> 变为 vector<Thread>
  • init() 函数用于创建线程,注册线程信息
  • start() 函数用于启动线程
  • ~ThreadPool() 中新增等待线程退出
  • 线程回调函数 threadRoutinue() 返回值改为 void
  • 新增函数对象 _func


 (三)线程池_V3(优化版)

 优化版:引入 RAII 风格的锁,实现自动化加锁与解锁。

手动 加锁、解锁 显得不够专业,并且容易出问题,比如忘记释放锁资源而造成死锁,因此我们可以设计一个小组件 LockGuard,实现 RAII 风格的锁:初始化创建,析构时销毁

#pragma once

#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *pmtx)
        :_pmtx(pmtx)
    {
        // 加锁
        pthread_mutex_lock(_pmtx);
    }

    ~LockGuard()
    {
        // 解锁
        pthread_mutex_unlock(_pmtx);
    }
private:
    pthread_mutex_t *_pmtx;
};

将这个锁类加入 ThreadPool_V3.hpp 中,可以得到以下代码:

#pragma once
#include "Task.hpp"
#include "Thread.hpp"
#include "Mutex.hpp"

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <functional>

#define THREAD_NUM 5

template<class T>
class ThreadPool
{
    using func_t = std::function<void(T&)>; // 包装器
public:
    pthread_mutex_t* getlock()
    {
        return &_mtx;
    }

    void threadWait()
    {
        pthread_cond_wait(&_cond, &_mtx);
    }

    void threadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

    bool isEmpty()
    {
        return _tasks.empty();
    }

    T popTask()
    {
        T task = _tasks.front();
        _tasks.pop();
        return task;
    }

    func_t callBack(T &task)
    {
        _func(task);
    }
public:
    ThreadPool(func_t func, int num = THREAD_NUM)
        : _num(num), _func(func)
    {
        // 初始化互斥锁和条件变量
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        // 等待线程退出
        for(auto &t : _threads)
            t.join();

        // 销毁互斥锁和条件变量
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_cond);
    }

    void init()
    {
        for(int i = 0; i < _num; i++)
            _threads.push_back(Thread(i, threadRoutine, this));
    }

    void start()
    {
        // 启动线程
        for(int i = 0; i < _num; i++)
            _threads[i].run();
    }

    // 装载任务
    void pushTask(const T &task)
    {
        // 本质上就是在生产商品,需要加锁保护,自动加锁解锁
        LockGuard lockgrard(&_mtx);
        _tasks.push(task);

        // 唤醒消费者消费
        threadWakeup();
    }

    // 给线程的回调函数
    static void threadRoutine(void *args)
    {
        // 任务处理
        // 避免等待线程,直接分离
        // pthread_detach(pthread_self());

        ThreadPool<T> *ptr = static_cast<ThreadPool<T> *>(args);

        while(true)
        {
            T task;
            {
                //  自动解锁加锁
                LockGuard lockguard(ptr->getlock());
                // 等待条件满足
                while(ptr->isEmpty())
                    ptr->threadWait();

                task = ptr->popTask();
            }
            // 进行消费,可以不用加锁,因为一个商品只会被一个线程消费
            task();
            ptr->callBack(task);
        }
    }
private:
    func_t _func;
    std::vector<Thread> _threads;
    int _num; // 线程数量
    std::queue<T> _tasks; // 利用STL自动扩容的特性,无须担心容量
    pthread_mutex_t _mtx;
    pthread_cond_t _cond;
};

如何证明现在有一批线程在运行呢?

通过指令查看,当程序运行后,再新开一个终端,并输入以下命令

ps -aL | grep threadPool

可以看到:除了主线程 20251外,其他次线程都在等待任务就绪,当大量并发任务来临时,线程池是能大大提高效率的 :

三、单例模式

(一)什么是单例模式

代码构建类,类实例化出对象,这个实例化出的对象也可以称为 实例,比如常见的 STL 容器,在使用时,都是先根据库中的类,形成一个 实例 以供使用;正常情况下,一个类可以实例化出很多很多个对象,但对于某些场景来说,是不适合创建出多个对象的。

比如本文中提到的 线程池,当程序运行后,仅需一个 线程池对象 来进行高效任务计算,因为多个 线程池对象 无疑会大大增加调度成本,因此需要对 线程池类 进行特殊设计,使其只能创建一个 对象,换句话说就是不能让别人再创建对象。

正如 一山不容二虎 一样,线程池 对象在一个程序中是不推荐出现多个的,在一个程序中只允许实例化出一个对象,可以通过 单例模式 来实现,单例模式 是非常 经典、常用、常考 的设计模式。

什么是设计模式?
设计模式就是计算机大佬们在长时间项目实战中总结出来的解决方案,是帮助菜鸡编写高质量代码的利器,常见的设计模式有 单例模式、建造者模式、工厂模式、代理模式等

(二)单例模式特点

单例模式 最大的特点就是 只允许存在一个对象(实例),这就好比现在的 一夫一妻制 一样,要是在古代,单例模式 肯定不被推崇

在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百 GB) 到内存中,此时往往要用一个 单例 的类来管理这些数据;在我们今天的场景中,也需要一个 单例线程池 来协同生产者与消费者。

(三)单例模式的简单实现

单例模式 有两种实现方向:饿汉 与 懒汉,它们避免类被再次创建出对象的手段是一样的:构造函数私有化、删除拷贝构造。

只要外部无法访问 构造函数,那么也就无法构建对象了,比如下面这个类 Signal

#pragma once

#include <iostream>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造
    Signal(const Signal&) = delete;
};

当外界试图创建对象时

#include <iostream>
#include "Signal.hpp"

int main()
{
    Signal s;

    return 0;
}

当然这只实现了一半,还有另一半是 创建一个单例对象,既然外部受权限约束无法创建对象,那么类内是肯定可以创建对象的,只需要创建一个指向该类对象的 静态指针 或者一个 静态对象,再初始化就好了;因为外部无法访问该指针,所以还需要提供一个静态函数 getInstance() 以获取单例对象句柄,至于具体怎么实现,需要分不同方向(饿汉 or 懒汉)

#pragma once

#include <iostream>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造
    Signal(const Signal&) = delete;
public:
    // 获取单例对象的句柄
    static Signal *getInstance()
    {
        return _sigptr;
    }

    void print()
    {
        std::cout << "good moring" << std::endl;
    }
private:
    static Signal *_sigptr;
};

注意: 构造函数不能只声明,需要实现,即使什么都不写。

为什么要删除拷贝构造?
如果不删除拷贝构造,那么外部可以借助拷贝构造函数,拷贝构造出一个与 单例对象 一致的 “对象”,此时就出现两个对象,这是不符合 单例模式 特点的。

为什么要创建一个静态函数?
单例对象也需要被初始化,并且要能被外部使用
调用链逻辑:通过静态函数获取句柄(静态单例对象地址)-> 通过地址调用该对象的其他函数

1. 饿汉模式

张三总是很饿,尽管饭菜还没准备好,他就已经早早的把碗洗好了,等到开饭时,直接开干。

饿汉模式 也是如此,在程序加载到内存时,就已经早早的把 单例对象 创建好了(此时程序服务还没有完全启动),也就是在外部直接通过 new 实例化一个对象,具体实现如下

#pragma once

#include <iostream>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造
    Signal(const Signal&) = delete;
public:
    // 获取单例对象的句柄
    static Signal *getInstance()
    {
        return _sigptr;
    }

    void print()
    {
        std::cout << "good moring" << std::endl;
    }
private:
    static Signal *_sigptr;
};

Signal *_sigptr = new Signal();

注:在程序加载时,该对象会被创建

这里的 单例对象 本质就有点像 全局变量,在程序加载时就已经创建好了。外部可以直接通过 getInstance() 获取 单例对象 的操作句柄,来调用类中的其他函数。

#include <iostream>
#include "Signal.hpp"

int main()
{
    Signal::getInstance()->print();

    return 0;
}

这就实现了一个简单的 饿汉版单例类,除了创建 static Signal* 静态单例对象指针 外,也可以直接定义一个 静态单例对象,生命周期随进程,不过要注意的是:getInstance() 需要返回的也是该静态单例对象的地址,不能返回值,因为拷贝构造被删除了;并且需要在类的外部初始化该静态单例对象:

#pragma once

#include <iostream>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造
    Signal(const Signal&) = delete;
public:
    // 获取单例对象的地址
    static Signal *getInstance()
    {
        return &_sigptr;
    }

    void print()
    {
        std::cout << "good moring" << std::endl;
    }
private:    
    // 静态单例对象
    static Signal _sig;
};
// 初始化
Signal Signal::_sig;

饿汉模式 是一个相对简单的单例实现方向,只需要在类中声明,在类外初始化就行了,但它也会带来一定的弊端:延缓服务启动速度。

完全启动服务是需要时间的,创建单例对象也是需要时间的,饿汉模式 在服务正式启动前会先创建对象,但凡这个单例类很大,服务启动时间势必会受到影响,大型项目启动,时间就是金钱。

并且由于饿汉模式每次都会先创建单例对象,再启动服务,如果后续使用单例对象还好说,但如果后续没有使用 单例对象,那么这个对象就是白创建了,在延缓服务启动的同时造成了一定的资源浪费。

综上所述,饿汉模式不是很推荐使用,除非图实现简单,并且服务规模较小;既然 饿汉模式 有缺点,就需要改进,于是就出现了懒汉模式。

2. 懒汉模式

李四也是个很饿的人,他也有一个自己的碗,吃完饭后碗会脏,但他不像张三那样极端,李四比较懒,只有等他吃饭的时候,他才会去洗碗,李四这种做法让他感到无比轻松。

在 懒汉模式 中,单例对象 并不会在程序加载时创建,而是在第一次调用时创建,第一次调用创建后,后续无需再创建,直接使用即可

#pragma once

#include <iostream>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造
    Signal(const Signal&) = delete;
public:
    static Signal *getInstance()
    {
        // 第一次调用才创建
        if(_sigptr == nullptr)
        {
            _sigptr = new Signal();
        }
        return _sigptr;
    }

    void print()
    {
        std::cout << "good moring" << std::endl;
    }
private:
    // 指向单例对象的静态指针
    static Signal *_sigptr;
};

// 初始化静态指针
Signal* Signal::_sigptr = nullptr;

注意: 此时的静态指针需要初始化为 nullptr,方便第一次判断 

饿汉模式 中出现的问题这里全都避免了

  • 创建耗时 -> 只在第一次使用时创建
  • 占用资源 -> 如果不使用,就不会被创建

懒汉模式 的核心在于 延时加载,可以优化服务器的速度及资源占用

延时加载这种机制就有点像 写时拷贝,就赌你不会使用,从而节省资源开销,类似的还有 动态库、进程地址空间 等。

当然,懒汉模式 下也是可以正常使用 单例对象 的

这样看来,懒汉模式 确实优秀,实现起来也不麻烦,为什么会说 饿汉模式 更简单呢?

这是因为当前只是单线程场景,程序暂时没啥问题,如果当前是多线程场景,问题就大了,如果一批线程同时调用 getInstance(),同时认定 _sigptr 为空,就会创建多个 单例对象,这是不合理的,也就是说当前实现的 懒汉模式 存在严重的线程安全问题。

如何证明?
简单改一下代码,每创建一个单例对象,就打印一条语句,将代码放入多线程环境中测试

    static Signal *getInstance()
    {
        // 第一次调用才创建
        if(_sigptr == nullptr)
        {
            std::cout << "已创建一个单例对象" << std::endl;
            _sigptr = new Signal();
        }
        return _sigptr;
    }

其中使用了 lambda 表达式来作为线程的回调函数,重点在于查看现象 

#include <iostream>
#include <pthread.h>
#include "Signal.hpp"

int main()
{
    // Signal::getInstance()->print();

    // 多线程场景
    pthread_t pd[5];
    for(int i = 0; i < 5; i++)
    {
        pthread_create(pd+i, nullptr, [](void*)->void*
            {
                auto ptr = Signal::getInstance();
                ptr->print();
                return nullptr;
            }, nullptr);
    }

    for(int i = 0; i < 5; i++)
        pthread_join(pd[i], nullptr);

    return 0;
}

当前代码在多线程环境中,同时创建了多个 单例对象,因此是存在线程安全问题的

饿汉模式没有线程安全问题吗?
没有,因为饿汉模式下,单例对象一开始就被创建了,即便是多线程场景中,也不会创建多个对象,它们也做不到

3. 懒汉模式(线程安全版)

解决多线程并发访问的利器是 互斥锁,那就创建 互斥锁 保护单例对象的创建

#pragma once

#include <iostream>
#include <mutex>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造
    Signal(const Signal&) = delete;
public:
    static Signal *getInstance()
    {
        // 加锁解锁
        pthread_mutex_lock(&_mtx);

        // 第一次调用才创建
        if(_sigptr == nullptr)
        {
            std::cout << "已创建一个单例对象" << std::endl;
            _sigptr = new Signal();
        }
        pthread_mutex_unlock(&_mtx);

        return _sigptr;
    }

    void print()
    {
        std::cout << "good moring" << std::endl;
    }
private:
    // 指向单例对象的静态指针
    static Signal *_sigptr;   

    static pthread_mutex_t _mtx;
};

// 初始化静态指针
Signal* Signal::_sigptr = nullptr;

// 初始化互斥锁
pthread_mutex_t Signal::_mtx = PTHREAD_MUTEX_INITIALIZER;

注意: getInstance() 是静态函数,互斥锁也要定义为静态的,可以初始化为全局静态锁

结果是没有问题,单例对象也只会创建一个,现在还面临最后一个问题:效率问题

当前代码确实能保证只会创建一个单例对象,但即使后续不会创建单例对象,也需要进行加锁、判断、解锁这个流程,要知道加锁也是有资源消耗的,所以这种写法不妥。

解决方案是:DoubleCheck 双检查加锁

在 加锁 前再增加一层判断,如此一来,N 个线程,顶多只会进行 N 次 加锁与解锁,这是非常优雅的解决方案:

    static Signal *getInstance()
    {
        // 双检查
        if(_sigptr == nullptr)
        {
            // 加锁解锁
            pthread_mutex_lock(&_mtx);

            // 第一次调用才创建
            if(_sigptr == nullptr)
            {
                std::cout << "已创建一个单例对象" << std::endl;
                _sigptr = new Signal();
            }
            pthread_mutex_unlock(&_mtx);
        }
        return _sigptr;
    }

单纯的 if 判断并不会消耗很多资源,但 加锁 行为会消耗资源,延缓程序运行速度,双检查加锁 可以有效避免这个问题

这是个精妙绝伦的代码设计,值得学习。


所以 懒汉模式 麻烦吗?
相比于 饿汉模式,确实挺麻烦的,不仅要判断后创建 单例对象,还需要考虑线程安全问题

值得一提的是,懒汉模式 还有一种非常简单的写法:调用 getInstance() 时创建一个静态单例对象并返回,因为静态单例对象只会初始化一次,所以是可行的,并且在 C++11 之后,可以保证静态变量初始化时的线程安全问题,也就不需要 双检查加锁 了,实现起来非常简单

#pragma once

#include <iostream>
#include <mutex>

// 懒汉模式
class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造     
    Signal(const Signal&) = delete;
public:
    static Signal *getInstance()
    {
        // 静态单例对象,只会初始化一次,并且生命周期随进程
        static Signal _sig;
        return &_sig;
    }

    void print()
    {
        std::cout << "Hello Signal!" << std::endl;
    }
};

所以如果当前的生产环境所支持的 C++ 版本为 C++11 及以后,在实现 懒汉模式 时可以选择这种简便的方式,是非常不错的;如果为了兼容性,也可以选择传统写法。

注意: 静态变量创建时的线程安全问题,在 C++11 之前是不被保障的。

关于 单例模式 的其他问题

new 出来的单例对象不需要销毁吗?
这个单例对象生成周期随进程,进程结束了,资源也就都被销毁了,如果想手动销毁,可以设计一个垃圾回收内部类 GC,主动去销毁单例对象。

(四)线程池_V4(最终版)

最终版:将线程池改为 单例模式,只允许存在一个线程池对象。这里选择 懒汉模式,因为比较优秀,并且为了确保兼

相关阅读

热门文章

    手机版|MSIPO技术圈 皖ICP备19022944号-2

    Copyright © 2024, msipo.com

    返回顶部