龙盟编程博客 | 无障碍搜索 | 云盘搜索神器
快速搜索
主页 > 软件开发 > C/C++开发 >

c++版线程池和任务池示例

时间:2014-05-15 17:41来源:网络整理 作者:网络 点击:
分享到:
这篇文章主要介绍了c++版线程池和任务池,实现任务执行完毕线程退出.在linux下压力测试通过

commondef.h

代码如下:

//单位秒,监测空闲列表时间间隔,在空闲队列中超过TASK_DESTROY_INTERVAL时间的任务将被自动销毁
const int CHECK_IDLE_TASK_INTERVAL = 300;
//单位秒,任务自动销毁时间间隔
const int TASK_DESTROY_INTERVAL = 60;

//监控线程池是否为空时间间隔,微秒
const int IDLE_CHECK_POLL_EMPTY = 500;

//线程池线程空闲自动退出时间间隔 ,5分钟
const int  THREAD_WAIT_TIME_OUT = 300;

taskpool.cpp

代码如下:

#include "taskpool.h"

#include <string.h>

#include <stdio.h>
#include <pthread.h>

    TaskPool::TaskPool(const int & poolMaxSize)
    : m_poolSize(poolMaxSize)
      , m_taskListSize(0)
      , m_bStop(false)
{
    pthread_mutex_init(&m_lock, NULL);
    pthread_mutex_init(&m_idleMutex, NULL);
    pthread_cond_init(&m_idleCond, NULL);

    pthread_attr_t attr;
    pthread_attr_init( &attr );
    pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ); // 让线程独立运行
    pthread_create(&m_idleId, &attr, CheckIdleTask, this); //创建监测空闲任务进程
    pthread_attr_destroy(&attr);
}

TaskPool::~TaskPool()
{
    if(!m_bStop)
    {
        StopPool();
    }
    if(!m_taskList.empty())
    {
        std::list<Task*>::iterator it = m_taskList.begin();
        for(; it != m_taskList.end(); ++it)
        {
            if(*it != NULL)
            {
                delete *it;
                *it = NULL;
            }
        }
        m_taskList.clear();
        m_taskListSize = 0;
    }
    if(!m_idleList.empty())
    {
        std::list<Task*>::iterator it = m_idleList.begin();
        for(; it != m_idleList.end(); ++it)
        {
            if(*it != NULL)
            {
                delete *it;
                *it = NULL;
            }
        }
        m_idleList.clear();
    }


    pthread_mutex_destroy(&m_lock);
    pthread_mutex_destroy(&m_idleMutex);
    pthread_cond_destroy(&m_idleCond);
}

void * TaskPool::CheckIdleTask(void * arg)
{
    TaskPool * pool = (TaskPool*)arg;
    while(1)
    {
        pool->LockIdle();
        pool->RemoveIdleTask();
        if(pool->GetStop())
        {
            pool->UnlockIdle();
            break;
        }
        pool->CheckIdleWait();
        pool->UnlockIdle();
    }
}

void TaskPool::StopPool()
{
    m_bStop = true;
    LockIdle();
    pthread_cond_signal(&m_idleCond); //防止监控线程正在等待,而引起无法退出的问题
    UnlockIdle();
    pthread_join(m_idleId, NULL);
}

bool TaskPool::GetStop()
{
    return m_bStop;
}

void TaskPool::CheckIdleWait()
{
    struct timespec timeout;
    memset(&timeout, 0, sizeof(timeout));
    timeout.tv_sec = time(0) + CHECK_IDLE_TASK_INTERVAL;
    timeout.tv_nsec = 0;
    pthread_cond_timedwait(&m_idleCond, &m_idleMutex, &timeout);
}

