推荐 最新
比我长的不多

OpenManus

OpenManus [表情] Manus is incredible, but OpenManus can achieve any ideas without an Invite Code [表情]! Our team members @mannaandpoem @XiangJinyu @MoshiQAQ @didiforgithub from @MetaGPT built it within 3 hours! It’s a simple implementation, so we welcome any suggestions, contributions, and feedback! Enjoy your own agent with OpenManus! Project Demo Installation Create a new conda environment: conda create -n open_manus python=3.12 conda activate open_manus Clone the repository: git clone https://github.com/mannaandpoem/OpenManus.git cd OpenManus Install dependencies: pip install -r requirements.txt Configuration OpenManus requires configuration for the LLM APIs it uses. Follow these steps to set up your configuration: Create a config.toml file in the config directory (you can copy from the example): cp config/config.example.toml config/config.toml Edit config/config.toml to add your API keys and customize settings: # Global LLM configuration [llm] model = "gpt-4o" base_url = "https://api.openai.com/v1" api_key = "sk-..." # Replace with your actual API key max_tokens = 4096 temperature = 0.0 # Optional configuration for specific LLM models [llm.vision] model = "gpt-4o" base_url = "https://api.openai.com/v1" api_key = "sk-..." # Replace with your actual API key Quick Start One line for run OpenManus: python main.py Then input your idea via terminal! For unstable version, you also can run: python run_flow.py How to contribute We welcome any friendly suggestions and helpful contributions! Just create issues or submit pull requests. Or contact @mannaandpoem via [表情]email: mannaandpoem@gmail.com Roadmap [ ] Better Planning [ ] Live Demos [ ] Replay [ ] RL Fine-tuned Models [ ] Comprehensive Benchmarks Community Group Join our networking group on Feishu and share your experience with other developers!

19
0
2
浏览量156
Yourselffff

在Java的等待唤醒机制中,为什么锁对象不能是业务操作的对象?

举个很简单的例子,厨师和吃客,厨师做出一道菜,吃客吃一道菜;按常理推,菜的数量应为锁,从而操作线程唤醒或等待,但事实是food不能成为"synchronized"的参数,必须在"Desk"中定义一个"Object lock"并传入"synchronized",程序才能跑的通,很疑惑,请大神解答! 正确代码如下,错误代码放在注释中。 //餐桌 public class Desk { public static Integer food = 0;// 食物,1代表有食物,0代表无食物 public static Object lock = new Object(); } 厨师类 public class Cook extends Thread { @Override public void run() { synchronized (Desk.lock) {//这里必须使用Object类对象,而使用Desk.food就报错 while (true) { if (Desk.food == 1) { try { Desk.lock.wait();//这里必须用lock对象,而使用Desk.food就报错 } catch (InterruptedException e) { throw new RuntimeException(e); } } else { Desk.food += 1; System.out.println("厨师做菜" + Desk.food); Desk.lock.notifyAll();//唤起消费者线程,这里必须用lock对象,而使用Desk.food就报错 } } } } } 吃客类 public class Foodie extends Thread { @Override public void run() { synchronized (Desk.lock) {//这里必须使用Object类对象,而使用Desk.food就报错 while (true) { if (Desk.food == 0) { try { Desk.lock.wait();//当前线程等待,这里必须用lock对象,而使用Desk.food就报错 } catch (InterruptedException e) { throw new RuntimeException(e); } } else { Desk.food -= 1; System.out.println("我吃了" + Desk.food); Desk.lock.notifyAll();//唤起厨师线程,这里必须用lock对象,而使用Desk.food就报错 } } } } } main方法 public static void main(String[] args){ Cook cook =new Cook(); Foodie foodie = new Foodie(); Thread c = new Thread(cook,"厨师线程"); Thread f = new Thread(foodie,"消费者线程"); c.start(); f.start(); }

10
1
0
浏览量295
爱喝奶茶的波波

操作系统(1)---操作系统的运行机制

1.CPU指令CPU的指令类型有特权指令与非特权指令。应用程序只能使用“非特权指令”,如:加法指令,减法指令等。操作系统内核作为“管理者”,有时会让CPU执行一些“特权指令”,如:内存清零指令。这些指令影响重大只允许“管理者”一即操作系统内核来使用。在CPU设计和生产的时候就划分了特权指令和非特权指令,因此CPU执行一条指令前就能判断出其类型,CPU是怎么判断出指令类型的呢?CPU有两种状态,“内核态”和“用户态”CPU 中有一个寄存器叫程序状态字寄存器(PSW),其中有个二进制位,1表示“内核态”,0示“用户态”处于内核态(核心态,管态)时,说明此时正在运行的是内核程序,此时可以执行特权指令处于用户态(目态)时,i说明此时正在运行的是应用程序,此时只能执行非特权指令2.内核态、初始态的切换内核态--->用户态: 执行一条特权指令--修改PSW的标志位为“用户态”,这个动作意味着操作系统将主动让出CPU使用权用户态--->内核态:由“中断”引发,硬件自动完成变态过程,触发中断信号意味着操作系统将强行夺回CPU的使用权,这种情况会发生在非法使用特权指令,或者操作系统需要开展某项管理工作,即操作系统需要介入,都需要触发中断信号。举个例子:(1)开机时,需要加载操作系统,进行操作系统的初始化工作,初始化工作就是由操作系统中某些内核程序完成的,所以CPU此时处在"内核态"(2)开机完成后,用户可以启动某个应用程序。(3)操作系统内核程序在合适的时候主动让出CPU,让该应用程序上CPU运行。操作系统内核在让出CPU之前,会用一条特权指令把 PSW 的标志位设置为“用户态”(4)应用程序运行在“用户态”(5)若此时植入一条特权指令,这个非法事件会引发一个中断信号,但CPU检查PSW的状态为“0”,处于“用户态”,CPU检测到中断信号后,会立即变为“核心态”,并停止运行当前的应用程序(拒绝执行这一植入的特权指令),转而运行处理中断信号的内核程序(6)操作系统会对引发中断的事件进行处理,处理完了再把CPU使用权交给别的应用程序3.中断机制的基本原理不同的中断信号,需要用不同的中断处理程序来处理。当CPU检测到中断信号后,会根据中断信号的类型去查询“中断向量表”,以此来找到相应的中断处理程序在内存中的存放位置。中断处理程序一定是内核程序,需要运行在“内核态”。4.中断的类型中断的类型由内中断与外中断。内中断(异常,例外)与当前执行的指令有关,中断信号来源于CPU内部。内中断:内中断,也称为异常,有以下几种类型:陷阱、陷入:由陷入指令引发,是应用程序故意引发的故障:由错误条件引起的,可能被内核程序修复。内核程序修复故障后会把 CPU使用权还给应用程序,让它继续执行下去。如: 缺页故障。终止:由致命错误引起,内核程序无法修复该错误,因此一般不再将CPU使用权还给引发终止的应用程序,而是直接终止该应用程序。如:整数除0、非法使用特权指令。内中断的例子:1.由非法植入的特权指令引起的中断(终止)2.除法指令中发现除数为0,也会引起内中断(终止)。3.有时候应用程序想请求操作系统内核的服务,此时会执行一条特殊的指令--陷入指令,该指令会引发一个内部中断信号。("系统调用"就是通过陷入指令完成的)注:陷入指令是一条特殊的指令,不是特权指所以若当前执行的指令是非法的,则会引发一个中断信号。注:CPU在执行指令时会检查是否有异常发生这里补充一下系统调用:操作系统作为用户和计算机硬件之间的接口,需要向上提供一些简单易用的服务。主要包括命令接口和程序接口。其中,程序接口由一组系统调用组成。“系统调用”是操作系统提供给应用程序(程序员/编程人员)使用的接口,可以理解为一种可供应用程序调用的特殊函数,应用程序可以通过系统调用来请求获得操作系统内核的服务系统调用的作用:操作系统内核会对共享资源进行统一的管理,并向上提供“系统调用”,用户进程想要使用打印机这种共享资源,只能通过系统调用向操作系统内核发出请求。内核会对各个请求进行协调处理。应用程序通过系统调用请求操作系统的服务。而系统中的各种共享资源都由操作系统内核统一掌管,因此凡是与共享资源有关的操作(如存储分配、I/O操作、文件管理等),都必须通过系统调用的方式向操作系统内核提出服务请求,由操作系统内核代为完成。这样可以保证系统的稳定性和安全性,防止用户进行非法操作。高级语言的库函数,在底层也会使用到系统调用,所以系统调用是比高级语言的库函数更为底层的接口:注:不是所有的库函数都需要涉及系统调用不涉及系统调用的库函:如的“取绝对值”的函数涉及系统调用的库函数:如“创建一个新文件”的函数系统调用的过程:1.应用系统首先会执行传参指令,传参指令会将参数传递到寄存器中,传递多少参数取决于系统调用需要多少参数,操作系统会根据这些参数判断是哪种类型的服务。2.参数传递完毕后,应用程序会执行陷入指令,触发内中断信号,转入相应的中断处理程序一即系统调用的入口程序3.接下来CPU就会响应中断,进行系统调用入口程序的处理,此时CPU的状态由"用户态"转为"内核态",该程序会根据寄存器中的参数判断用户需要哪种系统调用服务。4.接下来CPU就会转入响应的系统调用处理程序进行处理。总结来讲就是:传递系统调用参数--->执行陷入指令(用户态)--->执行相应的内核请求程序处理系统调用(核心态)---->返口应用程序注意:1.陷入指令是在用户态执行的,执行陷入指令之后立即引发一个内中断,使CPU进入核心态2.发出系统调用请求是在用户态,而对系统调用的相应处理在核心态下进行外中断:外中断(中断)与当前执行的指令无关,中断信号来源于CPU外部。1.时钟中断---由时钟部件发来的中断信号。时钟部件每隔一个时间片(如 50ms) 会给CPU发送一个时钟中断信号,通过这个时钟中断信号就可以实现多道程序并发运行了。假设系统中想要并发执行两个应用程序,应用程序1执行自己的应用程序,直到50ms,时钟部件会向CPU发送中断信号,CPU收到中断信号会将“用户态”转变为“内核态”,在内核态下执行内核程序,处理中断信号,并且决定让另一个应用程序上CPU运行。所以切换应用程序2上CPU运行。这样两个应用程序就可以实现交替运行。2.I/O中断一一由输入/输出设备发来的中断信号。当输入输出任务完成时,向CPU发送中断信号。CPU收到中断信号,就会处理I/O中断中的内核程序。注:CPU在每一条指令执行结束后,都会理性检查是否有外中断信号。

