网站建设课程教学改革,上海网站开发方案,wordpress+左侧导航,服装网站案例文章目录 Linux线程互斥进程线程间的互斥相关背景概念互斥量mutex模拟抢票代码 互斥量的接口初始化互斥量销毁互斥量互斥量加锁和解锁改进模拟抢票代码#xff08;加锁#xff09;小结对锁封装 lockGuard.hpp 互斥量实现原理探究可重入VS线程安全概念常见的线程不安全的情况常… 文章目录 Linux线程互斥进程线程间的互斥相关背景概念互斥量mutex模拟抢票代码 互斥量的接口初始化互斥量销毁互斥量互斥量加锁和解锁改进模拟抢票代码加锁小结对锁封装 lockGuard.hpp 互斥量实现原理探究可重入VS线程安全概念常见的线程不安全的情况常见的线程安全的情况常见不可重入的情况常见可重入的情况可重入与线程安全联系可重入与线程安全区别 常见锁概念死锁死锁四个必要条件避免死锁避免死锁算法 Linux线程同步条件变量同步概念与竞态条件 条件变量函数初始化销毁等待条件满足唤醒等待案例 生产者消费者模型基于BlockingQueue的生产者消费者模型 C queue模拟阻塞队列的生产消费模型blockqueue.hppTask.hppTask.hpp改进版本main.cc POSIX信号量初始化信号量销毁信号量等待信号量发布信号量 基于环形队列的生产消费模型代码验证makefileRingQueue.hppTask.hppMain.cc 线程池makefileThreadPool.hppMain.cpp简单封装一下pthread库 STL、智能指针和线程安全STL中的容器不是线程安全的智能指针是否是线程安全的常见的锁 读者写者问题代码实现的较为简单的读者写者问题 Linux线程互斥
进程线程间的互斥相关背景概念
临界资源多线程执行流共享的资源就叫做临界资源临界区每个线程内部访问临界资源的代码就叫做临界区互斥任何时刻互斥保证有且只有一个执行流进入临界区访问临界资源通常对临界资源起保护作用原子性后面讨论如何实现不会被任何调度机制打断的操作该操作只有两态要么完成要么未完成 互斥量mutex
大部分情况线程使用的数据都是局部变量变量的地址空间在线程栈空间内这种情况变量归属单个 线程其他线程无法获得这种变量。但有时候很多变量都需要在线程间共享这样的变量称为共享变量可以通过数据的共享完成线程之 间的交互。多个线程并发的操作共享变量会带来一些问题。 模拟抢票代码
我们看下面这段代码
#define NUM 4// 线程名字
class ThreadData
{
public:ThreadData(int num){threadname thread- to_string(num);}public:string threadname;
};int tickets 1000;// 买票操作
void *getTicket(void *args)
{ThreadData *td static_castThreadData *(args);const char *name td-threadname.c_str();while (true){if (tickets 0){usleep(1000);printf(who%s, get a ticket: %d\n, name, tickets); // ?tickets--;}elsebreak;}printf(%s ... quit\n, name);return nullptr;
}int main()
{vectorpthread_t tids;vectorThreadData * thread_datas;for (int i 1; i NUM; i){pthread_t tid;ThreadData *td new ThreadData(i);thread_datas.push_back(td);pthread_create(tid, nullptr, getTicket, thread_datas[i - 1]);tids.push_back(tid);}// 等待for (auto e : tids){pthread_join(e, nullptr);}// 释放for (auto e : thread_datas){delete e;}return 0;
}造成了数据不一致问题肯定是和多线程并发有关系 一个全局变量进行多线程并发或者--操作是否安全–不安全 if 语句判断条件为真以后代码可以并发的切换到其他线程 usleep 这个模拟漫长业务的过程在这个漫长的业务过程中可能有很多个线程会进入该代码段 --ticket 操作本身就不是一个原子操作 取出ticket--部分的汇编代码
objdump -d mythread test.objdump14e6: 8b 05 24 5b 00 00 mov 0x5b24(%rip),%eax # 7010 tickets
14ec: 83 e8 01 sub $0x1,%eax
14ef: 89 05 1b 5b 00 00 mov %eax,0x5b1b(%rip) # 7010 tickets-- 操作并不是原子操作而是对应三条汇编指令
load 将共享变量ticket从内存加载到寄存器中update : 更新寄存器里面的值执行-1操作store 将新值从寄存器写回共享变量ticket的内存地址
要解决以上问题需要做到三点
代码必须要有互斥行为当代码进入临界区执行时不允许其他线程进入该临界区。如果多个线程同时要求执行临界区的代码并且临界区没有线程在执行那么只能允许一个线程进入该临界区。如果线程不在临界区中执行那么该线程不能阻止其他线程进入临界区。 要做到这三点本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。 互斥量的接口
在Ubuntu下查看需要先安装手册
sudo apt-get install manpages-posix-dev初始化互斥量
#include pthread.h
int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER;第一个是动态分配
int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr);第二个是静态分配
pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER;销毁互斥量
#include pthread.hint pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER;销毁互斥量需要注意
使用PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要销毁不要销毁一个已经加锁的互斥量已经销毁的互斥量要确保后面不会有线程再尝试加锁
互斥量加锁和解锁
#include pthread.hint pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);调用 pthread_ lock 时可能会遇到以下情况:
互斥量处于未锁状态该函数会将互斥量锁定同时返回成功发起函数调用时其他线程已经锁定互斥量或者存在其他线程同时申请互斥量但没有竞争到互斥量 那么pthread_ lock调用会陷入阻塞(执行流被挂起)等待互斥量解锁。
加锁的本质是用时间来换取安全 加锁的表现线程对于临界区代码串执行 加锁原则尽量的要保证临界区代码越少越好 改进模拟抢票代码加锁
然后就可以使用上面的性质来进行改进模拟抢票代码
#define NUM 4
pthread_mutex_t lock;// 线程名字
class ThreadData
{
public:ThreadData(int num, pthread_mutex_t *mutex){lock mutex; // 初始化锁threadname thread- to_string(num);}public:string threadname;pthread_mutex_t *lock; // 定义一个锁指针
};int tickets 1000;// 买票操作
void *getTicket(void *args)
{ThreadData *td static_castThreadData *(args);const char *name td-threadname.c_str();while (true){// 加锁 --pthread_mutex_lock(td-lock); // 申请锁成功才能往后执行不成功阻塞等待。if (tickets 0){usleep(1000);printf(who%s, get a ticket: %d\n, name, tickets);tickets--;// 解锁 --pthread_mutex_unlock(td-lock);}else{// 解锁 -- 在else执行流要注意break之前要解锁pthread_mutex_unlock(td-lock);break;}usleep(13); // 执行得到票之后的后续动作如果不加usleep会导致 饥饿问题}printf(%s ... quit\n, name);return nullptr;
}int main()
{vectorpthread_t tids;vectorThreadData * thread_datas;// 初始化锁pthread_mutex_init(lock, nullptr);for (int i 1; i NUM; i){pthread_t tid;ThreadData *td new ThreadData(i, lock);thread_datas.push_back(td);pthread_create(tid, nullptr, getTicket, thread_datas[i - 1]);tids.push_back(tid);}// 等待for (auto e : tids){pthread_join(e, nullptr);}// 释放for (auto e : thread_datas){delete e;}return 0;
}或者使用全局初始化也可以
#define NUM 4// 直接初始化锁
pthread_mutex_t lock PTHREAD_MUTEX_INITIALIZER;// 线程名字
class ThreadData
{
public:ThreadData(int num){threadname thread- to_string(num);}public:string threadname;
};int tickets 1000;// 买票操作
void *getTicket(void *args)
{ThreadData *td static_castThreadData *(args);const char *name td-threadname.c_str();while (true){pthread_mutex_lock(lock);if (tickets 0){usleep(1000);printf(who%s, get a ticket: %d\n, name, tickets);tickets--;// 解锁 --pthread_mutex_unlock(lock);}else{// 解锁 --pthread_mutex_unlock(lock);break;}usleep(13); // 执行得到票之后的后续动作如果不加usleep会导致 饥饿问题}printf(%s ... quit\n, name);return nullptr;
}int main()
{vectorpthread_t tids;vectorThreadData * thread_datas;// 初始化锁pthread_mutex_init(lock, nullptr);for (int i 1; i NUM; i){pthread_t tid;ThreadData *td new ThreadData(i);thread_datas.push_back(td);pthread_create(tid, nullptr, getTicket, thread_datas[i - 1]);tids.push_back(tid);}// 等待for (auto e : tids){pthread_join(e, nullptr);}// 释放for (auto e : thread_datas){delete e;}return 0;
}如果纯互斥环境如果锁分配不够合理容易被其他线程的饥饿问题不是说只要有互斥必有饥饿适合纯互斥的场景就用互斥如果是按照一定的顺序性获取资源就叫同步 其中在临界区中线程可以被切换吗 当然可以 在线程被切出去的时候是持有锁走的不在临界区之间照样没有线程可以进入临界区访问临界资源 对于其他线程来讲一个线程要么没有锁要么释放锁。 当线程访问临界资源区的过程对于其他线程是原子的。 小结
所以加锁的范围粒度一定要小任何线程要进行抢票都得先申请锁原则上不应该有例外所有线程申请锁前提是所有线程都得看到这把锁锁本身也是共享资源加锁的过程必须是原子的原子性要么不做要做就做完没有中间状态就是原子性如果线程申请锁失败了我的线程要被阻塞如果线程申请成功了继续向后运行如果线程申请锁成功了执行临界区的代码了执行临界区的代码是可以被切换的其他线程无法进入因为被切换了但是没有释放可以放心的执行完毕没有任何线程能打扰。
结论所有对于其他线程要么没有申请锁要么释放了锁对于其他线程才有意义 什么是线程互斥为什么需要互斥 线程互斥指的是在多个线程间对临界资源进行争抢访问时有可能会造成数据二义因此通过保证同一时间只有一个线程能够访问临界资源的方式实现线程对临界资源的访问安全性 对锁封装 lockGuard.hpp
lockGuard.hpp
#pragma once
#include pthread.h
class Mutex
{public:Mutex(pthread_mutex_t *lock) : _lock(lock){}void Lock(){pthread_mutex_lock(_lock);}void Unlock(){pthread_mutex_unlock(_lock);}~Mutex(){}private:pthread_mutex_t *_lock;
};class LockGuard
{
public:LockGuard(pthread_mutex_t *lock) : _mutex(lock){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex _mutex;
};mythread.cc
void *getTicket(void *args)
{ThreadData *td static_castThreadData *(args);const char *name td-threadname.c_str();while (true){{ // 对锁进行临时对象控制// 定义了一个临时的锁对象LockGuard lockGuard(lock); // RAII风格的锁if (tickets 0){usleep(1000);printf(who%s, get a ticket: %d\n, name, tickets);tickets--;}elsebreak;}usleep(13); // 执行得到票之后的后续动作如果不加usleep会导致 饥饿问题}printf(%s ... quit\n, name);return nullptr;
}互斥量实现原理探究
经过上面的例子已经意识到单纯的 i 或者 i 都不是原子的有可能会有数据一致性问题为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台访问内存的 总线周期也有先后一 个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 现在我们把lock和unlock的伪代码改一下 CPU的寄存器只有一套被所有的线程共享但是寄存器里面的数据属于执行流的上下文属于执行流私有的数据。CPU在执行代码的时候一定要有对应的执行载体线程和进程数据在内存中没被所有线程共享
结论把数据从内存移动到CPU寄存器中本质是把数据从共享变成线程私有 竞争锁本质是在谁先把xchgb做完 mutex简单理解就是一个0/1的计数器用于标记资源访问状态 0表示已经有执行流加锁成功资源处于不可访问1表示未加锁资源可访问。
可重入VS线程安全
概念
线程安全多个线程并发同一段代码时不会出现不同的结果。常见对全局变量或者静态变量进行操作 并且没有锁保护的情况下会出现该问题。重入同一个函数被不同的执行流调用当前一个流程还没有执行完就有其他的执行流再次进入我们 称之为重入。一个函数在重入的情况下运行结果不会出现任何不同或者任何问题则该函数被称为可重 入函数否则是不可重入函数。
常见的线程不安全的情况
不保护共享变量的函数函数状态随着被调用状态发生变化的函数返回指向静态变量指针的函数调用线程不安全函数的函数
常见的线程安全的情况
每个线程对全局变量或者静态变量只有读取的权限而没有写入的权限一般来说这些线程是安全的类或者接口对于线程来说都是原子操作多个线程之间的切换不会导致该接口的执行结果存在二义性
常见不可重入的情况
调用了malloc/free函数因为malloc函数是用全局链表来管理堆的调用了标准I/O库函数标准I/O库的很多实现都以不可重入的方式使用全局数据结构可重入函数体内使用了静态的数据结构
常见可重入的情况
不使用全局变量或静态变量不使用用malloc或者new开辟出的空间不调用不可重入函数 不返回静态或全局数据所有数据都有函数的调用者提供使用本地数据或者通过制作全局数据的本地拷贝来保护全局数据
可重入与线程安全联系
函数是可重入的那就是线程安全的函数是不可重入的那就不能由多个线程使用有可能引发线程安全问题如果一个函数中有全局变量那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全区别
可重入函数是线程安全函数的一种线程安全不一定是可重入的而可重入函数则一定是线程安全的。如果将对临界资源的访问加上锁则这个函数是线程安全的但如果这个重入函数若锁还未释放则会产生死锁因此是不可重入的。 常见锁概念
死锁
死锁是指在一组进程中的各个进程均占有不会释放的资源但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
死锁四个必要条件
互斥条件一个资源每次只能被一个执行流使用请求与保持条件一个执行流因请求资源而阻塞时对已获得的资源保持不放不剥夺条件一个执行流已获得的资源在末使用完之前不能强行剥夺循环等待条件若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁
破坏死锁的四个必要条件加锁顺序一致避免锁未释放的场景资源一次性分配
避免死锁算法
死锁检测算法银行家算法
Linux线程同步
条件变量
当一个线程互斥地访问某个变量时它可能发现在其它线程改变状态之前它什么也做不了。例如一个线程访问队列时发现队列为空它只能等待只到其它线程将一个节点添加到队列中。这种情 况就需要用到条件变量。
同步概念与竞态条件
同步在保证数据安全的前提下让线程能够按照某种特定的顺序访问临界资源从而有效避免饥饿问题叫做同步竞态条件因为时序问题而导致程序异常我们称之为竞态条件。在线程场景下这种问题也不难理解
条件变量函数
初始化
#include pthread.hint pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
pthread_cond_t cond PTHREAD_COND_INITIALIZER;cond要初始化的条件变量attrNULL 销毁
int pthread_cond_destroy(pthread_cond_t *cond);等待条件满足
#include pthread.hint pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);cond要在这个条件变量上等待mutex互斥量后面详细解释 唤醒等待
#include pthread.hint pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);案例
#include iostream
#include unistd.h
#include pthread.h// 临界资源
int cnt 0;// 初始化
pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond PTHREAD_COND_INITIALIZER;void *Count(void *args)
{uint64_t number (uint64_t)args;std::cout pthread: number create sucess! std::endl;while (true){// 加锁pthread_mutex_lock(mutex);// 必须先加锁后等待pthread_cond_wait(cond, mutex);std::cout pthread: number , cnt: cnt std::endl;// 解锁pthread_mutex_unlock(mutex);}// 分离线程pthread_detach(pthread_self());
}int main()
{for (uint64_t i 0; i 5; i){pthread_t tid;pthread_create(tid, nullptr, Count, (void *)i); // 注意这里细节不能传i的地址usleep(1000); // 创建出来的线程打印就保持一致了}sleep(3);std::cout main thread ctrl begin: std::endl;while (true){sleep(1);// 唤醒单个线程pthread_cond_signal(cond);std::cout signal thread... std::endl;}return 0;
}也还可以唤醒多个线程
#include iostream
#include unistd.h
#include pthread.h// 临界资源
int cnt 0;// 初始化
pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond PTHREAD_COND_INITIALIZER;void *Count(void *args)
{uint64_t number (uint64_t)args;std::cout pthread: number create sucess! std::endl;while (true){// 加锁pthread_mutex_lock(mutex);// 你怎么知道临界资源是就绪还是不就绪的你判断出来的判断是访问临界资源吗必须是的也就是判断必须在加锁之后// 必须先加锁后等待pthread_cond_wait(cond, mutex); // pthread_cond_wait让线程等待的时候会自动释放锁std::cout pthread: number , cnt: cnt std::endl;// 解锁pthread_mutex_unlock(mutex);}// 分离线程pthread_detach(pthread_self());
}int main()
{for (uint64_t i 0; i 5; i){pthread_t tid;pthread_create(tid, nullptr, Count, (void *)i); // 注意这里细节不能传i的地址usleep(1000); // 创建出来的线程打印就保持一致了}sleep(3);std::cout main thread ctrl begin: std::endl;while (true){sleep(1);// 唤醒多个线程pthread_cond_broadcast(cond);std::cout boardcast thread... std::endl;}return 0;
}为什么 pthread_cond_wait需要互斥量?
条件等待是线程间同步的一种手段如果只有一个线程条件不满足一直等下去都不会满足所以必须要有一个线程通过某些操作改变共享变量使原先不满足的条件变得满足并且友好的通知等待在条件变量上的线程。条件不会无缘无故的突然变得满足了必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。 按照上面的说法我们设计出如下的代码先上锁发现条件不满足解锁然后等待在条件变量上不就行了
由于解锁和等待不是原子操作。调用解锁之后 pthread_cond_wait 之前如果已经有其他线程获取到 互斥量摒弃条件满足发送了信号那么 pthread_cond_wait 将错过这个信号可能会导致线程永远 阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是一个原子操作。int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t * mutex);进入该函数后 会去看条件量等于0不等于就把互斥量变成1直到cond_ wait返回把条件量改成1把互斥量恢复成原样。
生产者消费者模型 3种关系、2种角色、1个交易场所 3种关系生产者和生产者互斥消费者和消费者互斥生产者和消费者互斥 / 同步 2种角色生产者、消费者线程承担 1个交易场所内存中特定的一种内存结构。 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯而通过阻塞队列来进行通讯所以生产者生产完数据之后不用等待消费者处理直接扔给阻塞队列消费者不找生产者 要数据而是直接从阻塞队列里取阻塞队列就相当于一个缓冲区平衡了生产者和消费者的处理能力。这个阻塞队 列就是用来给生产者和消费者解耦的。
优点
支持忙闲不闲生产和消费进行解耦支持并发 生产者vs生产者互斥消费者和消费者互斥生产者和消费者互斥同步
基于BlockingQueue的生产者消费者模型
BlockingQueue
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别 在于当队列为空时从队列获取元素的操作将会被阻塞直到队列中被放入了元素当队列满时往队列里存放元 素的操作也会被阻塞直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的线程在对阻塞队列进程 操作时会被阻塞) C queue模拟阻塞队列的生产消费模型
blockqueue.hpp
#include iostream
#include queuetemplate class T
class BlockQueue
{static const int defaultnum 5;public:BlockQueue(int maxcap defaultnum): maxcap_(maxcap){// 初始化锁pthread_mutex_init(mutex_, nullptr);pthread_cond_init(c_cond_, nullptr);pthread_cond_init(p_cond_, nullptr);}T Pop(){// 加锁pthread_mutex_lock(mutex_);while (q_.size() 0) // 因为判断临界资源调试是否满足也是在访问临界资源判断资源是否就绪是通过再临界资源内部判断的。{// 进行等待调用的时候自动释放锁因为唤醒而返回的时候重新持有锁pthread_cond_wait(c_cond_, mutex_);}// 取出队头数据进行处理T out q_.front();q_.pop();// 唤醒生产pthread_cond_signal(p_cond_);// 如果线程wait时被误唤醒了呢 --// pthread_cond_broadcast(c_cond_, mutex_); // 唤醒所有线程// 解锁pthread_mutex_unlock(mutex_);return out;}void Push(const T in){// 加锁pthread_mutex_lock(mutex_);while (q_.size() maxcap_) // 因为判断临界资源调试是否满足也是在访问临界资源判断资源是否就绪是通过再临界资源内部判断的。{pthread_cond_wait(p_cond_, mutex_);}// 入队q_.push(in);// 唤醒消费pthread_cond_signal(c_cond_);// 解锁pthread_mutex_unlock(mutex_);}// 析构~BlockQueue(){pthread_cond_destroy(c_cond_);pthread_cond_destroy(p_cond_);pthread_mutex_destroy(mutex_);}private:// 一个队列std::queueT q_;// 一个极值队列到达极值就不可入队int maxcap_;// 一个消费者一个生产者pthread_cond_t c_cond_;pthread_cond_t p_cond_;// 锁pthread_mutex_t mutex_;
};Task.hpp
#pragma once
#include iostream
#include stringstd::string opers -*/%;enum
{DivZero 1,ModZero,Unknown
};class Task
{
public:Task(int x, int y, char op): data1_(x), data2_(y), oper_(op){}void Run(){switch (oper_){case :result_ data1_ data2_;break;case -:result_ data1_ - data2_;break;case *:result_ data1_ * data2_;break;case /:{if (data2_ 0)exitcode_ DivZero;elseresult_ data1_ / data2_;}break;case %:{if (data2_ 0)exitcode_ ModZero;elseresult_ data1_ % data2_;}break;default:exitcode_ Unknown;break;}}void operator()(){Run();}std::string GetResult() // 这里可以使用stringstream{std::string r std::to_string(data1_);r oper_;r std::to_string(data2_);r ;r std::to_string(result_);r [code: ;r std::to_string(exitcode_);r ];return r;}std::string GetTask(){std::string r std::to_string(data1_);r oper_;r std::to_string(data2_);r ?;return r;}~Task(){}private:int data1_; // 操作数int data2_; // 操作数char oper_; // 操作符int result_; // 结果int exitcode_; // 错误码
};Task.hpp改进版本
我们也可以使用包装器和stringstream来改进一下
#pragma once
#include iostream
#include string
#include sstream
#include functional
#include mapstd::string opers -*/%;enum
{DivZero 1,ModZero,Unknown
};class Task
{
public:Task(int x, int y, char op): data1_(x), data2_(y), oper_(op){}void Run(){std::mapchar, std::functionvoid() CmdOp{{, [this](){ result_ data1_ data2_; }},{-, [this](){ result_ data1_ - data2_; }},{*, [this](){ result_ data1_ * data2_; }},{/, [this](){ if (data2_ 0) exitcode_ DivZero; else result_ data1_ / data2_; }},{%, [this](){ if (data2_ 0) exitcode_ ModZero; else result_ data1_ % data2_; }}};// auto it CmdOp.find(oper_);std::mapchar, std::functionvoid()::iterator it CmdOp.find(oper_);if (it ! CmdOp.end())it-second(); // 调用lambda函数elseexitcode_ Unknown; // 如果没有找到操作设置错误代码}void operator()(){Run();}std::string GetResult() // 这里可以使用stringstream{std::stringstream s;s data1_ oper_ data2_ result_ [code: exitcode_ ];std::string r;r s.str();return r;}std::string GetTask(){std::stringstream s;s data1_ oper_ data2_ ?;std::string r;r s.str();return r;}~Task(){}private:int data1_; // 操作数int data2_; // 操作数char oper_; // 操作符int result_; // 结果int exitcode_; // 错误码
};main.cc
#include blockqueue.hpp
#include Task.hpp
#include unistd.h
#include ctimestd::string TOHEX(pthread_t x)
{char buffer[128];snprintf(buffer, sizeof(buffer), %p, x);return buffer;
}void *Consumer(void *args)
{BlockQueueTask *bq static_castBlockQueueTask *(args);while (true){// 消费Task t bq-Pop();// t.Run(); // 之间调用t(); // 仿函数std::cout 处理任务: t.GetTask() 运算结果是 t.GetResult() thread id: TOHEX(pthread_self()) std::endl;// sleep(1);}
}void *Productor(void *args)
{int len opers.size();BlockQueueTask *bq static_castBlockQueueTask *(args);while (true){// 生产int data1 rand() % 10 1;int data2 rand() % 10 1;usleep(10);char op opers[rand() % len];Task t(data1, data2, op);bq-Push(t); // 入队std::cout 生产了一个任务: t.GetTask() thread id: TOHEX(pthread_self()) std::endl;sleep(1);}
}int main()
{srand(time(nullptr));// 在堆上上创建对象BlockQueueTask *bq new BlockQueueTask();pthread_t c[3], p[5];for (int i 0; i 3; i){pthread_create(c i, nullptr, Consumer, bq);}for (int i 0; i 5; i){pthread_create(p i, nullptr, Productor, bq);}for (int i 0; i 3; i){pthread_join(c[i], nullptr);}for (int i 0; i 5; i){pthread_join(p[i], nullptr);}delete bq;return 0;
}POSIX信号量 信号量的本质是一把计数器那么这把计数器的本质是什么 来描述资源数目的把资源是否就绪放在了临界区之外申请信号量时其实就间接的已经在做判断了 –P-原子的-申请资源 V-原子的-归还资源 信号量申请成功了就一定保证会拥有一部分临界资源吗 只要信号量申请成功就一定会获得指定的资源。 申请mutex只要拿到了锁就可以获得临界资源并且不担心被切换。 临界资源可以当成整体可不可以看成一小部分一小部分呢 结合场景一般是可以的
信号量 –p1-0 ---- 加锁 v: 0-1 ---- 释放锁 这样的叫做二元信号量 互斥锁
初始化信号量
#include semaphore.h
int sem_init(sem_t *sem, int pshared, unsigned int value);参数 pshared:0表示线程间共享非零表示进程间共享value信号量初始值
销毁信号量 int sem_destroy(sem_t *sem);等待信号量 功能等待信号量会将信号量的值减1 int sem_wait(sem_t *sem); //P()发布信号量 功能发布信号量表示资源使用完毕可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()基于环形队列的生产消费模型代码验证 环形队列中的生产者和消费者什么时候会访问同一个位置 当这两个同时指向同一个位置的时候只有满or空的时候互斥and同步 其它时候都指向的是不同的位置并发 因此操作的基本原则
① 空消费者不能超过生产者 --生产者先运行
② 满生产者不能把消费者套一个圈里继续再往后写入 --消费者先运行 谁来保证这个基本原则呢 信号量来保证。 生产者最关心的是什么资源 空间 消费者最关心的是什么资源 数据 怎么保证不同的线程访问的是临界资源中不同的区域呢 通过程序员编码保证
makefile
myRingQueue:Main.ccg -o $ $^ -stdc11 -lpthread
.PHONY:clean
clean:rm -rf myRingQueueRingQueue.hpp
#include iostream
#include pthread.h
#include unistd.h
#include vector
#include semaphore.h// 默认队列容量
const static int defaultcap 5;template class T
class RingQueue
{
private:void Lock(pthread_mutex_t mutex){pthread_mutex_lock(mutex);}void UnLock(pthread_mutex_t mutex){pthread_mutex_unlock(mutex);}// -void P(sem_t sem){sem_wait(sem);}// void V(sem_t sem){sem_post(sem);}public:RingQueue(int cap defaultcap): ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0){sem_init(cdata_sem_, 0, 0); // 消费者初始值为0sem_init(pspace_sem_, 0, cap); // 生产者初始值为容量pthread_mutex_init(c_mutex_, nullptr);pthread_mutex_init(p_mutex_, nullptr);}// 生产void Push(const T in){// 先申请空间--P(pspace_sem_); // P- V// 加锁Lock(p_mutex_);// 将数据存放ringqueue_[p_step_] in;p_step_;p_step_ % cap_;// 解锁UnLock(p_mutex_);// data资源V(cdata_sem_);}// 消费void Pop(T *out){// 申请资源--P(cdata_sem_);// 加锁Lock(c_mutex_);*out ringqueue_[c_step_];c_step_;c_step_ % cap_;// 解锁UnLock(c_mutex_);// 释放空间V(pspace_sem_);}~RingQueue(){sem_destroy(cdata_sem_);sem_destroy(pspace_sem_);pthread_mutex_destroy(c_mutex_);pthread_mutex_destroy(p_mutex_);}private:std::vectorT ringqueue_; // 队列int cap_; // 队列的容量int c_step_; // 消费者下标int p_step_; // 生产者下标sem_t cdata_sem_; // 消费者关注的数据资源sem_t pspace_sem_; // 生产者关注的空间资源pthread_mutex_t c_mutex_; // 消费者锁pthread_mutex_t p_mutex_; // 生产者锁
};Task.hpp
#pragma once
#include iostream
#include string
#include sstream
#include functional
#include mapstd::string opers -*/%;enum
{DivZero 1,ModZero,Unknown
};class Task
{
public:Task(){}Task(int x, int y, char op): data1_(x), data2_(y), oper_(op){}void Run(){std::mapchar, std::functionvoid() CmdOp{{, [this](){ result_ data1_ data2_; }},{-, [this](){ result_ data1_ - data2_; }},{*, [this](){ result_ data1_ * data2_; }},{/, [this](){ if (data2_ 0) exitcode_ DivZero; else result_ data1_ / data2_; }},{%, [this](){ if (data2_ 0) exitcode_ ModZero; else result_ data1_ % data2_; }}};// auto it CmdOp.find(oper_);std::mapchar, std::functionvoid()::iterator it CmdOp.find(oper_);if (it ! CmdOp.end())it-second(); // 调用lambda函数elseexitcode_ Unknown; // 如果没有找到操作设置错误代码}void operator()(){Run();}std::string GetResult() // 这里可以使用stringstream{std::stringstream s;s data1_ oper_ data2_ result_ [code: exitcode_ ];std::string r;r s.str();return r;}std::string GetTask(){std::stringstream s;s data1_ oper_ data2_ ?;std::string r;r s.str();return r;}~Task(){}private:int data1_; // 操作数int data2_; // 操作数char oper_; // 操作符int result_; // 结果int exitcode_; // 错误码
};Main.cc
#include iostream
#include pthread.h
#include ctime
#include RingQueue.hpp
#include Task.hppusing namespace std;// 创建的线程数量
#define ProductorNum 5
#define ConsumerNum 5struct ThreadData
{RingQueueTask *rq;std::string threadname;
};void *Productor(void *args)
{ThreadData *td static_castThreadData *(args);RingQueueTask *rq td-rq;std::string name td-threadname;int len opers.size();while (true){// 1. 获取数据int data1 rand() % 10 1;usleep(10);int data2 rand() % 10 1;char op opers[rand() % len];Task t(data1, data2, op);t();// 2. 生产数据rq-Push(t);cout Productor task done, task is : t.GetTask() who: name endl;sleep(1);}return nullptr;
}
void *Consumer(void *args)
{ThreadData *td static_castThreadData *(args);RingQueueTask *rq td-rq;std::string name td-threadname;while (true){// 消费数据Task t;rq-Pop(t);t(); // 处理数据cout Consumer get task, task is : t.GetTask() who: name result: t.GetResult() endl;// sleep(1);}return nullptr;
}int main()
{srand(time(nullptr) ^ getpid());RingQueueTask *rq new RingQueueTask(5);pthread_t c[ConsumerNum], p[ProductorNum];for (int i 0; i ProductorNum; i){ThreadData *td new ThreadData();td-rq rq;td-threadname Productor- std::to_string(i);pthread_create(p i, nullptr, Productor, td);}for (int i 0; i ConsumerNum; i){ThreadData *td new ThreadData();td-rq rq;td-threadname Consumer- std::to_string(i);pthread_create(c i, nullptr, Consumer, td);}for (int i 0; i ProductorNum; i){pthread_join(p[i], nullptr);}for (int i 0; i ConsumerNum; i){pthread_join(c[i], nullptr);}return 0;
}一个线程查看直观一些 多个线程一起跑打印就会显示的很乱 线程池
一种线程使用模式。线程过多会带来调度开销进而影响缓存局部性和整体性能。而线程池维护着多个线程等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景 ① 需要大量的线程来完成任务且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务使用线程池技术是非常合适的。因为单个任务小而任务数量巨大你可以想象一个热门网站的点击次数。 但对于长时间的任务比如一个Telnet连接请求线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。 ② 对性能要求苛刻的应用比如要求服务器迅速响应客户请求。 ③ 接受突发性的大量请求但不至于使服务器因此产生大量线程的应用。突发性大量客户请求在没有线程池情况下将产生大量线程虽然理论上大部分操作系统线程数目最大值不是问题短时间内产生大量线程可能使内存到达极限出现错误。
线程池示例 创建固定数量线程池循环从任务队列中获取任务对象 获取到任务对象后执行任务对象中的任务接口 下列是基于单例模式懒汉模式来创建的线程池
makefile
ThreadPool:Main.cppg -o $ $^ -stdc11 -lpthread
.PHONY:clean
clean:rm -f ThreadPoolThreadPool.hpp
#pragma once
#include iostream
#include queue
#include vector
#include pthread.h
#include unistd.hstatic const int defaultnum 5;struct ThreadInfo
{pthread_t tid;std::string name;
};template class T
class ThreadPool
{
private:void Lock(){pthread_mutex_lock(mutex_);}void Unlock(){pthread_mutex_unlock(mutex_);}void WakeUp(){pthread_cond_signal(cond_);}std::string GetThreadName(pthread_t tid){for (const auto ti : thread_){if (ti.tid tid)return ti.name;}return None;}bool IsQueueEmpty(){return task_.empty();}void ThreadSleep(){pthread_cond_wait(cond_, mutex_);}public:static void *HandlerTask(void *args){ThreadPoolT *tp static_castThreadPoolT *(args);std::string name tp-GetThreadName(pthread_self()); // 获取名字while (true){// 加锁tp-Lock();// 如果队列为空就将线程放到条件变量下等待while (tp-IsQueueEmpty()){tp-ThreadSleep();}// 取出节点T t tp-Pop();// 解锁tp-Unlock();// 执行任务t();std::cout name run, result: t.GetResult() std::endl;}}void Start(){for (int i 0; i thread_.size(); i){thread_[i].name Thread- std::to_string(i);pthread_create((thread_[i].tid), nullptr, HandlerTask, this);}}void Push(T t){Lock(); // 先加锁task_.push(t);WakeUp(); // 唤醒Unlock(); // 解锁}T Pop(){T t task_.front();task_.pop();return t;}// 单例模式创建对象static ThreadPoolT *GetInstance(){// 其他线程就不会每次执行加锁和解锁了~if (nullptr tp_){// 加锁pthread_mutex_lock(lock_);if (nullptr tp_){std::cout log: singleton create done first! std::endl;tp_ new ThreadPoolT();}// 解锁pthread_mutex_unlock(lock_);}return tp_;}public:ThreadPool(int num defaultnum): thread_(num){pthread_mutex_init(mutex_, nullptr);pthread_cond_init(cond_, nullptr);}~ThreadPool(){pthread_mutex_destroy(mutex_);pthread_cond_destroy(cond_);}private:std::vectorThreadInfo thread_; // 任务线程std::queueT task_; // 任务队列pthread_mutex_t mutex_; // 锁pthread_cond_t cond_; // 条件变量// 单例模式只能创建一个对象static ThreadPoolT *tp_;static pthread_mutex_t lock_;
};// 类外进行初始化
template class T
ThreadPoolT *ThreadPoolT::tp_ nullptr;template class T
pthread_mutex_t ThreadPoolT::lock_ PTHREAD_MUTEX_INITIALIZER;Main.cpp
#include iostream
#include ctime
#include Task.hpp
#include ThreadPool.hppint main()
{// 创建线程ThreadPoolTask *tp new ThreadPoolTask(5);// 启动线程ThreadPoolTask::GetInstance()-Start();srand(time(nullptr) ^ getpid());while (true){// 1. 构建任务int x rand() % 10 1;usleep(10);int y rand() % 10 1;char op opers[rand() % opers.size()];Task t(x, y, op);// 2. 交给线程池进行处理ThreadPoolTask::GetInstance()-Push(t);std::cout main thread make task: t.GetTask() std::endl;sleep(1);}return 0;
}这样就创建起来跑起来了~~ 简单封装一下pthread库
#include iostream
#include string
#include ctime
#include pthread.htypedef void (*callback_t)();
static int num 1;class Thread
{
private:void Entery(){cb_();}static void *Routine(void *args){Thread *thread static_castThread *(args);thread-Entery();return nullptr;}public:Thread(callback_t cb): tid_(0), name_(), start_timestamp_(0), isrunning_(false), cb_(cb){}void Run(){name_ thread- std::to_string(num);start_timestamp_ time(nullptr);isrunning_ true;pthread_create(tid_, nullptr, Routine, this);}void Join(){pthread_join(tid_, nullptr);isrunning_ false;}std::string Name(){return name_;}uint64_t StartTimestamp(){return start_timestamp_;}bool IsRunning(){return isrunning_;}private:pthread_t tid_;std::string name_;uint64_t start_timestamp_;bool isrunning_;// 回调函数callback_t cb_;
};STL、智能指针和线程安全
STL中的容器不是线程安全的
原因 STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响。 而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶)。 因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全。
智能指针是否是线程安全的 对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题. 对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数。
常见的锁 悲观锁在每次取数据时总是担心数据会被其他线程修改所以会在取数据前先加锁读锁写锁行锁等当其他线程想要访问数据时被阻塞挂起。 乐观锁每次取数据时候总是乐观的认为数据不会被其他线程修改因此不上锁。但是在更新数据前会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式版本号机制和CAS操作。 CAS操作当需要更新数据时判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败失败则重试一般是一个自旋的过程即不断重试。 挂起等待锁当某个线程没有申请到锁的时候此时该线程会被挂起即加入到等待队列等待。当锁被释放的时候就会被唤醒重新竞争锁。当临界区运行的时间较长时我们一般使用挂起等待锁。我们先让线程PCB加入到等待队列中等待等锁被释放时再重新申请锁。 之前所学的互斥锁就是挂起等待锁 自旋锁当某个线程没有申请到锁的时候该线程不会被挂起而是每隔一段时间检测锁是否被释放。如果锁被释放了那就竞争锁如果没有释放过一会儿再来检测。如果这里使用挂起等待锁可能线程刚加入等待队列锁就被释放了因此当临界区运行的时间较短时我们一般使用自旋锁。
pthread_spin_lock();自旋锁只需要把mutex变成spin。
读者写者问题
在编写多线程的时候有一种情况是十分常见的。那就是有些公共数据修改的机会比较少。相比较改写它们读的机会反而高的多。通常而言在读的过程中往往伴随着查找的操作中间耗时很长。给这种代码段加锁会极大地降低我们程序的效率。那么有没有一种方法可以专门处理这种多读少写的情况呢 有那就是读写锁。 3种关系写者和写者互斥读者和读者没有关系读者和写者互斥关系 2种角色读者、写者 1个交易场所读写场所 读者写者 vs 生产者消费者 本质区别消费者会把数据拿走而读者不会 初始化
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);销毁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);加锁和解锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); // 读者加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); // 写者加锁int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); // 读写者解锁设置读写优先 分为读者优先和写者优先。 读者写者进行操作的时候读者非常多频率特别高写者比较少频率不高 存在写者饥饿的情况
代码实现的较为简单的读者写者问题
#include iostream
#include unistd.h
#include pthread.hint board 0;pthread_rwlock_t rw;using namespace std;void *reader(void *args)
{const char *name static_castconst char*(args);cout run... endl;while(true){pthread_rwlock_rdlock(rw);cout reader read : board tid: pthread_self() endl;sleep(10);pthread_rwlock_unlock(rw);}
}void *writer(void *args)
{const char *name static_castconst char*(args);sleep(1);while(true){pthread_rwlock_wrlock(rw);board;cout I am writer endl;sleep(10);pthread_rwlock_unlock(rw);}
}int main()
{pthread_rwlock_init(rw, nullptr);pthread_t r1,r2,r3,r4,r5,r6, w;pthread_create(r1, nullptr, reader, (void*)reader);pthread_create(r2, nullptr, reader, (void*)reader);pthread_create(r3, nullptr, reader, (void*)reader);pthread_create(r4, nullptr, reader, (void*)reader);pthread_create(r5, nullptr, reader, (void*)reader);pthread_create(r6, nullptr, reader, (void*)reader);pthread_create(w, nullptr, writer, (void*)writer);pthread_join(r1, nullptr);pthread_join(r2, nullptr);pthread_join(r3, nullptr);pthread_join(r4, nullptr);pthread_join(r5, nullptr);pthread_join(r6, nullptr);pthread_join(w, nullptr);pthread_rwlock_destroy(rw);return 0;
}struct rwlock_t
{int readers 0;int who;mutex_t mutex;
}读者
ptjread_rwlock_rdlock()
lock(mutex)
readers
(unlock)mutexread操作
lock(mutex)
readers--;
unlock(mutex) 写者
pthread_rwlock_wrlock()
lock(mutex)
while(readers 0) 释放锁, waitwrite操作
unlock(mutex)Linux系统编程到此结束end…