int TaskPool::RemoveIdleTask()
{
    int iRet = 0;
    std::list<Task*>::iterator it, next;
    std::list<Task*>::reverse_iterator rit = m_idleList.rbegin();
    time_t curTime = time(0);
    for(; rit != m_idleList.rend(); )
    {
        it = --rit.base();
        if(difftime(curTime,((*it)->last_time)) >= TASK_DESTROY_INTERVAL)
        {
            iRet++;
            delete *it;
            *it = NULL;
            next = m_idleList.erase(it);
            rit = std::list<Task*>::reverse_iterator(next);
        }
        else
        {
            break;   
        }
    }
}

int TaskPool::AddTask(task_fun fun, void *arg)
{
    int iRet = 0;
    if(0 != fun)
    {
        pthread_mutex_lock(&m_lock);
        if(m_taskListSize >= m_poolSize)
        {
            pthread_mutex_unlock(&m_lock);
            iRet = -1; //task pool is full;
        }
        else
        {
            pthread_mutex_unlock(&m_lock);
            Task * task = GetIdleTask();
            if(NULL == task)
            {
                task = new Task;
            }
            if(NULL == task)
            {
                iRet = -2; // new failed
            }
            else
            {
                task->fun = fun;
                task->data = arg;
                pthread_mutex_lock(&m_lock);
                m_taskList.push_back(task);
                ++m_taskListSize;
                pthread_mutex_unlock(&m_lock);
            }
        }
    }
    return iRet;
}

Task* TaskPool::GetTask()
{
    Task *task = NULL;
    pthread_mutex_lock(&m_lock);
    if(!m_taskList.empty())
    {
        task =  m_taskList.front();
        m_taskList.pop_front();
        --m_taskListSize;
    }
    pthread_mutex_unlock(&m_lock);
    return task;
}

void TaskPool::LockIdle()
{
    pthread_mutex_lock(&m_idleMutex);
}

void TaskPool::UnlockIdle()
{
    pthread_mutex_unlock(&m_idleMutex);
}

Task * TaskPool::GetIdleTask()
{
    LockIdle();
    Task * task = NULL;
    if(!m_idleList.empty())
    {
        task = m_idleList.front();
        m_idleList.pop_front();
    }
    UnlockIdle();
    return task;
}

void TaskPool::SaveIdleTask(Task*task)
{
    if(NULL != task)
    {
        task->fun = 0;
        task->data = NULL;
        task->last_time = time(0);
        LockIdle();
        m_idleList.push_front(task);
        UnlockIdle();
    }
}

taskpool.h

代码如下:

#ifndef TASKPOOL_H
#define TASKPOOL_H
/* purpose @ 任务池,主要是缓冲外部高并发任务数,有manager负责调度任务
 *          任务池可自动销毁长时间空闲的Task对象
 *          可通过CHECK_IDLE_TASK_INTERVAL设置检查idle空闲进程轮训等待时间
 *          TASK_DESTROY_INTERVAL 设置Task空闲时间,超过这个时间值将会被CheckIdleTask线程销毁
 * date    @ 2013.12.23
 * author  @ haibin.wang
 */

#include <list>
#include <pthread.h>
#include "commondef.h"

//所有的用户操作为一个task,
typedef void (*task_fun)(void *);
struct Task
{
    task_fun fun; //任务处理函数
    void* data; //任务处理数据
    time_t last_time; //加入空闲队列的时间,用于自动销毁
};

//任务池,所有任务会投递到任务池中,管理线程负责将任务投递给线程池
class TaskPool
{
public:
 /* pur @ 初始化任务池,启动任务池空闲队列自动销毁线程
     * para @ maxSize 最大任务数,大于0
    */
    TaskPool(const int & poolMaxSize);
    ~TaskPool();

    /* pur @ 添加任务到任务队列的尾部
     * para @ task, 具体任务
     * return @ 0 添加成功,负数 添加失败
    */   
    int AddTask(task_fun fun, void* arg);

    /* pur @ 从任务列表的头获取一个任务
     * return @  如果列表中有任务则返回一个Task指针,否则返回一个NULL
    */   
    Task* GetTask();

    /* pur @ 保存空闲任务到空闲队列中
     * para @ task 已被调用执行的任务
     * return @
    */
    void SaveIdleTask(Task*task);