0
0
0
浏览量1096
带鱼

Java 多线程 : 勉强弄懂了AQS

一 . AQS 基础一句话概括AQS :一个叫 AbstractQueuedSynchronizer 的抽象类包含2个重要概念 : 以Node为节点实现的链表的队列(CHL队列) + STATE标志支持2种锁 : 独占锁和共享锁 ,1.1 什么是 AQS ?java.util.concurrent.locks.AbstractQueuedSynchronizer 抽象类,简称 AQS , 队列同步器作用 :  用于构建锁和同步容器的同步器 原理 :  使用一个 FIFO 的队列表示排队等待锁的线程队列头节点称作“哨兵节点”或者“哑节点”,不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态 waitStatus场景 :  AQS解决了实现同步器时涉及当的大量细节问题,例如获取同步状态、FIFO同步队列1.2 AQS 的 status 表示了什么 ?AQS 使用一个int 的 status 来表示同步状态, 同步状态重要目的是用于跟踪线程是否应该阻塞 , 当它的前身释放时,一个节点被通知。否则,队列中的每个节点都充当一个特定通知样式的监视器,该监视器持有单个等待线程.status > 0 : 获取了锁status = 0 : 释放了锁status < 0 :1.3 常用方法:状态处理getState():返回同步状态的当前值。setState(int newState):设置当前同步状态。compareAndSetState(int expect, int update):使用 CAS 设置当前状态,该方法能够保证状态设置的原子性。独占锁相关方法【可重写】#tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态。【可重写】#tryRelease(int arg):独占式释放同步状态。共享锁相关方法【可重写】#tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于 0 ,则表示获取成功;否则,获取失败。【可重写】#tryReleaseShared(int arg):共享式释放同步状态。【可重写】#isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占。独占式获取同步状态acquire(int arg):独占式获取同步状态。   如果当前线程获取同步状态成功,则由该方法返回;否则,将会进入同步队列等待。该方法将会调用可重写的       #tryAcquire(int arg) 方法;acquireInterruptibly(int arg):与 #acquire(int arg) 相同,但是该方法响应中断。   当前线程为获取到同步状态而进入到同步队列中   如果当前线程被中断,则抛出 InterruptedException()   如果未中断 ,将尝试调用 tryAcquire , 调用失败线程将进入队列,可能会反复阻塞和解除阻塞tryAcquireNanos(int arg, long nanos):超时获取同步状态。   如果当前线程被中断,则抛出 InterruptedException()   如果当前线程在 nanos 时间内没有获取到同步状态,那么将会返回 false ,已经获取则返回 true 。   未超时未获取会一只排队 ,反复阻塞共享式获取同步状态acquireShared(int arg):共享式获取同步状态   如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断。tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制。释放同步状态release(int arg):独占式释放同步状态   该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒。releaseShared(int arg):共享式释放同步状态。public class SimpleLock extends AbstractQueuedSynchronizer { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Override protected boolean tryAcquire(int unused) { logger.info("------> try tryAcquire :{} <-------", unused); //使用compareAndSetState控制AQS中的同步变量 if (compareAndSetState(0, 1)) { logger.info("------> cas success "); setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int unused) { logger.info("------> try tryRelease :{} <-------", unused); setExclusiveOwnerThread(null); //使用setState控制AQS中的同步变量 setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } } 其他关联知识点 : 1 AQS的所有子类中,要么使用了它的独占锁,要么使用了它的共享锁,不会同时使用它的两个锁。二 .  AQS 原理2.1  基本原理AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。在await之后,一个节点被插入到条件队列中(可见后面代码)。收到信号后,节点被转移到主队列CLH(Craig,Landin,and Hagersten)队列 该队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。AQS定义两种资源共享方式 Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:公平锁:按照线程在队列中的排队顺序,先到者先拿到锁非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。2.1  AQS底层使用了模板方法模式2.1.1 模板方法详情同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:以下方法未重写抛出 UnsupportedOperationExceptionisHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。2.1.2 常见的实现案例Semaphore(信号量)功能 :  允许多个线程同时访问synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。CountDownLatch (倒计时器) 功能 : CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。CyclicBarrier(循环栅栏)功能 :  CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。应用场景 : 和 CountDownLatch 类似。CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。作用 : 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。三 .  AQS 同步状态的获取和释放独占式获取和释放同步状态同一时刻,仅有一个线程持有同步状态acquire(int arg) : 该方法对中断不敏感 , 即后续对该线程进行中断操作时,线程不会从 CLH 同步队列中移除tryAcquire(int arg) : 去尝试获取同步状态   true : 获取成功 , 设置锁状态 , 直接返回不用线程阻塞 , 自旋直到获得同步状态成功     false : 获取失败 , 用#addWaiter(Node mode) 方法,将当前线程加入到 CLH 同步队列尾部,mode 方法参数为Node.EXCLUSIVEacquireQueued : 自旋直到获得同步状态成功 public final void acquire(int arg) { if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 共享式获取和释放同步状态 // 首先调用至少一次tryacquisharered // public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } // 上述调用失败 , 线程可能会进入队列反复阻塞和解除阻塞 private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // shouldParkAfterFailedAcquire 检查并更新未能获取的节点的状态。如果线程阻塞,则返回true // parkAndCheckInterrupt : 中断线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } shouldParkAfterFailedAcquire 的作用等待状态为 Node.SIGNAL 时,表示 pred 的下一个节点 node 的线程需要阻塞等待。在pred 的线程释放同步状态时,会对 node 的线程进行唤醒通知等待状态为 0 或者 Node.PROPAGATE 时,通过 CAS 设置,将状态修改为 Node.SIGNAL等待状态为 NODE.CANCELLED 时,则表明该线程的前一个节点已经等待超时或者被中断了,则需要从 CLH 队列中将该前一个节点删除掉,循环回溯,直到前一个节点状态 <= 0 查询同步队列中的等待线程情况 // 自旋处理流程: for (;;) { // 获取当前线程的前驱节点 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 当前线程的前驱节点是头结点,且同步状态成功 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 获取失败,线程等待 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } 四. 阻塞和唤醒线程AQS 在处理中有2处值得深入的过程 : 阻塞和唤醒 // 阻塞发生在获取对应同步方法同步失败后 ,流程为 : Step Start : 获取同步状态 -> 获取失败 -> 检查该状态 shouldParkAfterFailedAcquire(Node pred, Node node) Step 2 : 返回true -> 当前线程应该被柱塞 Step 3 : parkAndCheckInterrupt() 阻塞线程 - 调用 LockSupport#park(Object blocker) 方法,将当前线程挂起,此时就进入阻塞等待唤醒的状态 // 后续将进行线程的唤醒操作 , 唤醒分为2种 第一种,当前节点(线程)的前序节点释放同步状态时,唤醒了该线程 第二种,当前线程被打断导致唤醒。 Step Start : 当线程释放同步状态后 , 唤醒该线程的后继节点 (unparkSuccessor) Step 2 : 后继节点存在 , 唤醒后继节点 LockSupport.unpark(s.thread) Step 3 : 如果后继节点为null (超时 , 中断) , 通过 tail 回溯的方式找到最前序的可用的线程 // 补充 : > park() : 阻塞当前线程 > park(Object blocker) : 为了线程调度 , 在许可可用之前兼用当前线程 > unpark : 如果给定线程的许可尚不可用 , 则使其可用 > parkNanos(long nanos) :为了线程调度禁用当前线程,最多等待指定的等待时间,除非许可可用 - park 方法和 unpark(Thread thread) 方法,都是成对出现的 - unpark(Thread thread) 方法,必须要在 park 方法执行之后执行 五 . CLH 同步队列> 简介 : CLH 同步队列是一个 FIFO 双向队列,AQS 依赖它来完成同步状态的管理 > 2种状态 : • 当线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程 • 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。 C- Node : AbstractQueuedSynchronizer 的内部静态类 SF- Node SHARED = new Node(); SF- Node EXCLUSIVE = null; SF- int CANCELLED = 1; SF- int SIGNAL = -1; SF- int CONDITION = -2; SF- int PROPAGATE = -3; F- volatile int waitStatus -- CANCELLED SIGNAL CONDITION PROPAGATE INITAL 总共 5 种状态 , 其中 INITAL 是初始状态 F- volatile Node prev; -- 指向前一个节点 F- volatile Node next; -- 指向后一个节点 F- volatile Thread thread; -- Node 节点对应的线程 Thread F- Node nextWaiter; -- Node 节点获取同步状态的模型( Mode ) M- tryAcquire : 独占式获取同步状态 M- tryAcquireShared : 共享式获取同步状态 M- addWaiter : 入队 M- isShared : 判断是否为共享式获取同步状态 M- predecessor : 获得 Node 节点的前一个 Node 节点 // 属性详情 : Node 中包含链表常用的2个概念 : prev , next , 同步器中包含2个属性 head , tail 分别指向队列的头和尾 , > 入列 : M- addWaiter - 准备新节点 -> 记录尾节点 -> 将新节点放入尾节点 -> CAS 设置新的尾节点 - 失败反复尝试 , 直到成功 > 出列 : - 首节点的线程释放同步状态后,将会唤醒它的下一个节点(Node.next)。后继节点将会在获取同步状态成功时,将自己设置为首节点( head ) - setHead - 该操作为单线程操作 // 原理简述 : 六. AQS 源码9.6.1 问题一 : CLH 的形式AQS 有2个属性private transient volatile Node head;   等待队列的头,懒加载。初始化后只能通过sehead方法进行修改。   注意:如果head存在,它的waitStatus保证不存在private transient volatile Node tail;   尾部的等待队列,懒加载AQS 有几个重要的方法private Node addWaiter(Node mode)   为当前线程和给定模式创建并进入节点队列private void setHead(Node node)设置队列头为节点,退出队列。   仅通过acquire方法调用。   将未使用的字段置空 (GC 及效率)private Node enq(final Node node)   将节点插入队列,必要时进行初始化9.6.2 问题二 : Node 节点AQS 中有个内部类 Node , 他是节点对象 , 它其中有四个属性表示状态SIGNAL :该节点的后继节点被阻塞(或即将被阻塞)(通过park),因此当前节点在释放或取消后继节点时必须解除它的后继节点的阻塞CANCELLED : 该节点因超时或中断而被取消CONDITION : 该节点当前在条件队列中。它将不会被用作同步队列节点,直到传输时,状态将被设置为0PROPAGATE : 这个 releaseShared 应该传播到其他节点它还有如下几个重要的属性volatile Node prev : 前一个节点volatile Node next : 下一个节点volatile Thread thread : 当前线程9.6.3 AQS 状态private volatile int state;9.6.5 AQS 流程图七 . AQS 使用@ https://github.com/black-ant/case/tree/master/case%20Module%20Thread/case%20AQS public class SimpleLock extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int unused) { //使用compareAndSetState控制AQS中的同步变量 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); //使用setState控制AQS中的同步变量 setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } } // 分析 1 [ main] try tryAcquire :1 <------- 2 [ main] cas success 3 [ Thread-52] try tryAcquire :1 <------- 4 [ Thread-52] try tryAcquire :1 <------- 5 [ Thread-52] try tryAcquire :1 <------- 6 [ Thread-53] try tryAcquire :1 <------- 16 [ main] try tryRelease :1 <------- 17 [ Thread-52] try tryAcquire :1 <------- 18 [ Thread-52] cas success 19 [ Thread-52] c.g.s.thread.aqs.demo.logic.StartLogic : ------> acquired the lock! <------- 20 [ Thread-52] try tryRelease :1 <------- 21 [ Thread-53] try tryAcquire :1 <------- 22 [ Thread-53] cas success 23 [ Thread-53] c.g.s.thread.aqs.demo.logic.StartLogic : ------> acquired the lock! <------- 24 [ Thread-53] try tryRelease :1 <------- // 第2行 : main 线程获取了独占锁 , 导致后续3-6行的线程全部无法获取锁 , 排在队列中 // 第16行 : main 释放了锁 , 所以从 17-20 行 ,是 Thread-52 的操作流程 (后面可以看到 53 的队列流) // AQS 使用如上文所示 , 通常要实现 tryAcquire 和 tryRelease TODO

