title: C++ 语言实现动态变化的线程池 date: 2021-10-13 21:52:28 categories: 学习&开发&实现 tag:
Job 作为任务的类型
class Job {
void *data;
function<void(void *data)> func;
public:
Job(void *data, function<void(void *data)> func);
void exec();
};
其中定义两个变量,data
,和 func
。
func
用来保存需要调用的方法,当执行任务时,调用此函数即可。考虑到需要传递参数的可能,所以定义参数为一个指针,而另一个变量 data 则为需要传递给 func 的参数指针
函数的实现为
Job::Job(void *data, function<void(void *)> func) {
this->data = data;
this->func = move(func);
}
void Job::exec() {
func(data);
}
class ThreadPool {
private:
Mutex<map<pthread_t, thread *>> threadPool; // 线程池
Mutex<queue<Job *>> enqueue; // 任务队列
Mutex<vector<thread *>> deathThread; // 已经死亡的线程
Mutex<int> needKill; // 需要杀死的线程数量
condition_variable noTaskCv; // 无任务时的条件信号量
mutex noTaskCvMutex; // 无任务的条件信号量的锁
int maxCore; // 核心线程数
bool killed; // 已经终止了
Job *takeJob(); // 获取一个任务
virtual void addThread(); // 添加一个线程
void clean(); // 清理所有死亡的线程
public:
explicit ThreadPool(int core); // 构造函数
void submit(Job *job); // 提交任务,需要提交一个指针类型,且不需要主动 delete,当任务完成后,会被线程池 delete 掉
int getAccumulation(); // 获取当前堆积任务数量
void updateCore(int newCount); // 更新核心线程数,若增加则会新增线程,若减少则会在空闲时间关闭部分线程
void wait(); // 设定线程池为终止,不再可以提交任务,并等待所有任务完成
void close(); // 强制关闭线程池,不等待任务完成
};
建议通过 wait 来实现终止线程池
```cpp ThreadPool::ThreadPool(int core) : maxCore(core), killed(false), needKill(0) { for (int i = 0; i < core; ++i) addThread(); }
Job ThreadPool::takeJob() { Job cur = nullptr; enqueue.run(& { if (q.empty()) return; cur = q.front(); q.pop(); }); return cur; }
void ThreadPool::addThread() { auto work = & { while (true) { Job *cur = takeJob(); if (cur != nullptr) { cur->exec(); delete cur; } else { bool dead = false; needKill.run(& { if (count <= 0) return; dead = true; count--; }); if (dead) break;
clean();
unique_lock<mutex> lk(noTaskCvMutex);
noTaskCv.wait(lk);
}
}
threadPool.run([&](map<pthread_t, thread *> &data) {
auto iter = data.find(pthread_self());
deathThread.run([&](vector<thread *> &data) {
data.push_back(iter->second);
});
data.erase(iter);
});
};
auto *newThread = new thread(work);
threadPool.run([&](map<pthread_t, thread *> &data) {
data.insert({newThread->native_handle(), newThread});
});
}
void ThreadPool::clean() { if (deathThread.get().empty()) return; deathThread.run(& { for (auto &item: data) delete item; }); }
void ThreadPool::submit(Job job) { if (killed) return; enqueue.run([&](queue<Job > &q) { q.push(job); noTaskCv.notify_one(); }); }
int ThreadPool::getAccumulation() { return (int) enqueue.get().size(); }
void ThreadPool::updateCore(int newCount) { if (killed) return; needKill.run(& { if (newCount > this->maxCore) for (int i = this->maxCore; i < newCount; ++i) addThread(); else { cleaned += this->maxCore - newCount; noTaskCv.notify_all(); } }); }
void ThreadPool::wait() { updateCore(0); killed = true; map<pthread_t, thread *> tmp = threadPool.get(); for (auto &item: tmp) item.second->join(); }
void ThreadPool::close() { killed = true; map<pthread_t, thread *> tmp = threadPool.get(); for (auto &item: tmp) { pthread_kill(item.first, SIGKILL); delete item.second; } } ```