    void StopPool();
public:
    void LockIdle();
    void UnlockIdle();
    void CheckIdleWait();
    int RemoveIdleTask();
    bool GetStop();
private:
    static void * CheckIdleTask(void *);
    /* pur @ 获取空闲的task
     * para @
     * para @
     * return @ NULL说明没有空闲的,否则从m_idleList中获取一个
    */
    Task* GetIdleTask();
    int GetTaskSize();
private:
    int m_poolSize; //任务池大小
    int m_taskListSize; // 统计taskList的大小,因为当List的大小会随着数量的增多而耗时增加
    bool m_bStop; //是否停止
    std::list<Task*> m_taskList;//所有待处理任务列表
    std::list<Task*> m_idleList;//所有空闲任务列表
    pthread_mutex_t m_lock; //对任务列表进行加锁,保证每次只能取一个任务
    pthread_mutex_t m_idleMutex; //空闲任务队列锁
    pthread_cond_t m_idleCond; //空闲队列等待条件
    pthread_t m_idleId;;
};
#endif

threadpool.cpp

代码如下:

/* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)
 * date    @ 2014.01.03
 * author  @ haibin.wang
 */

#include "threadpool.h"
#include <errno.h>
#include <string.h>

/*
#include <iostream>
#include <stdio.h>
*/

Thread::Thread(bool detach, ThreadPool * pool)
    : m_pool(pool)
{
    pthread_attr_init(&m_attr);
    if(detach)
    {
        pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_DETACHED ); // 让线程独立运行
    }
    else
    {
         pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_JOINABLE );
    }

    pthread_mutex_init(&m_mutex, NULL); //初始化互斥量
    pthread_cond_init(&m_cond, NULL); //初始化条件变量
    task.fun = 0;
    task.data = NULL;
}

Thread::~Thread()
{
    pthread_cond_destroy(&m_cond);
    pthread_mutex_destroy(&m_mutex);
    pthread_attr_destroy(&m_attr);
}

    ThreadPool::ThreadPool()
    : m_poolMax(0)
    , m_idleNum(0)
    , m_totalNum(0)
      , m_bStop(false)
{
    pthread_mutex_init(&m_mutex, NULL);
    pthread_mutex_init(&m_runMutex,NULL);
    pthread_mutex_init(&m_terminalMutex, NULL);
    pthread_cond_init(&m_terminalCond, NULL);
    pthread_cond_init(&m_emptyCond, NULL);
}

ThreadPool::~ThreadPool()
{
    /*if(!m_threads.empty())
    {
        std::list<Thread*>::iterator it = m_threads.begin();
        for(; it != m_threads.end(); ++it)
        {
            if(*it != NULL)
            {
                pthread_cond_destroy( &((*it)->m_cond) );
                pthread_mutex_destroy( &((*it)->m_mutex) );
                delete *it;
                *it = NULL;
            }
        }
        m_threads.clear();
    }*/
    pthread_mutex_destroy(&m_runMutex);
    pthread_mutex_destroy(&m_terminalMutex);
    pthread_mutex_destroy(&m_mutex);
    pthread_cond_destroy(&m_terminalCond);
    pthread_cond_destroy(&m_emptyCond);
}

int ThreadPool::InitPool(const int & poolMax, const int & poolPre)
{
    if(poolMax < poolPre
            || poolPre < 0
            || poolMax <= 0)
    {
        return -1;
    }
    m_poolMax = poolMax;

    int iRet = 0;
    for(int i=0; i<poolPre; ++i)
    {
        Thread * thread = CreateThread();
        if(NULL == thread)
        {
            iRet = -2;
        }
    }

    if(iRet < 0)
    { 
        std::list<Thread*>::iterator it = m_threads.begin();
        for(; it!= m_threads.end(); ++it)
        {
            if(NULL != (*it) )
            {
                delete *it;
                *it = NULL;
            }
        }
        m_threads.clear();
        m_totalNum = 0;
    }
    return iRet;
}