0
0
0
浏览量2017
带鱼

Java多线程(一) : 快查手册

快查手册// 乐观锁/悲观锁 java悲观锁:synchronized、lock的实现类 java乐观锁:乐观锁在Java中是通过使用无锁编程来实现,最常采用的是CAS算法,Java原子类中的递增操作就通过CAS自旋实现的。 // 独享锁/共享锁 synchronized、ReentrantLock是独享锁。 ReadWriteLock其读锁是共享锁,其写锁是独享锁。 // 可重入锁 synchronized、ReentrantLock // 公平锁/非公平锁 synchronized是非公平锁 ReetrantLock(通过构造函数指定该锁是否是公平锁,默认是非公平锁) JVM 参数变量> User user = new User() - new User 会创建到 Heap 中 - User user 为对象得引用 ,放在方法栈中 JVM 多线程的变量同步

0
0
0
浏览量2015
带鱼

Java 多线程 : 真想聊清楚线程池

一 .   线程池简介1 线程池的元素 线程池主要由两个概念组成,一个是任务队列,另一个是工作者线程。任务队列是一个阻塞队列,保存待执行的任务。工作者线程主体就是一个循环,循环从队列中接受任务并执行。2 为什么要用线程池降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。提高响应速度。 当任务到达时,任务可以不需要的等到线程创建就能立即执行。提高线程的可管理性。 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。3 线程池中的核心概念BlockingQueue workQueue : 用于保留任务并移交给工作线程的队列HashSet workers : 线程池中所有的工作线程4 线程池的原理定义 : 线程池通过一个叫 ctl 的 AtomicInteger 决定运行情况 , 通过 ThreadFactory 创建线程 , 并且把等待的线程放入 workQueue , 等待移交给工作线程二.  常见的线程池// 基本对象 ThreadPoolExecutor // 可重用固定线程数的线程池 FixedThreadPool - ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); // 使用单个worker线程的Executor SingleThreadExecutor // 会根据需要创建新线程的线程池 CachedThreadPool 三. 线程池的创建线程池创建可以通过 ThreadPoolExecutor 和 工具类 Executors 实现3.1 通过构造方法实现(推荐)通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则3.2 通过Executor 框架的工具类Executors来实现 (个人demo 可以考虑)3.2.1 FixedThreadPoolreturn new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); 该方法返回一个固定线程数量的线程池。 (corePoolSize == maximumPoolSize)使用LinkedBlockingQuene作为阻塞队列当线程池没有可执行任务时,也不会释放线程该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。3.2.2 SingleThreadExecutorreturn new FinalizableDelegatedExecutorService ( new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。3.2.3 CachedThreadPool:return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); 该方法返回一个可根据实际情况调整线程数量的线程池。(默认缓存60s , 线程池的线程数可达到Integer.MAX_VALUE,即2147483647)内部使用SynchronousQueue作为阻塞队列线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源3.2.4 ScheduledExecutorService :return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); 初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据四. Fork/Join1 > Fork / Join 的核心是 ForkJoinPool , 用于来管理工作线程 : 工作线程一次只能执行一个任务 , : 不会根据任务创建线程,而是将任务存储到工作线程的双端队列中 2 > Fork / join 的思路是分而治之 - Fork 递归的将任务分为较小的子任务 - Join : 将子任务递归的串联成单个结果 3 > 工作窃取算法 : 空闲的线程试图从繁忙的线程(他们的双端队列)中窃取工作 // Fork/Join 依赖于 ForkJoinPool , 此处仅简单介绍 , 详情参考十六章 五. ThreadPoolExecutorThreadPoolExecutor实现了生产者/消费者模式, - 工作者线程就是消费者 - 任务提交者就是生产者,线程池自己维护任务队列。 > ThreadPoolExecutor - AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); : 此变量 记录了 “线程池中的任务数量”和“线程池的状态”两个信息 : 高3位表示"线程池状态",低29位表示"线程池中的任务数量" - RUNNING : 111 : 该线程池能接收新任务 ,且能对新任务进行处理 - SHUTDOWN : 000 : 不能接收新任务 ,但是可以对任务进行处理 - STOP : 001 : 不添加新任务 , 不对任务进行处理 , 会中断正在执行的任务 - TIDYING : 010 : 当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态 - 当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态 - TERMINATED : 011 : 线程池彻底终止的状态 ---------------------------------------- > ThreadPoolExecutor 的参数 - corePoolSize : 线程池中核心线程的数量 - maximumPoolSize : 线程池中允许的最大线程数 - keepAliveTime : 线程空闲的时间 - unit : keepAliveTime的单位 - workQueue : 用来保存等待执行的任务的阻塞队列,等待的任务必须实现Runnable接口 - threadFactory : 用于设置创建线程的工厂 - allowCoreThreadTimeOut : 允许核心线程过期 - Handler : 处理器 - defaultHandler : 任务拒绝处理器 六 .  线程池的饱和和动态调整// 线程池的饱和策略 , 当线程池满了. 会通过对应的策略 1、AbortPolicy:直接抛出异常,默认策略; 2、CallerRunsPolicy:用调用者所在的线程来执行任务; 3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务; 4、DiscardPolicy:直接丢弃任务; // ThreadPoolExecutor 提供了动态调整线程池容量大小的方法: • setCorePoolSize:设置核心池大小。 • setMaximumPoolSize:设置线程池最大能创建的线程数目大小。 当上述参数从小变大时,ThreadPoolExecutor 进行线程赋值,还可能立即创建新的线程来执行任务。 // 动态调整源码核心 : 七. 线程池执行任务的过程刚创建时,里面没有线程调用 execute() 方法,添加任务时:完成一个任务,继续取下一个任务处理。 没有任务继续处理,线程被中断或者线程池被关闭时,线程退出执行,如果线程池被关闭,线程结束。 否则,判断线程池正在运行的线程数量是否大于核心线程数,如果是,线程结束,否则线程阻塞。因此线程池任务全部执行完成后,继续留存的线程池大小为 corePoolSize 。八. 线程池中 submit 和 execute 方法有什么区别两个方法都可以向线程池提交任务。#execute(...) 方法,返回类型是 void ,它定义在 Executor 接口中 , 必须实现Runnable接口 。#submit(...) 方法,可以返回持有计算结果的 Future 对象,它定义在 ExecutorService 接口中,它扩展了 Executor 接口,其它线程池类像 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 都有这些方法。九 .  如果你提交任务时,线程池队列已满,这时会发生什么重点在于线程池的队列是有界还是无界的。> 如果你使用的 LinkedBlockingQueue,也就是无界队列的话,没关系,继续添加任务到阻塞队列中等待执行,因为 LinkedBlockingQueue 可以近乎认为是一个无穷大的队列,可以无限存放任务。> 如果你使用的是有界队列比方说 ArrayBlockingQueue 的话,任务首先会被添加到 ArrayBlockingQueue 中,ArrayBlockingQueue满了,则会使用拒绝策略 RejectedExecutionHandler 处理满了的任务,默认是 AbortPolicy 。十 .  线程池的底层逻辑要想弄清楚这一部分 , 首先得理解 Queue , Worker , Task , Thread 等多个概念Queue :Worker :Task :Thread :10.1 线程池的物理结构Worker 对象Worker 对象是 ThreadPoolExecutor 中的一个内部类 , 他是一个包装类 , 是一个线程单元 , 同时提供线程的中断等功能// 问题一 : Worker 结构 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 被封装的线程 final Thread thread; // 初始任务 Runnable firstTask; // 线程任务计数器 volatile long completedTasks; // 可以看到 , 把 worker 都行构建成了 Thread Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } protected boolean isHeldExclusively() {....} // 获取同步状态 protected boolean tryAcquire(int unused) {....} // 释放同步状态 protected boolean tryRelease(int unused) {....} //锁操作 public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } // 暂停操作 void interruptIfStarted(){....} } // 问题二 : ThreadPoolExecutor中的线程包装 - 线程被封装成一个对象Worker - 通过调用 runWorker(Worker w) 获取任务并执行的死循环 - 如果任务的运行出了什么问题 ,调用 processWorkerExit() 处理 C- ThreadPoolExecutor PVC- Worker M- run : public void run() {runWorker(this); } 拒绝策略部分 PolicyThreadPoolExecutor 中提供了4 个拒绝策略内部类  , 具体的类型详见上文 , 这里来看一下结构 :public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); } 10.2 主要流程Step 1 : 进入的起点 - 线程的封装// task 的构建 : 匿名传进来的线程会构建成一个 FutureTask RunnableFuture<Void> ftask = newTaskFor(task, null); protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } Step 2 : 运行的起点 - executeThreadPoolExecutor 中 excutor 方法是执行的起点 , 其中会进行三种操作当线程池未满时 , 直接 addWorker 运行当线程池满了且正在运行时 , 将线程加入 workQueue 中当上述均失败后 , 就会调用 reject 来处理异常情况 (RejectedExecutionHandler)// 问题一 : execute 中线程池处理任务的逻辑 1 int c = ctl.get(); 2 if (workerCountOf(c) < corePoolSize) { 3 if (addWorker(command, true)) 4 return; 5 c = ctl.get(); 6 } 7 if (isRunning(c) && workQueue.offer(command)) { 8 int recheck = ctl.get(); 9 if (! isRunning(recheck) && remove(command)) 10 reject(command); 11 else if (workerCountOf(recheck) == 0) 12 addWorker(null, false); 13 } 14 else if (!addWorker(command, false)) 15 reject(command); // 2 : workerCountOf 判断当前线程数是否小于corePoolSize 从而决定是否通过 addWorker 创建线程 // 7 : 如果线程池已满 ,且状态为 running , 尝试把任务添加到 workQueue // 14 : 如果 7 步处理失败 , 尝试 addWorker , 失败则通过 reject 处理 //补充 : addWork 作用 - 检查是否可以根据当前池状态和给定边界(核心或最大)添加新的工作者 - 创建并启动新的worker,运行firstTask作为它的第一个任务 // 问题二 : 线程池运行 Work 详情 (简述一下就是核心的四步) 1 addWorker(Runnable firstTask, boolean core) : 可以看到addWorker 添加的是一个 Runnable 2 new Worker(firstTask) :如果状态符合 ,会创建一个 Worker 对象 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } 3 final Thread t = w.thread; ?- 这里将Thread 取了出来 4 后文将会 t.start()运行 // 补充 : 期间还会进行锁的处理 , 省略一些的主要流程如下 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 核心一 : 状态判断 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 核心二 : 容量满了的处理 , 退出或者重试 (可以看到 c 语言的影子) for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 核心三 : 处理开始 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 核心四 : 启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } Step 3 : 工厂的创建// 问题四 : 创建工厂 从上文说到 , 线程池通过 ThreadFactory 创建线程 (newThread()) , Step 4 : 线程的复用在上文 Step 1 问题一 中 , 将 线程加入到 workQueue 中了isRunning(c) && workQueue.offer(command) , 这里就是取出来的步骤 :// 这个问题涉及到的方法主要包括 getTask () M- runWorker while (task != null || (task = getTask()) != null) : 死循环 , 只要还有 task 就会执行 - task.run() : 获取到 task 后 通过 task run 执行 M- getTask() workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 补充 : 这里可以看到 , worker 可以理解为一个工作线程 ,他通过 while 不停的从 queue 中获取 task 执行 // 这里很有趣 , worker 更像一个加工工厂 , 我一开始还以为迭代的是 worker , 现在发现是在 worker 上锁后在里面迭代 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // task 实际上不是 worker 的内部属性 while (task != null || (task = getTask()) != null) { // 上锁 w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); // 线程执行 task.run(); } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } Step 5 : 拒绝策略// 瞅一瞅拒绝策略 : 当你的线程池满了后 , 通常这个异常就爆出来了 java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@523424b5 rejected from java.util.concurrent.ThreadPoolExecutor@319dead1 [Running, pool size = 6, active threads = 6, queued tasks = 3, completed tasks = 0] - 尽管我们可以通过拒绝策略有很多种 ,但是超高并发的时候哪一种都不靠谱 , 所以我们先看下 , 这个拒绝策略怎么来的 1 从问题三代码的第七行我们就能看到 workQueue.offer(command) , queue 已经 offer 失败了 , 说明Queue 也满了 2 到14行 , 再次通过 addWork 直接运行 , 失败了 3 执行了 reject 方法 , handler.rejectedExecution(command, this); ?- handler 是接口 ,他有四个实现类 , 具体含义可以见上文拒绝策略 4 例如 AbortPolicy 就是 throw new RejectedExecutionException , CallerRunsPolicy 就是再次run (主线程慢慢跑 , 肯定慢的) - 所以部分业务我们要改 , 怎么改 ? 1 spring 里面可以自定义你的拒绝策略 , 可以参考这一篇的用法 @ https://blog.csdn.net/tanglei6636/article/details/90721801 2 ThreadPoolExecutor 构造器里面也有 - 改的思路 ? 前提一是集群已经无法解决 (基本上现阶段集群都能满足) ,且你无法节流 1 放到消息队列 2 入库 3 写盘 4 放集合 , 单独一个线程 , 用一个取一个 Step 6 : 线程的关闭1 checkShutdownAccess 校验是否可以关闭 2 RunState 改为 STOP 3 ReentrantLock and isInterrupted 4 drainQueue : remove queue 队列 Step 7 : 如何实现回调 ?submit 回调 - <T> Future<T> submit(Callable<T> task) ?- 很明显 , submit 返回的是 Future , 这就意味着主线程能阻塞等待 - RunnableFuture<T> ftask = newTaskFor(task); 10.2 底层复杂分析ctl 到底怎么玩的 ?> private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); ?- 线程池里面通过 ctl 来判断线程的状态 , 前面说了线程池高3位表示 "线程池状态",低29位表示线程池中的任务数量- ?- 以 STOP 状态为例 , 在运行的时候 ,他的十进制值为 536870912 - 首先 ,我们将他转换为二进制 -> 10000 00000 00000 00000 00000 00000 - 获取后面的29位 ,然后前面补齐 , 最后的高三位即为 001 ?- 而 TIDYING 对应的就是 00001 00000 00000 00000 00000 00000 00000 -> 010 ?- RUNNING 为 -1 , 按照为数不多的一点残留知识 , 这里说成111是因为负数按照补码表示的原因 ?- 众所周知 , 二进制处理的效率最高 ,所以这么玩合情合理 线程池公式> 计算密集型 :Ncpu + 1 > 包含了 I/O和其他阻塞操作的任务 : Nthreads = Ncpu x Ucpu x (1 + W/C)   Ncpu = CPU的数量   Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1   W/C = 等待时间与计算时间的比率 > IO密集型 = 2Ncpu 比较当前线程容量的方法 workerCountOf(c) 为什么要 & 一个 CAPACITY ?1 > public static final int SIZE = 32; ?- 用二进制补码形式表示int值的位数 2 > private static final int COUNT_BITS = Integer.SIZE - 3; 3 > private static final int CAPACITY = (1 << COUNT_BITS) - 1; 4 > private static int workerCountOf(int c) { return c & CAPACITY; } // 原因一 : 还是状态的原因 , 低 29 位才是 线程数量 , 加上这个参数才能包装低 29 二进制时最开始为 0 十一.  线程池使用 // 构造一个线程池 , 推荐用法 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3)); // 除了测试 , 尽量避免使用以下方法构建线程池 // 线程创建 // 1 CachedThreadPool ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); executor.submit(() -> {} // 2 FixedThreadPool ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); executor.submit(() -> {} // 3 SingleThreadExecutor ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> {} // 4 SingleThreadScheduledExecutor ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleWithFixedDelay(() -> {} // 线程池的关闭 executor.shutdown(); executor.shutdownNow(); // 信息获取 executor.isTerminated() : 是否关闭 executor.getPoolSize() executor.getQueue().size() 十二 .  线程池的想法// 使用线程池时有一些规约和建议是需要注意的 : - 创建线程或线程池时请指定有意义的线程名称 - 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式 - 并且不建议创建无界线程 , 避免 OOM - 必须回收自定义的 ThreadLocal 变量,尤其在线程池场景下 // 注意点 : - 注意线程池的拒绝策略 , 当线程池满了时 , 可能会因为策略带来系统崩溃 - newCachedThreadPool 也有可能出现 OOM , 其最大值为 newCachedThreadPool -

0
0
0
浏览量2017
带鱼

Java 多线程 : 简简单单原子类

一.  原子类的简述1.1  原子类的应用场景原子类适用于需要原子操作而有需要减少资源消耗时 , 原子类相当于 volatile 和 CAS 的工具类 .1.2 原子类的类型基本类型 : 使用原子的方式更新基本类型AtomicInteger:整形原子类AtomicLong:长整型原子类AtomicBoolean :布尔型原子类数组类型 : 使用原子的方式更新数组里的某个元素AtomicIntegerArray:整形数组原子类AtomicLongArray:长整形数组原子类AtomicReferenceArray :引用类型数组原子类引用类型AtomicReference:引用类型原子类AtomicStampedRerence:原子更新引用类型里的字段原子类AtomicMarkableReference :原子更新带有标记位的引用类型对象的属性修改类型AtomicIntegerFieldUpdater:原子更新整形字段的更新器AtomicLongFieldUpdater:原子更新长整形字段的更新器AtomicStampedReference :原子更新带有版本号的引用类型。1.3 . AtomicInteger 类常用方法public final int get() : 获取当前的值javapublic final int getAndSet(int newValue) : 获取当前的值,并设置新的值public final int getAndIncrement(): 获取当前的值,并自增public final int getAndDecrement() : 获取当前的值,并自减public final int getAndAdd(int delta) : 获取当前的值,并加上预期的值boolean compareAndSet(int expect, int update)  : 如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)public final void lazySet(int newValue) : 最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。public final int incrementAndGet() : 以原子方式给当前值加1并获取新值public final int decrementAndGet() : 以原子方式给当前值减1并获取新值public final int addAndGet(int delta) : 以原子方式给当前值加delta并获取新值public final boolean compareAndSet(int expect, int update) : CAS 比较方法二 . 原子类的原理2.1 原理简述原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 contextswitch切换到另一个线程 , 之所以称为原子变量,是因为其包含一些以原子方式实现组合操作的方法回顾 CAS : CAS的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值原子类主要通过 CAS (compare and swap) + volatile 和 native 方法来保证原子操作 !// 案例 : AtomicInteger // 它的主要内部成员是: private volatile int value; 注意,它的声明带有volatile,这是必需的,以保证内存可见性。 // 它的大部分更新方法实现都类似,我们看一个方法incrementAndGet,其代码为: public final int incrementAndGet() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return next; } } // 重点 : 1 . value 是一个volatile变量,在内存中可见,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值 2 . UnSafe 类的objectFieldOffset()方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址 返回值是 valueOffset 先获取当前值current,计算期望的值next然后调用CAS方法进行更新,如果当前值没有变,则更新并返回新值,否则继续循环直到更新成功为止2.2 AtomicInteger 原理深入我们从源码看看那些之前被我们忽略的东西  , 此类可以代表大多数基本类型 // Node 1 : 原子类支持序列化 implements java.io.Serializable ----------------------> // Node 2 : CAS 对象 Unsafe , Unsafe 之前已经说过了, 其中有很多 Native 犯法 private static final Unsafe unsafe = Unsafe.getUnsafe(); ----------------------> // Node 3 : 偏移量 valueOffset private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } // objectFieldOffset() : 获取某个字段相对Java对象的“起始地址”的偏移量 , 后续通过便宜量获取方法 // getDeclaredField() : 返回一个字段对象,该对象反映由这个类对象表示的类或接口的指定声明字段 ----------------------> // Node 4 : 核心值 Value ,可以看到 value 使用 volatile 进行修饰 private volatile int value; ----------------------> // Node 5 : 操作方法 , 可以看到 valueOffset 此时已经发挥了作用 public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } ----------------------> // Node 6 : 值转换 , AtomicInteger 提供了以下四个值得固有转换方法 public int intValue() ; public long longValue() ; public float floatValue(); public double doubleValue(); 2.3 AtomicReference 深入现在看一下 AtomicReference 有什么特别之处 // Node 1 : 不再继承 Number 接口 // Node 2 : 使用泛型方法 public class AtomicReference<V> implements java.io.Serializable private volatile V value; // Node 3 : 比对时使用 putOrderedObject return unsafe.compareAndSwapObject(this, valueOffset, expect, update); 2.4 AtomicIntegerArray 深入相对而言 AtomicIntegerArray 有更多得变化 , 其他的大同小异 // Node 1 : 对数组元素偏移进行了记录 , 此处不再是 "value" 偏移 private static final int base = unsafe.arrayBaseOffset(int[].class); private static final int shift; private final int[] array; // Node 2 : 比较使用了 getAndSetInt unsafe.getAndSetInt(array, checkedByteOffset(i), newValue) 三 . 原子类的操作3.1 原子类常见案例AtomicInteger integer = new AtomicInteger(); logger.info("------> 1 > 获取原子变量 :[{}] <-------", integer.get()); // Step 2 : 设置参数 integer.set(999); logger.info("------> 2 > 获取原子变量 :[{}] <-------", integer.get()); logger.info("------> 失败比较获取 : 测试比较判断 :[{}] <-------", integer.compareAndSet(0, 998)); logger.info("------> 3 > 获取原子变量 :[{}] <-------", integer.get()); logger.info("------> 成功比较获取 : 测试比较判断 :[{}] <-------", integer.compareAndSet(999, 998)); logger.info("------> 4 > 获取原子变量 :[{}] <-------", integer.get()); // Step 3 : 获取当前的值,并设置新的值 logger.info("------> 测试比较判断 :[{}] <-------", integer.getAndSet(888)); logger.info("------> 5 > 获取原子变量 :[{}] <-------", integer.get()); // Step 4 : 获取当前的值,并设置新的值 logger.info("------> 测试比较判断 :[{}] <-------", integer.getAndIncrement()); logger.info("------> 6 > 获取原子变量 :[{}] <-------", integer.get()); // 以原子方式给当前值加1并获取新值 logger.info("------> 测试比较判断 :[{}] <-------", integer.incrementAndGet()); logger.info("------> 6-1 > 获取原子变量 :[{}] <-------", integer.get()); // Step 5 : 获取当前的值,并设置新的值 logger.info("------> 测试比较判断 :[{}] <-------", integer.getAndDecrement()); logger.info("------> 7 > 获取原子变量 :[{}] <-------", integer.get()); // 以原子方式给当前值减1并获取新值 logger.info("------> 测试比较判断 :[{}] <-------", integer.decrementAndGet()); logger.info("------> 7 > 获取原子变量 :[{}] <-------", integer.get()); // Step 6 : 获取当前的值,并设置新的值 logger.info("------> 测试比较判断 :[{}] <-------", integer.getAndAdd(99)); logger.info("------> 8 > 获取原子变量 :[{}] <-------", integer.get()); // 以原子方式给当前值加delta并获取新值 logger.info("------> 测试比较判断 :[{}] <-------", integer.addAndGet(99)); logger.info("------> 8 > 获取原子变量 :[{}] <-------", integer.get()); } 3.2 原子类测试多线程情况 /** * 测多线程方式 */ public void testThead() throws Exception { InnerTO innerTO = new InnerTO(); MyThread[] threadDSS = new MyThread[1000]; for (int i = 0; i < 1000; i++) { threadDSS[i] = new MyThread(innerTO); } for (int i = 0; i < 1000; i++) { threadDSS[i].start(); } logger.info("------> 原子类线程 Start 完成 :{} <-------", innerTO.getInteger().get()); for (int i = 0; i < 1000; i++) { if (i % 100 == 0) { Thread.sleep(1); logger.info("------> 测试原子类 :{} <-------", innerTO.getInteger().get()); } } } /** * 包含原子类的对象 * **/ class InnerTO { AtomicInteger integer = new AtomicInteger(); public AtomicInteger getInteger() { return integer; } public void setInteger(AtomicInteger integer) { this.integer = integer; } } /** * 运行线程类 * **/ class MyThread extends Thread { public InnerTO innerTO = new InnerTO(); public MyThread(InnerTO innerTO) { this.innerTO = innerTO; } @Override public void run() { int i = innerTO.getInteger().getAndIncrement(); if (i == 999) { logger.info("------> 线程执行完成 <-------"); } } } // 可以看到在没有锁的情况下 ,数据保证了原子性 ------> 原子类线程 Start 完成 :876 <------- ------> 测试原子类 :918 <------- ------> 测试原子类 :950 <------- ------> 测试原子类 :973 <------- ------> 测试原子类 :989 <------- ------> 线程执行完成 <------- ------> 测试原子类 :1000 <------- ------> 测试原子类 :1000 <------- ------> 测试原子类 :1000 <------- ------> 测试原子类 :1000 <------- ------> 测试原子类 :1000 <------- ------> 测试原子类 :1000 <------- 欢迎大家关注我的相关文档 多线程集合参考文档[芋道源码](http://www.iocoder.cn/JUC/sike/aqs-3/) [死磕系列](http://cmsblogs.com/?cat=151)