void ThreadPool::GetThreadRun(task_fun fun, void* arg)
{
    //从线程池中获取一个线程
    pthread_mutex_lock( &m_mutex);
    if(m_threads.empty())
    {
        pthread_cond_wait(&m_emptyCond,&m_mutex); //阻塞等待有空闲线程
    }

    Thread * thread = m_threads.front();
    m_threads.pop_front();
    pthread_mutex_unlock( &m_mutex);

    pthread_mutex_lock( &thread->m_mutex );
    thread->task.fun = fun;
    thread->task.data = arg;       
    pthread_cond_signal(&thread->m_cond); //触发线程WapperFun循环执行
    pthread_mutex_unlock( &thread->m_mutex );
}

int ThreadPool::Run(task_fun fun, void * arg)
{
    pthread_mutex_lock(&m_runMutex); //保证每次只能由一个线程执行
    int iRet = 0;
    if(m_totalNum <m_poolMax) //
    {
        if(m_threads.empty() && (NULL == CreateThread()) )
        {
            iRet = -1;//can not create new thread!
        }
        else
        {
            GetThreadRun(fun, arg);
        }
    }
    else
    {
        GetThreadRun(fun, arg);
    }
    pthread_mutex_unlock(&m_runMutex);
    return iRet;
}

void ThreadPool::StopPool(bool bStop)
{
    m_bStop = bStop;
    if(bStop)
    {
        //启动监控所有空闲线程是否退出的线程
        Thread thread(false, this);
        pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //启动监控所有线程退出线程
        //阻塞等待所有空闲线程退出
        pthread_join(thread.m_threadId, NULL);
    }
    /*if(bStop)
    {
        pthread_mutex_lock(&m_terminalMutex);
        //启动监控所有空闲线程是否退出的线程
        Thread thread(true, this);
        pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //启动监控所有线程退出线程
        //阻塞等待所有空闲线程退出
        pthread_cond_wait(&m_terminalCond, & m_terminalMutex);
        pthread_mutex_unlock(&m_terminalMutex);
    }*/
}

bool ThreadPool::GetStop()
{
    return m_bStop;
}

Thread * ThreadPool::CreateThread()
{
    Thread * thread = NULL;
    thread = new Thread(true, this);
    if(NULL != thread)
    {
        int iret = pthread_create(&thread->m_threadId,&thread->m_attr, ThreadPool::WapperFun , thread); //通过WapperFun将线程加入到空闲队列中
        if(0 != iret)
        {
            delete thread;
            thread = NULL;
        }
    }
    return thread;
}

void * ThreadPool::WapperFun(void*arg)
{
    Thread * thread = (Thread*)arg;
    if(NULL == thread || NULL == thread->m_pool)
    {
        return NULL;
    }
    ThreadPool * pool = thread->m_pool;
    pool->IncreaseTotalNum();
    struct timespec abstime;
    memset(&abstime, 0, sizeof(abstime));
    while(1)
    {
        if(0 != thread->task.fun)
        {
            thread->task.fun(thread->task.data);
        }

        if( true == pool->GetStop() ) 
        {
            break; //确定当前任务执行完毕后再判定是否退出线程
        }
        pthread_mutex_lock( &thread->m_mutex );
        pool->SaveIdleThread(thread); //将线程加入到空闲队列中
        abstime.tv_sec = time(0) + THREAD_WAIT_TIME_OUT;
        abstime.tv_nsec = 0;
        if(ETIMEDOUT  == pthread_cond_timedwait( &thread->m_cond, &thread->m_mutex, &abstime )) //等待线程被唤醒 或超时自动退出
        {
            pthread_mutex_unlock( &thread->m_mutex );
            break;
        }
        pthread_mutex_unlock( &thread->m_mutex );
    }

    pool->LockMutex();
    pool->DecreaseTotalNum();
    if(thread != NULL)
    {
        pool->RemoveThread(thread);
        delete thread;
        thread = NULL;
    }
    pool->UnlockMutex();
    return 0;
}