0
0
0
浏览量2014
带鱼

Java 多线程 : 漫谈 Volatile

一 .  volatile 基础> volatile 保证内存的可见性 并且 禁止指令重排 > volatile 提供 happens-before 的保证,确保一个线程的修改能对其他线程是可见的 > 保证线程可见性且提供了一定的有序性 // Java 中可以创建 volatile 类型数组,不过只是一个指向数组的引用,而不是整个数组。 二 .  volatile  深入知识点> 读写主存中的数据没有 CPU 中执行指令的速度快 , 为了提高效率 , 使用 CPU 高速缓存来提高效率 > CPU 高速缓存 : CPU高速缓存为某个CPU独有,只与在该CPU运行的线程有关 // 原理 @ https://www.cnblogs.com/xrq730/p/7048693.html Step 1 : 先说说 CPU 缓存 , CPU 有多级缓存 , 查询数据会由一级到三级中 一级缓存:简称L1 Cache,位于CPU内核的旁边,是与CPU结合最为紧密的CPU缓存 二级缓存:简称L2 Cache,分内部和外部两种芯片,内部芯片二级缓存运行速度与主频相同,外部芯片二级缓存运行速度则只有主频的一半 三级缓存:简称L3 Cache,部分高端CPU才有 // 缓存的加载次序 1 > 程序以及数据被加载到主内存 2 > 指令和数据被加载到CPU缓存 3 > CPU执行指令,把结果写到高速缓存 4 > 高速缓存中的数据写回主内存 // Step End : 因为不同的缓存 , 就出现了数据不一致 , 所以出现了规则 当一个CPU修改缓存中的字节时,服务器中其他CPU会被通知,它们的缓存将视为无效 三 . volatile 和 synchronized 的区别volatile 本质是在告诉 JVM 当前变量在寄存器(工作内存)中的值是不确定的,需要从主存中读取。synchronized 则是锁定当前变量,只有当前线程可以访问该变量,其他线程被阻塞住。volatile 仅能使用在变量级别。synchronized 则可以使用在变量、方法、和类级别的。volatile 仅能实现变量的修改可见性,不能保证原子性。而synchronized 则可以保证变量的修改可见性和原子性。volatile 不会造成线程的阻塞。synchronized 可能会造成线程的阻塞。volatile 标记的变量不会被编译器优化。synchronized标记的变量可以被编译器优化。注意 :volatile 不能取代 synchronized四 . volatile  原理观察加入 volatile 关键字和没有加入 volatile 关键字时所生成的汇编代码发现,加入volatile 关键字时,会多出一个 lock 前缀指令。lock 前缀指令,其实就相当于一个内存屏障。内存屏障是一组处理指令,用来实现对内存操作的顺序限制。volatile 的底层就是通过内存屏障来实现的Step 1 : 写volatile的时候生成汇编码是 lock addl $0x0, (%rsp)Step 2 : 在写操作之前使用了lock前缀,锁住了总线和对应的地址,这样其他的写和读都要等待锁的释放。Step 3 : 当写完成后,释放锁,把缓存刷新到主内存。读volatile就很好理解了,不需要额外的汇编指令,CPU发现对应地址的缓存被锁了,等待锁的释放,缓存一致性协议会保证它读到最新的值。只需要对写volatile的使用用lock对总线加锁就行了,这样其他的读、写操作等待总线释放才能继续读。Lock会让其他CPU的缓存invalide,从内存重新加载数据。// volatile 的内存语义 - 当写一个 volatile 变量时,JMM 会把该线程对应的本地内存中的共享变量值,立即刷新到主内存中。 - 当读一个 volatile 变量时,JMM 会把该线程对应的本地内存设置为无效,直接从主内存中读取共享变量 > 所以 volatile 的写内存语义是直接刷新到主内存中,读的内存语义是直接从主内存中读取。 // volatile 的内存语义实现原理 : 为了实现 volatile 的内存语义,JMM 会限制重排序 1. 如果第一个操作为 volatile 读,则不管第二个操作是啥,都不能重排序。 ?- 这个操作确保volatile 读之后的操作,不会被编译器重排序到 volatile 读之前; 2. 如果第二个操作为 volatile 写,则不管第一个操作是啥,都不能重排序。 ?- 这个操作确保volatile 写之前的操作,不会被编译器重排序到 volatile 写之后; 3. 当第一个操作 volatile 写,第二个操作为 volatile 读时,不能重排序。 // volatile 的底层实现 : 内存屏障 , 有了内存屏障, 就可以避免重排序 -> 对于编译器来说,发现一个最优布置来最小化插入内存屏障的总数几乎是不可能的,所以,JMM 采用了保守策略 • 在每一个 volatile 写操作前面,插入一个 StoreStore 屏障 - StoreStore 屏障:保证在 volatile 写之前,其前面的所有普通写操作,都已经刷新到主内存中。 • 在每一个 volatile 写操作后面,插入一个 StoreLoad 屏障 - StoreLoad 屏障:避免 volatile 写,与后面可能有的 volatile 读 / 写操作重排序。 • 在每一个 volatile 读操作后面,插入一个 LoadLoad 屏障 - LoadLoad 屏障:禁止处理器把上面的 volatile读,与下面的普通读重排序。 • 在每一个 volatile 读操作后面,插入一个 LoadStore 屏障 - LoadStore 屏障:禁止处理器把上面的 volatile读,与下面的普通写重排序。 五.  volatile   原子性> 我们需要区别 volatile 变量和 atomic 变量 // volatile 并不能很好的保证原子性 volatile 变量,可以确保先行关系,即写操作会发生在后续的读操作之前,但它并不能保证原子性。例如用 volatile 修饰 count 变量,那么 count++ 操作就不是原子性的。 AtomicInteger 类提供的 atomic 方法,可以让这种操作具有原子性。例如 #getAndIncrement() 方法,会原子性的进行增量操作把当前值加一,其它数据类型和引用变量也可以进行相似操作。 六 . volatile 源码TODO : 涉及到源码 ,先留坑 , 具体可以先看 @ https://www.cnblogs.com/xrq730/p/7048693.html // 主要节点 : 0x0000000002931351: lock add dword ptr [rsp],0h ; *putstatic instance; - org.xrq.test.design.singleton.LazySingleton::getInstance@13 (line 14) > 将双字节的栈指针寄存器+0 , 保证volatile关键字的内存可见性 // 基本概念一 : LOCK# 的作用 - 锁总线 - 其它CPU对内存的读写请求都会被阻塞,直到锁释放 - 不过实际后来的处理器都采用锁缓存替代锁总线 - 因为锁总线的开销比较大,锁总线期间其他CPU没法访问内存 - lock后的写操作会回写已修改的数据,同时让其它CPU相关缓存行失效,从而重新从主存中加载最新的数据 - 不是内存屏障却能完成类似内存屏障的功能,阻止屏障两遍的指令重排序 // 基本概念二 : 缓存行 - 缓存是分段(line)的,一个段对应一块存储空间 , 即缓存行 - CPU看到一条读取内存的指令时,它会把内存地址传递给一级数据缓存 - 一级数据缓存检测是否由缓存段 , 没有加载这缓存段 // 原因 : volatile 基于 缓存一致性来实现 Step1 : 因为LOCK 效率问题 ,所以基于缓存一致性来处理 Step2 : 缓存一致性作用时 使用多组缓存,但是它们的行为看起来只有一组缓存那样 Step3 : 常见的协议是 snooping 和 MESI Step4 : snooping 的作用是 : 仲裁所有的内存访问操作 七 . volatile 实测// 测试原子性 , 结果 ThreadC : ------> count :9823 <------- // Thread 中操作 public static void addCount() { for (int i = 0; i < 100; i++) { count++; } logger.info("------> count :{} <-------", count); } ThreadC[] threadCS = new ThreadC[100]; for (int i = 0; i < 100; i++) { threadCS[i] = new ThreadC(); } for (int i = 0; i < 100; i++) { threadCS[i].start(); } // 添加 synchronized 后 -- > ThreadD count :10000 <------- synchronized public static void addCount()