void ThreadPool::SaveIdleThread(Thread * thread )
{
    if(thread)
    {
        thread->task.fun = 0;
        thread->task.data = NULL;
        LockMutex();
        if(m_threads.empty())
        {
            pthread_cond_broadcast(&m_emptyCond); //发送不空的信号,告诉run函数线程队列已经不空了
        }
        m_threads.push_front(thread);
        UnlockMutex();
    }
}

int ThreadPool::TotalThreads()
{
    return m_totalNum;
}


void ThreadPool::SendSignal()
{
    LockMutex();
    std::list<Thread*>::iterator it = m_threads.begin();
    for(; it!= m_threads.end(); ++it)
    {
        pthread_mutex_lock( &(*it)->m_mutex );
        pthread_cond_signal(&((*it)->m_cond));
        pthread_mutex_unlock( &(*it)->m_mutex );
    }
    UnlockMutex();
}

void * ThreadPool::TerminalCheck(void* arg)
{
    Thread * thread = (Thread*)arg;
    if(NULL == thread || NULL == thread->m_pool)
    {
        return NULL;
    }
    ThreadPool * pool = thread->m_pool;
    while((false == pool->GetStop()) || pool->TotalThreads() >0 )
    {
        pool->SendSignal();

        usleep(IDLE_CHECK_POLL_EMPTY);
    }
    //pool->TerminalCondSignal();
    return 0;
}

void ThreadPool::TerminalCondSignal()
{
    pthread_cond_signal(&m_terminalCond);
}

void ThreadPool::RemoveThread(Thread* thread)
{
    m_threads.remove(thread);
}

void ThreadPool::LockMutex()
{
    pthread_mutex_lock( &m_mutex);
}

void ThreadPool::UnlockMutex()
{
    pthread_mutex_unlock( &m_mutex );
}

void ThreadPool::IncreaseTotalNum()
{
    LockMutex();
    m_totalNum++;
    UnlockMutex();
}
void ThreadPool::DecreaseTotalNum()
{
    m_totalNum--;
}

threadpool.h

代码如下:

#ifndef THREADPOOL_H
#define THREADPOOL_H
/* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)a
 *          当线程池退出时创建TerminalCheck线程,负责监测线程池所有线程退出
 * date    @ 2013.12.23
 * author  @ haibin.wang
 */

#include <list>
#include <string>
#include "taskpool.h"
//通过threadmanager来控制任务调度进程
//threadpool的TerminalCheck线程负责监测线程池所有线程退出


class ThreadPool;
class Thread
{
public:
    Thread(bool detach, ThreadPool * pool);
    ~Thread();
    pthread_t  m_threadId; //线程id
    pthread_mutex_t m_mutex; //互斥锁
    pthread_cond_t m_cond; //条件变量
    pthread_attr_t m_attr; //线程属性
 Task  task; //
    ThreadPool * m_pool; //所属线程池
};

//线程池,负责创建线程处理任务,处理完毕后会将线程加入到空闲队列中,从任务池中
class ThreadPool
{
public:
    ThreadPool();
    ~ThreadPool();

    /* pur @ 初始化线程池
     * para @ poolMax 线程池最大线程数
     * para @ poolPre 预创建线程数
     * return @ 0:成功
     *          -1: parameter error, must poolMax > poolPre >=0
     *          -2: 创建线程失败
    */
    int InitPool(const int & poolMax, const int & poolPre);

    /* pur @ 执行一个任务
     * para @ task 任务指针
     * return @ 0任务分配成功,负值 任务分配失败,-1,创建新线程失败
    */
    int Run(task_fun fun, void* arg);

 /* pur @ 设置是否停止线程池工作
     * para @ bStop true停止,false不停止
    */
 void StopPool(bool bStop);

public: //此公有函数主要用于静态函数调用
    /* pur @ 获取进程池的启停状态
     * return @
    */
    bool GetStop();   
 void SaveIdleThread(Thread * thread );
    void LockMutex();
    void UnlockMutex();
    void DecreaseTotalNum();
    void IncreaseTotalNum();
    void RemoveThread(Thread* thread);
    void TerminalCondSignal();
    int TotalThreads();
    void SendSignal();
private:
 /* pur @ 创建线程
     * return @ 非空 成功,NULL失败,
    */
 Thread * CreateThread();

    /* pur @ 从线程池中获取一个一个线程运行任务
     * para @ fun 函数指针
     * para @ arg 函数参数
     * return @
    */
    void GetThreadRun(task_fun fun, void* arg);

 static void * WapperFun(void*);
 static void * TerminalCheck(void*);//循环监测是否所有线程终止线程

private:
    int m_poolMax;//线程池最大线程数
    int m_idleNum; //空闲线程数
    int m_totalNum; //当前线程总数 小于最大线程数 
 bool m_bStop; //是否停止线程池
 pthread_mutex_t m_mutex; //线程列表锁
 pthread_mutex_t m_runMutex; //run函数锁

    pthread_mutex_t m_terminalMutex; //终止所有线程互斥量
    pthread_cond_t  m_terminalCond; //终止所有线程条件变量
    pthread_cond_t  m_emptyCond; //空闲线程不空条件变量

    std::list<Thread*> m_threads; // 线程列表
};
#endif

threadpoolmanager.cpp

代码如下:

#include "threadpoolmanager.h"
#include "threadpool.h"
#include "taskpool.h"

#include <errno.h>
#include <string.h>

/*#include <string.h>
#include <sys/time.h>
#include <stdio.h>*/
 //   struct timeval time_beg, time_end;
ThreadPoolManager::ThreadPoolManager()
    : m_threadPool(NULL)
    , m_taskPool(NULL)
    , m_bStop(false)
{
    pthread_mutex_init(&m_mutex_task,NULL);
    pthread_cond_init(&m_cond_task, NULL);

   /* memset(&time_beg, 0, sizeof(struct timeval));
    memset(&time_end, 0, sizeof(struct timeval));
    gettimeofday(&time_beg, NULL);*/
}

ThreadPoolManager::~ThreadPoolManager()
{
    StopAll();
    if(NULL != m_threadPool)
    {
        delete m_threadPool;
        m_threadPool = NULL;
    }
    if(NULL != m_taskPool)
    {
        delete m_taskPool;
        m_taskPool = NULL;
    }

    pthread_cond_destroy( &m_cond_task);
    pthread_mutex_destroy( &m_mutex_task );

    /*gettimeofday(&time_end, NULL);
    long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);
    printf("manager total time = %d\n", total);
    gettimeofday(&time_beg, NULL);*/
}

int ThreadPoolManager::Init(
        const int &tastPoolSize,
        const int &threadPoolMax,
        const int &threadPoolPre)
{
    m_threadPool = new ThreadPool();
    if(NULL == m_threadPool)
    {
        return -1;
    }
    m_taskPool = new TaskPool(tastPoolSize);
    if(NULL == m_taskPool)
    {
        return -2;
    }

    if(0>m_threadPool->InitPool(threadPoolMax, threadPoolPre))
    {
        return -3;
    }
    //启动线程池
    //启动任务池
    //启动任务获取线程,从任务池中不断拿任务到线程池中
    pthread_attr_t attr;
    pthread_attr_init( &attr );
    pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );
    pthread_create(&m_taskThreadId, &attr, TaskThread, this); //创建获取任务进程
    pthread_attr_destroy(&attr);
    return 0;
}

void ThreadPoolManager::StopAll()
{
    m_bStop = true;
    LockTask();
    pthread_cond_signal(&m_cond_task);
    UnlockTask();
    pthread_join(m_taskThreadId, NULL);
    //等待当前所有任务执行完毕
    m_taskPool->StopPool();
    m_threadPool->StopPool(true); // 停止线程池工作
}

void ThreadPoolManager::LockTask()
{
    pthread_mutex_lock(&m_mutex_task);
}