0
0
0
浏览量2014
带鱼

Java 多线程 : 细说线程状态

一. 线程等待// 等待具体时间 > sleep(time) // 该方式不释放锁 ,低优先级有机会执行 // sleep 后转入 阻塞(blocked) > wait(time) > join(time) > LockSupport.parkNanos() > LockSupport.parkUnit() > yield // 该方式同样不会释放锁 ,同优先级及高优先级执行 // 执行后转入ready // 仅 进入等待 > wait() > join() > LockSuppot.park() 二. 线程通知// 对于设定具体等待时间的 timeout 后自动转入就绪 // 其他等待 > notify() > notifyAll() > 不同线程之间采用字符串作为监视器锁,会唤醒别的线程 > 不同线程之间的信号没有共享,等待线程被唤醒后继续进入wait状态: > 下图为不同线程的等待与唤醒 > 执行wait () 时释放锁 , 否则等待的线程如果继续持有锁 , 其他线程就没办法获取锁 , 会陷入死锁 // Wait - Notify 深入知识点 // 一 : Wait 等待知识点 - 当前线程必须拥有这个对象的监视器 // 二 : Wait 等待后 - 执行等待后 , 当前线程将自己放在该对象的等待集中,然后放弃该对象上的所有同步声明 - 如果当前线程在等待之前或等待期间被任何线程中断 , 会抛出 InterruptedException 异常 // 三 : 唤醒时注意点 - Notify 唤醒一个正在等待这个对象的线程监控 (monitor) - 执行 wait 时会释放锁 , 同时执行 notify 后也会释放锁 (如下图) - notify 会任意选择一个等待对象来提醒 // 四 : 唤醒后知识点 - 线程唤醒后 , 仍然要等待该对象的锁被释放 - 线程唤醒后 , 将会与任何竞争该锁的对象公平竞争 // 假醒 : 线程也可以在不被通知、中断或超时的情况下被唤醒,这就是所谓的伪唤醒。 三. 线程中断> interrupt()  // 方法,用于中断线程。调用该方法的线程的状态为将被置为”中断”状态。 > interrupted () // 查询当前线程的中断状态,并且清除原状态。 > isInterrupted () // 查询指定线程的中断状态,不会清除原状态+ // interrupt() 方法干了什么 ? public void interrupt() { if (this != Thread.currentThread()) checkAccess(); synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // Just to set the interrupt flag b.interrupt(this); return; } } interrupt0(); } // 1 checkAccess() : 其中涉及到 SecurityManager , 所以我们先看看这个类干什么的 - SecurityManager security = System.getSecurityManager(); - security.checkAccess(this); C- SecurityManager : ?- 这是 Java.lang 底下的一个类 四.  线程死锁死锁简介 : 当多个进程竞争资源时互相等待对方的资源死锁的条件 :互斥条件 : 一个资源每次只能被一个进程使用,即在一段时间内某 资源仅为一个进程所占有。此时若有其他进程请求该资源,则请求进程只能等待。请求与保持条件 :进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源 已被其他进程占有,此时请求进程被阻塞,但对自己已获得的资源保持不放。不可剥夺条件 : 进程所获得的资源在未使用完毕之前,不能被其他进程强行夺走,即只能 由获得该资源的进程自己来释放(只能是主动释放)。循环等待条件 : 若干进程间形成首尾相接循环等待资源的关系// 资源的分类 - 可抢占资源 : 可抢占资源指某进程在获得这类资源后,该资源可以再被其他进程或系统抢占 , 例如 CPU 资源 - 不可抢占资源 // 死锁的常见原因 : - 竞争不可抢占资源引起死锁 (共享文件) - 竞争可消耗资源引起死 (程通信时) - 进程推进顺序不当引起死锁 // 死锁的预防 - 通过系统中尽量破坏死锁的条件 , 当四大条件有一个不符合时 , 死锁就不会发生 - 通过加锁顺序处理(线程按照一定的顺序加锁) - 加锁时限(线程尝试获取锁的时候加上一定的时限,超过时限则放弃对该锁的请求,并释放自己占有的锁) - 死锁检测 // 死锁的解除 - 资源剥离 , 挂起死锁进程且强制对应资源 , 分配进程 - 撤销进程 - 回退进程 五.  线程热锁> 热锁不算一个物理概念 , 它表示线程在频繁的竞争资源并且资源在频繁的切换\ - 循环等待 : 六.   线程的状态及转换> 线程有以下状态 - NEW : 尚未启动的线程的hread状态 - RUNNABLE : 可运行 , 从虚拟机的角度 , 已经执行 ,但是可能正在等待资源 - BLOCKED : 阻塞 , 此时等待 monitor锁 , 以读取 synchronized 代码 - WAITING : 等待状态 , 处于等待状态的线程正在等待另一个线程执行特定操作 - wait() - join() - LockSupport#park() - TIMED_WAITING : 指定等待时间的等待 - Thread.sleep - wait(long) - join(long) - LockSupport#parkNanos - LockSupport#parkUntil - TERMINATED : 终止线程 // 线程间状态改变的方式 • 还没起床:sleeping 。 • 起床收拾好了,随时可以坐地铁出发:Runnable 。 • 等地铁来:Waiting 。 • 地铁来了,但要排队上地铁:I/O 阻塞 。 • 上了地铁,发现暂时没座位:synchronized 阻塞。 • 地铁上找到座位:Running 。 • 到达目的地:Dead 。 七.  状态转换的原理7.1 wait 与 notify 原理// 节点一 : 你是否发现 , wait 和 notify 是 object 的方法 点开 wait 和 notify 方法就能发现 , 这两个方法是基于 Object 对象的 , 所以我们要理解 ,通知不是通知的线程 ,而是通知的对象 这也就是为什么 , 不要用常量作为通知对象 // 节点二 : java.lang.IllegalMonitorStateException 当我们 wait/notify 时 , 如果未获取到对应对象的 Monitor , 实际上我们会抛出 IllegalMonitorStateException 所以你先要获得监视器 , 有三种方式 : - 通过执行该对象的同步实例方法。 - 通过执行在对象上同步语句体。 - 对于类型为Class的对象,可以执行该类的同步静态方法。 // 节点三 : 如何进行转换 ? Step 1 : 首先看 Object 对象 Native 方法 , bative src 中搜索名字即可 static JNINativeMethod methods[] = { {"hashCode", "()I", (void *)&JVM_IHashCode}, {"wait", "(J)V", (void *)&JVM_MonitorWait}, {"notify", "()V", (void *)&JVM_MonitorNotify}, {"notifyAll", "()V", (void *)&JVM_MonitorNotifyAll}, {"clone", "()Ljava/lang/Object;", (void *)&JVM_Clone}, }; //可以看到这里分别调用了 JVM_MonitorWait , JVM_MonitorNotify , JVM_MonitorNotifyAll , //从名字就能看到 , 这里是和Monitor 有关的 Step 2 : 进入全路径了解 : \openjdk\hotspot\src\share\vm\prims JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms)) JVMWrapper("JVM_MonitorWait"); Handle obj(THREAD, JNIHandles::resolve_non_null(handle)); JavaThreadInObjectWaitState jtiows(thread, ms != 0); if (JvmtiExport::should_post_monitor_wait()) { JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms); // 当前线程已经拥有监视器,并且还没有添加到等待队列中,因此当前线程不能成为后续线程 } ObjectSynchronizer::wait(obj, ms, CHECK); JVM_END Step 3 : ObjectSynchronizer::wait(obj, ms, CHECK); // TODO : 看不懂了呀....先留个坑 // 总得来说就是 ObjectMonitor通过一个双向链表来保存等待该锁的线程 Step End : link by @ https://www.jianshu.com/p/a604f1a9f875 void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) { ................... // 创建ObjectWaiter,添加到_WaitSet队列中 ObjectWaiter node(Self); node.TState = ObjectWaiter::TS_WAIT ; Self->_ParkEvent->reset() ; OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag //WaitSetLock保护等待队列。通常只锁的拥有着才能访问等待队列 Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ; //加入等待队列,等待队列是循环双链表 AddWaiter (&node) ; //使用的是一个简单的自旋锁 Thread::SpinRelease (&_WaitSetLock) ; ..................... } 7.2 Thread run// 节点一 : 区别 run 和 start run 是通过方法栈直接调用对象的方法 , 而 Start 才是开启线程 , 这一点我们可以从源码中发现 : - start 方法是同步的 - start0 是一个 native 方法 - group 是线程组 (ThreadGroup) , 线程可以访问关于它自己线程组的信息 ?- 线程组主要是为了管理线程 , 将一个大线程分成多个小线程 (盲猜 fork 用到了 , 后面验证一下) ?- 线程组也可以通过关闭组来关闭所有的线程 public synchronized void start() { if (threadStatus != 0) throw new IllegalThreadStateException(); group.add(this); boolean started = false; try { start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { } } } // Step 1 : Thread.c 结构 -> openjdk\src\native\java\lang static JNINativeMethod methods[] = { {"start0", "()V", (void *)&JVM_StartThread}, {"stop0", "(" OBJ ")V", (void *)&JVM_StopThread}, {"isAlive", "()Z", (void *)&JVM_IsThreadAlive}, {"suspend0", "()V", (void *)&JVM_SuspendThread}, {"resume0", "()V", (void *)&JVM_ResumeThread}, {"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority}, {"yield", "()V", (void *)&JVM_Yield}, {"sleep", "(J)V", (void *)&JVM_Sleep}, {"currentThread", "()" THD, (void *)&JVM_CurrentThread}, {"countStackFrames", "()I", (void *)&JVM_CountStackFrames}, {"interrupt0", "()V", (void *)&JVM_Interrupt}, {"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted}, {"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock}, {"getThreads", "()[" THD, (void *)&JVM_GetAllThreads}, {"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads}, {"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName}, }; // \openjdk\hotspot\src\share\vm\prims\jvm.cpp // Step 2 : JVM_StartThread , 翻译了一下 , 大概可以看到那一句 native_thread = new JavaThread(&thread_entry, sz); // 以及最后的 Thread::start(native_thread); JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread)) JVMWrapper("JVM_StartThread"); JavaThread *native_thread = NULL; //由于排序问题,在抛出异常时不能持有Threads_lock。示例:在构造异常时,可能需要获取heap_lock。 bool throw_illegal_thread_state = false; // 我们必须释放Threads_lock才能在Thread::start中post jvmti事件 { // 确保c++ Thread和OSThread结构体在操作之前没有被释放 MutexLocker mu(Threads_lock); //从JDK 5开始threadStatus用于防止重新启动一个已经启动的线程,所以我们通常会发现javthread是null。然而,对于JNI附加的线程,在创建的线程对象(带有javthread集)和更新其线程状态之间有一个小窗口,因此我们必须检查这一点 if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) { throw_illegal_thread_state = true; } else { //我们也可以检查stillborn标志来查看这个线程是否已经停止,但是由于历史原因,我们让线程在开始运行时检测它自己 jlong size = java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread)); //分配c++线程结构并创建原生线程。 //从java检索到的堆栈大小是有符号的,但是构造函数接受size_t(无符号类型),因此避免传递负值,因为这会导致非常大的堆栈。 size_t sz = size > 0 ? (size_t) size : 0; native_thread = new JavaThread(&thread_entry, sz); // 此时可能由于缺少内存而没有为javthread创建osthread。检查这种情况并在必要时抛出异常。 // 最终,我们可能想要更改这一点,以便只在线程成功创建时才获取锁——然后我们也可以执行这个检查并在javthread构造函数中抛出异常。 if (native_thread->osthread() != NULL) { // 注意:当前线程没有在“prepare”中使用。 native_thread->prepare(jthread); } } } if (throw_illegal_thread_state) { THROW(vmSymbols::java_lang_IllegalThreadStateException()); } assert(native_thread != NULL, "Starting null thread?"); if (native_thread->osthread() == NULL) { // No one should hold a reference to the 'native_thread'. delete native_thread; if (JvmtiExport::should_post_resource_exhausted()) { JvmtiExport::post_resource_exhausted( JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS, "unable to create new native thread"); } THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(), "unable to create new native thread"); } Thread::start(native_thread); JVM_END 7.3 Thread yieldC- Thread M- yield() : 可以看到 , yield 同样是一个 native 方法 // Step 1: \openjdk\hotspot\src\share\vm\prims\jvm.cpp // 主要是2句 : os::sleep(thread, MinSleepInterval, false); // os::yield(); JVM_ENTRY(void, JVM_Yield(JNIEnv *env, jclass threadClass)) JVMWrapper("JVM_Yield"); if (os::dont_yield()) return; #ifndef USDT2 HS_DTRACE_PROBE0(hotspot, thread__yield); #else /* USDT2 */ HOTSPOT_THREAD_YIELD(); #endif /* USDT2 */ // 当ConvertYieldToSleep关闭(默认)时,这与传统VM的yield使用相匹配。对于类似的线程行为至关重要 if (ConvertYieldToSleep) { os::sleep(thread, MinSleepInterval, false); } else { os::yield(); } JVM_END // TODO : 主要的其实还没有看懂 , 毕竟 C基础有限