void ThreadPoolManager::UnlockTask()
{
    pthread_mutex_unlock(&m_mutex_task);
}

void* ThreadPoolManager::TaskThread(void* arg)
{
    ThreadPoolManager * manager = (ThreadPoolManager*)arg;
    while(1)
    {
        manager->LockTask(); //防止任务没有执行完毕发送了停止信号
        while(1) //将任务队列中的任务执行完再退出
        {
            Task * task = manager->GetTaskPool()->GetTask();
            if(NULL == task)
            {
                break;
            }
            else
            {
                manager->GetThreadPool()->Run(task->fun, task->data);
                manager->GetTaskPool()->SaveIdleTask(task);
            }
        }

        if(manager->GetStop())
        {
            manager->UnlockTask();
            break;
        }
        manager->TaskCondWait(); //等待有任务的时候执行
        manager->UnlockTask();
    }
    return 0;
}

ThreadPool * ThreadPoolManager::GetThreadPool()
{
    return m_threadPool;
}

TaskPool * ThreadPoolManager::GetTaskPool()
{
    return m_taskPool;
}

int  ThreadPoolManager::Run(task_fun fun,void* arg)
{
    if(0 == fun)
    {
        return 0;
    }
    if(!m_bStop)
    {  
        int iRet =  m_taskPool->AddTask(fun, arg);

        if(iRet == 0 && (0 == pthread_mutex_trylock(&m_mutex_task)) )
        {
            pthread_cond_signal(&m_cond_task);
            UnlockTask();
        }
        return iRet;
    }
    else
    {
        return -3;
    }
}

bool ThreadPoolManager::GetStop()
{
    return m_bStop;
}

void ThreadPoolManager::TaskCondWait()
{
    struct timespec to;
    memset(&to, 0, sizeof to);
    to.tv_sec = time(0) + 60;
    to.tv_nsec = 0;

    pthread_cond_timedwait( &m_cond_task, &m_mutex_task, &to); //60秒超时
}

threadpoolmanager.h

代码如下:

#ifndef THREADPOOLMANAGER_H
#define THREADPOOLMANAGER_H
/* purpose @
 *      基本流程:
 *          管理线程池和任务池,先将任务加入任务池,然后由TaskThread负责从任务池中将任务取出放入到线程池中
 *      基本功能:
 *          1、工作线程可以在业务不忙的时候自动退出部分长时间不使用的线程
 *          2、任务池可以在业务不忙的时候自动释放长时间不使用的资源(可通过commondef.h修改)
 *          3、当程序退时不再向任务池中添加任务,当任务池中所有任务执行完毕后才退出相关程序(做到程序的安全退出)
 *      线程资源:
 *          如果不预分配任何处理线程的话,ThreadPool只有当有任务的时候才实际创建需要的线程,最大线程创建数为用户指定
 *          当manager销毁的时候,manager会创建一个监控所有任务执行完毕的监控线程,只有当所有任务执行完毕后manager才销毁
 *          线程最大数为:1个TaskPool线程 + 1个manager任务调度线程 + ThreadPool最大线程数 + 1个manager退出监控线程 + 1线程池所有线程退出监控线程
 *          线程最小数为:1个TaskPool创建空闲任务资源销毁监控线程 + 1个manager创建任务调度线程
 *      使用方法:
 *          ThreadPoolManager manager;
 *          manager.Init(100000, 50, 5);//初始化一个任务池为10000,线程池最大线程数50,预创建5个线程的管理器
 *          manager.run(fun, data); //添加执行任务到manager中,fun为函数指针,data为fun需要传入的参数,data可以为NULL
 *
 * date    @ 2013.12.23
 * author  @ haibin.wang
 *
 *  详细参数控制可以修改commondef.h中的相关变量值
 */

#include <pthread.h>
typedef void (*task_fun)(void *);

class ThreadPool;
class TaskPool;

class ThreadPoolManager
{
public:
    ThreadPoolManager();
    ~ThreadPoolManager();