0
0
0
浏览量2014
带鱼

Java多线程 : 细说 synchronized

1.1 synchronized 简述synchronized 被称为重量级锁 , 但是 1.6 版本后得到了优化 , 相对轻量了很多, 它可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块 .主要操作对象是方法或者代码块中存在的共享数据, 同时可保证一个线程的变化(主要是共享数据的变化)被其他线程所看synchronized 的核心原理为 Java 对象头 以及 Monitor1.2 Java 对象头 和 Monitor对象头结构// 原理 --> 1 Java 对象头 和 Monitor |-> 对象头 : |-> Mark Word(标记字段)、Klass Pointer(类型指针) |-> Klass Pointer : 类元数据指针,决定是何数据 |-> Mark Word : 自身运行时数据 (hashcode,锁状态,偏向,标志位等) |-> Monitor : |-> 互斥 :一个 Monitor 锁在同一时刻只能被一个线程占用 // 关系 --> - Monitor 是一种对象类型 , 任何Java 对象都可以是 Monitor 对象 - 当Java 对象被 synchronized 修饰时 , 就可以当成 Monitor 对象进行处理 // Mark Word 和 Class Metadata Address 组成结构 -------------------------------------------------------------------------------------------------- 虚拟机位数 头对象结构 说明 |---------|-----------------------|---------------------------------------------------------------| 32/64bit Mark Word 存储对象的hashCode、锁信息或分代年龄或GC标志等信息 32/64bit Class Metadata Address 类型指针指向对象的类元数据,JVM通过这个指针确定该对象是哪个类的实例。 -------------------------------------------------------------------------------------------------- 32 位虚拟机 Mark Word >>>>64 位虚拟机 Mark Word >>>>数据结构// Monitor 的实现方式 @ https://blog.csdn.net/javazejian/article/details/70768369 ObjectMonitor中有两个队列以及一个区域 _WaitSet 和 _EntryList,用来保存ObjectWaiter对象列表( 每个等待锁的线程都会被封装成ObjectWaiter对象) _owner (指向持有ObjectMonitor对象的线程) 区域 1  当多个线程同时访问一段同步代码时,首先会进入 _EntryList 集合, 此时开始尝试获取monitor2  当线程获取到对象的monitor 后进入 _Owner 区域 ,并把 monitor中的owner变量 设置为当前线程同时monitor中的计数器count加13  若线程调用 wait() 方法,将释放当前持有的monitor,owner变量恢复为null,count自减1,同时该线程进入 WaitSet集合中等待被唤醒。4  若当前线程执行完毕 也将 释放monitor(锁) 并 复位变量的值,以便 其他线程进入获取monitor(锁)Monitor 指令monitorenter指令是在编译后插入到同步代码块的开始位置,而monitorexit是插入到方法结束处和异常处,JVM要保证每个monitorenter必须有对应的monitorexit与之匹配查看汇编情况 :// Step 1 : 准备简单的Demo public class SynchronizedService { public void method() { synchronized (this) { System.out.println("synchronized 代码块"); } } } // Step 2 : 查看汇编码 javap -c -s -v -l SynchronizedService.class // Step 3 : 注意其中 3 和 13 以及 19 public void method(); descriptor: ()V flags: ACC_PUBLIC Code: stack=2, locals=3, args_size=1 0: aload_0 1: dup 2: astore_1 3: monitorenter 4: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream; 7: ldc #3 // String synchronized 代码块 9: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 12: aload_1 13: monitorexit 14: goto 22 17: astore_2 18: aload_1 19: monitorexit 20: aload_2 21: athrow 22: return 处理逻辑详情 :// synchronized 源码分析 @ https://blog.csdn.net/javazejian/article/details/70768369 // 具体的流程可以参考上面博客的具体分析, 此处仅总结 , 大佬们肝真好 // synchronized 代码块底层逻辑 ( monitorenter 和 monitorexit ) > synchronized 同步语句块的实现使用的是 monitorenter 和 monitorexit 指令, 其中 monitorenter 指令指向同步代码块的开始位置,monitorexit 指令则指明同步代码块的结束位置 Start : 当执行monitorenter指令时,当前线程将试图获取 objectref(即对象锁) 所对应的 monitor 的持有权 Thread-1 : objectref.monitor = 0 --> 获取monitor --> 设置计数器值为 1 Thread-2 : 发现objectref.monitor = 0 --> 阻塞等待 --> Thread-1 执行 monitorexit , 计数器归 0 --> Thread-2 正常流程获取获取monitor 注意点 : 编译器将会确保无论方法通过何种方式完成,方法中调用过的每条 monitorenter 指令都有执行其对应 monitorexit 指令 , 方法异常时通过异常处理器处理异常结束 synchronized 方法底层逻辑 (ACC_SYNCHRONIZED标识)方法级的同步是隐式,即无需通过字节码指令来控制的,它实现在方法调用和返回操作之中。JVM可以从方法常量池中的方法表结构(method_info Structure) 中的 ACC_SYNCHRONIZED 访问标志区分一个方法是否同步方法。方法调用时,调用指令将会 检查方法的 ACC_SYNCHRONIZED 访问标志是否被设置|- 如果设置了,执行线程将先持有monitor(虚拟机规范中用的是管程一词), 然后再执行方法,最后再方法完成(无论是正常完成还是非正常完成)时释放monitor。在方法执行期间,执行线程持有了monitor,其他任何线程都无法再获得同一个monitor异常处理 : 如果一个同步方法执行期间抛 出了异常,并且在方法内部无法处理此异常,那这个同步方法所持有的monitor将在异常抛到同步方法之外时自动释放synchronized 内存级原理// 最后生成的汇编语言 lock cmpxchg %r15, 0x16(%r10) 和 lock cmpxchg %r10, (%r11) synchronized的底层操作含义是先对对象头的锁标志位用lock cmpxchg的方式设置成"锁住"状态释放锁时,在用lock cmpxchg的方式修改对象头的锁标志位为"释放"状态,写操作都立刻写回主内存。JVM会进一步对synchronized时CAS失败的那些线程进行阻塞操作,这部分的逻辑没有体现在lock cmpxchg指令上,我猜想是通过某种信号量来实现的。lock cmpxchg 指令前者保证了可见性和防止重排序,后者保证了操作的原子性。1.3 synchronized 用法// 加锁方式 ,当前实例 ,当前class , 自定义object > synchronized(this) > synchronized(object) > synchronized(class) 或者静态代码块 synchronized关键字最主要的三种使用方式:修饰实例方法,作用于当前对象实例加锁,进入同步代码前要获得当前对象实例的锁修饰静态方法,作用于当前类对象加锁,进入同步代码前要获得当前类对象的锁 。修饰代码块,指定加锁对象,对给定对象加锁,进入同步代码库前要获得给定对象的锁。这里再提一下:synchronized关键字加到非 static 静态方法上是给对象实例上锁。另外需要注意的是 尽量不要使用 synchronized(String a), 部分字符串常量会缓冲到常量池里面, 不过可以试试 new String("a")1.4 synchronized 其他知识点解释 : synchronized 提供了一种独占式的加锁方式 ,其添加和释放锁的方式由JVM实现阻塞 : 当 synchronized  尝试获取锁的时候,获取不到锁,将会一直阻塞谈谈 synchronized和ReenTrantLock 的区别两者都是可重入锁synchronized 依赖于 JVM 而 ReenTrantLock 依赖于 APIReenTrantLock 比 synchronized 增加了一些高级功能synchronized 与等待唤醒机制 (notify/notifyAll和wait) 等待唤醒机制需要处于synchronized代码块或者synchronized方法中 , 调用这几个方法前必须拿到当前对象的监视器monitor对象synchronized 与 线程中断 线程的中断操作对于正在等待获取的锁对象的synchronized方法或者代码块并不起作用1.5  多线程中的锁概念1.5.1 锁按照等级分类锁可以按照以下等级进行升级 : 偏向锁 -> 轻量级锁 -> 重量级锁 , 锁的升级是单向的偏向锁轻量级锁自旋锁重量级锁1.5.2 锁的操作锁清除 :Java虚拟机在 JIT编译时 (可以简单理解为当某段代码即将第一次被执行时进行编译,又称即时编译),通过对运行上下文的扫描,去除不可能存在共享资源竞争的锁,通过这种方式消除没有必要的锁,可以节省毫无意义的请求锁时间Java 常见的锁synchronized 关键字 , 重量锁ReentrantLock 重入锁ReadWriteLock 读写锁1.5.3 其他锁概念内部锁 :synchronized : 锁对象的引用 , 锁保护的代码块每个Java 对象都可以隐式地扮演一个用于同步的锁的角色 ,这些内置的锁被称为 内部锁 或 监视器锁 .公平锁/非公平锁公平锁是指多线程按照申请锁的顺序来获取锁,非公平锁指多个线程获取锁的顺序不是按照申请锁的顺序,有可能造成优先级反转或者饥饿现象,非公平锁的优点在于吞吐量比公平锁大,ReentrantLock默认非公平锁,可通过构造函数选择公平锁,Synchronized是非公平锁。可重入锁 可重入锁指在一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁,ReentrantLock与Synchronized都是可重入的。独享锁/共享锁 独享锁是指一个锁只能一个线程独有,共享锁指一个锁可被多个线程共享,对于ReadWriteLock,读锁是共享锁,写锁是独享所。互斥锁/读写锁 独享锁/共享锁是一种广义的说法,互斥锁/读写锁是其具体实现。乐观锁/悲观锁乐观锁与悲观锁是看待同步的角度不同,乐观锁认为对于同一个数据的修改操作,是不会有竞争的,会尝试更新,如果失败,不断重试。悲观锁与此相反,直接获取锁,之后再操作,最后释放锁。分段锁分段锁是一种设计思想,通过将一个整体分割成小块,在每个小块上加锁,提高并发。1.6 锁的转换过程对象头的变化可以看下图 , 说的很清楚了 @ https://www.cnblogs.com/jhxxb/p/10983788.html // 之前知道 , 锁的状态改变是单向的 , 由 偏向锁 -> 轻量级锁 -> 重量级锁 ,我们分别捋一下 // 偏向锁 -> 偏向锁, 即重偏向操作 1 对象先偏向于某个线程, 执行完同步代码后 , 进入安全点时,若需要重偏向,会把类对象中 epoch 值增加 2 退出安全点后 , 当有线程需要尝试获取偏向锁时, 直接检查类实例对象中存储的 epoch 值与类对象中存储的 epoch 值是否相等, 如果不相等, 则说明该对象的偏向锁已经无效了, 可以尝试对此对象重新进行偏向操作。 // 偏向锁 -> 轻量级锁 1 当发现对象已被锁定 ,且 ThreadID 不是自己 , 转为 偏向锁 , 在该线程的栈帧中建立 Lock Record 空间 1.7 为什么锁会转换// 这要从机制说起 , 每一种锁都有各自的特点 > 偏向锁 - 优点 : 无 CAS ,消耗少 , 性能高 , 可重入 - 缺点 : 锁竞争时撤销锁消耗高 - 场景 : 同一个线程执行同步代码 > 轻量级锁 - 优点 : 竞争的线程不会阻塞 - 缺点 : 轻量级锁未获取锁时会通过自旋获取 , 消耗资源 - 场景 : 线程交替执行同步块或者同步方法,追求响应时间,锁占用时间很短 > 重量级锁 - 优点 : 线程竞争不使用自旋 , 只会唤醒和等待 - 缺点 : 造成线程阻塞 , 锁的改变也消耗资源 - 场景 : 追求吞吐量,锁占用时间较长 // 针对不同的需求 , 选择合适的锁 , 达到业务目的 1.8 Synchoized 源码 synchronized 是一个修饰符 , 我们需要从 C 的角度去看 Step 1 : 下载 OpenJDK 代码 https://blog.csdn.net/leisure_life/article/details/108367675 Step 2 : 根据代码索引 .c 文件 // TODO 1.9 Synchoized 用法public void operation(Integer check) { /** * 校验无锁情况 */ public void functionShow(Integer check) { logger.info("------> check is {} <-------", check); if (check == 0) { showNum = 100; try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } else if (check == 1) { showNum = 200; } logger.info("------> check is Over {} :{}", check, showNum); } /** * 同步方法 , 校验 synchronized 方法 */ synchronized public void functionShowSynchronized(Integer check) { logger.info("------> check is {} <-------", check); if (check == 0) { showNum = 100; try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else if (check == 1) { showNum = 200; } logger.info("------> check is Over synchronized {} :{}", check, showNum); } /** * 校验 synchronized 代码块 */ public void statementShowSynchronized(Integer check) { logger.info("------> check is {} <-------", check); synchronized (this) { if (check == 0) { showNum = 100; try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else if (check == 1) { showNum = 200; } } logger.info("------> check is Over synchronized {} :{}", check, showNum); } // 校验 代码块以 Class 为对象 public void classShowSynchronized(Integer check) { logger.info("check is {} <-------", check); synchronized (CommonTO.class) { if (check == 0) { showNum = 100; try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else if (check == 1) { showNum = 200; } } logger.info("check is Over synchronized {} :{}", check, showNum); } // 同步代码块 Object public void objectShowSynchronized(Integer check) { logger.info("check is {} <-------", check); synchronized (lock) { if (check == 0) { showNum = 100; try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else if (check == 1) { showNum = 200; } } logger.info("check is Over synchronized {} :{}", check, showNum); } // 同步代码块 Object public void objectStringShowSynchronized(Integer check) { logger.info("check is {} <-------", check); synchronized (lock2) { if (check == 0) { showNum = 100; try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else if (check == 1) { showNum = 200; } } logger.info("check is Over synchronized {} :{}", check, showNum); }

0
0
0
浏览量2021