    /* pur @ 初始化线程池与任务池,threadPoolMax > threadPoolPre > threadPoolMin >= 0
     * para @ tastPoolSize 任务池大小
     * para @ threadPoolMax 线程池最大线程数
     * para @ threadPoolPre 预创建线程数
     * return @ 0:初始化成功,负数 初始化失败
     *          -1:创建线程池失败
     *          -2:创建任务池失败
     *          -3:线程池初始化失败
    */
    int Init(const int &tastPoolSize,
            const int &threadPoolMax,
            const int &threadPoolPre);

    /* pur @ 执行一个任务
     * para @ fun 需要执行的函数指针
     * para @ arg fun需要的参数,默认为NULL
     * return @ 0 任务分配成功,负数 任务分配失败
     *          -1:任务池满
     *          -2:任务池new失败
     *          -3:manager已经发送停止信号,不再接收新任务
    */
    int Run(task_fun fun,void* arg=NULL);

public: //以下public函数主要用于静态函数调用
    bool GetStop();
    void TaskCondWait();
    TaskPool * GetTaskPool();
    ThreadPool * GetThreadPool();
    void LockTask();
    void UnlockTask();
    void LockFull();

private:
 static void * TaskThread(void*); //任务处理线程
 void StopAll();

private:
    ThreadPool *m_threadPool; //线程池
    TaskPool * m_taskPool; //任务池
    bool m_bStop; // 是否终止管理器

    pthread_t m_taskThreadId; // TaskThread线程id
 pthread_mutex_t m_mutex_task;
    pthread_cond_t m_cond_task;
};
#endif

main.cpp

代码如下:

#include <iostream>
#include <string>
#include "threadpoolmanager.h"
#include <sys/time.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>


using namespace std;
int seq = 0;
int billNum =0;
int inter = 1;
pthread_mutex_t m_mutex;
void myFunc(void*arg)
{
    pthread_mutex_lock(&m_mutex);
    seq++;
    if(seq%inter == 0 )
    {
        cout << "fun 1=" << seq << endl;
    }
    if(seq>=1000000000)
    {
        cout << "billion" << endl;
        seq = 0;
        billNum++;
    }
    pthread_mutex_unlock(&m_mutex);
    //sleep();
}

int main(int argc, char** argv)
{
    if(argc != 6)
    {
        cout << "必须有5个参数 任务执行次数 任务池大小 线程池大小 预创建线程数 输出间隔" << endl;
        cout << "eg: ./test 999999 10000 100 10 20" << endl;
        cout << "上例代表创建一个间隔20个任务输出,任务池大小为10000,线程池大小为100,预创建10个线程,执行任务次数为:999999" << endl;
        return 0;
    }
    double loopSize = atof(argv[1]);
    int taskSize = atoi(argv[2]);
    int threadPoolSize = atoi(argv[3]);
    int preSize = atoi(argv[4]);
    inter = atoi(argv[5]);

    pthread_mutex_init(&m_mutex,NULL);
    ThreadPoolManager manager;
    if(0>manager.Init(taskSize,  threadPoolSize, preSize))
    {
        cout << "初始化失败" << endl;
        return 0;
    }
    cout << "*******************初始化完成*********************" << endl;
    struct timeval time_beg, time_end;
    memset(&time_beg, 0, sizeof(struct timeval));
    memset(&time_end, 0, sizeof(struct timeval));
    gettimeofday(&time_beg, NULL);
    double i=0;
    for(; i<loopSize; ++i)
    {
        while(0>manager.Run(myFunc,NULL))
        {
            usleep(100);
        }
    }
    gettimeofday(&time_end, NULL);
    long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);
    cout << "total time =" << total << endl;
    cout << "total num =" << i  << " billion num=" << billNum<< endl;
    cout << __FILE__ << "将关闭所有线程" << endl;
    //pthread_mutex_destroy(&m_mutex);
    return 0;
}

精彩图集

赞助商链接