① “高并发”两种异步模型与深度解析Future接口-
大家好,我是冰河~~
本文有点长,但是满满的干货,以实际案例的形式分析了两种异步模型,并从源码角度深度解析Future接口和FutureTask类,希望大家踏下心来,打开你的IDE,跟着文章看源码,相信你一定收获不小!
在java的并发编程中,大体上会分为两种异步编程模型,一类是直接以异步的形式来并行运行其他的任务,不需要返回任务的结果数据。一类是以异步的形式运行其他任务,需要返回结果。
1.无返回结果的异步模型
无返回结果的异步任务,可以直接将任务丢进线程或线程池中运行,此时,无法直接获得任务的执行结果数据,一种方式是可以使用回调方法来获取任务的运行结果。
具体的方案是:定义一个回调接口,并在接口中定义接收任务结果数据的方法,具体逻辑在回调接口的实现类中完成。将回调接口与任务参数一同放进线程或线程池中运行,任务运行后调用接口方法,执行回调接口实现类中的逻辑来处理结果数据。这里,给出一个简单的示例供参考。
便于接口的通用型,这里为回调接口定义了泛型。
回调接口的实现类主要用来对任务的返回结果进行相应的业务处理,这里,为了方便演示,只是将结果数据返回。大家需要根据具体的业务场景来做相应的分析和处理。
任务的执行类是具体执行任务的类,实现Runnable接口,在此类中定义一个回调接口类型的成员变量和一个String类型的任务参数(模拟任务的参数),并在构造方法中注入回调接口和任务参数。在run方法中执行任务,任务完成后将任务的结果数据封装成TaskResult对象,调用回调接口的方法将TaskResult对象传递到回调方法中。
到这里,整个大的框架算是完成了,接下来,就是测试看能否获取到异步任务的结果了。
在测试类中,使用Thread类创建一个新的线程,并启动线程运行任务。运行程序最终的接口数据如下所示。
大家可以细细品味下这种获取异步结果的方式。这里,只是简单的使用了Thread类来创建并启动线程,也可以使用线程池的方式实现。大家可自行实现以线程池的方式通过回调接口获取异步结果。
2.有返回结果的异步模型
尽管使用回调接口能够获取异步任务的结果,但是这种方式使用起来略显复杂。在JDK中提供了可以直接返回异步结果的处理方案。最常用的就是使用Future接口或者其实现类FutureTask来接收任务的返回结果。
使用Future接口往往配合线程池来获取异步执行结果,如下所示。
运行结果如下所示。
FutureTask类既可以结合Thread类使用也可以结合线程池使用,接下来,就看下这两种使用方式。
结合Thread类的使用示例如下所示。
运行结果如下所示。
结合线程池的使用示例如下。
运行结果如下所示。
可以看到使用Future接口或者FutureTask类来获取异步结果比使用回调接口获取异步结果简单多了。注意:实现异步的方式很多,这里只是用多线程举例。
接下来,就深入分析下Future接口。
1.Future接口
Future是JDK1.5新增的异步编程接口,其源代码如下所示。
可以看到,在Future接口中,总共定义了5个抽象方法。接下来,就分别介绍下这5个方法的含义。
取消任务的执行,接收一个boolean类型的参数,成功取消任务,则返回true,否则返回false。当任务已经完成,已经结束或者因其他原因不能取消时,方法会返回false,表示任务取消失败。当任务未启动调用了此方法,并且结果返回true(取消成功),则当前任务不再运行。如果任务已经启动,会根据当前传递的boolean类型的参数来决定是否中断当前运行的线程来取消当前运行的任务。
判断任务在完成之前是否被取消,如果在任务完成之前被取消,则返回true;否则,返回false。
这里需要注意一个细节:只有任务未启动,或者在完成之前被取消,才会返回true,表示任务已经被成功取消。其他情况都会返回false。
判断任务是否已经完成,如果任务正常结束、抛出异常退出、被取消,都会返回true,表示任务已经完成。
当任务完成时,直接返回任务的结果数据;当任务未完成时,等待任务完成并返回任务的结果数据。
当任务完成时,直接返回任务的结果数据;当任务未完成时,等待任务完成,并设置了超时等待时间。在超时时间内任务完成,则返回结果;否则,抛出TimeoutException异常。
2.RunnableFuture接口
Future接口有一个重要的子接口,那就是RunnableFuture接口,RunnableFuture接口不但继承了Future接口,而且继承了java.lang.Runnable接口,其源代码如下所示。
这里,问一下,RunnableFuture接口中有几个抽象方法?想好了再说!哈哈哈。。。
这个接口比较简单run()方法就是运行任务时调用的方法。
3.FutureTask类
FutureTask类是RunnableFuture接口的一个非常重要的实现类,它实现了RunnableFuture接口、Future接口和Runnable接口的所有方法。FutureTask类的源代码比较多,这个就不粘贴了,大家自行到java.util.concurrent下查看。
(1)FutureTask类中的变量与常量
在FutureTask类中首先定义了一个状态变量state,这个变量使用了volatile关键字修饰,这里,大家只需要知道volatile关键字通过内存屏障和禁止重排序优化来实现线程安全,后续会单独深度分析volatile关键字是如何保证线程安全的。紧接着,定义了几个任务运行时的状态常量,如下所示。
其中,代码注释中给出了几个可能的状态变更流程,如下所示。
接下来,定义了其他几个成员变量,如下所示。
又看到我们所熟悉的Callable接口了,Callable接口那肯定就是用来调用call()方法执行具体任务了。
看一下WaitNode类的定义,如下所示。
可以看到,WaitNode类是FutureTask类的静态内部类,类中定义了一个Thread成员变量和指向下一个WaitNode节点的引用。其中通过构造方法将thread变量设置为当前线程。
(2)构造方法
接下来,是FutureTask的两个构造方法,比较简单,如下所示。
(3)是否取消与完成方法
继续向下看源码,看到一个任务是否取消的方法,和一个任务是否完成的方法,如下所示。
这两方法中,都是通过判断任务的状态来判定任务是否已取消和已完成的。为啥会这样判断呢?再次查看FutureTask类中定义的状态常量发现,其常量的定义是有规律的,并不是随意定义的。其中,大于或者等于CANCELLED的常量为CANCELLED、INTERRUPTING和INTERRUPTED,这三个状态均可以表示线程已经被取消。当状态不等于NEW时,可以表示任务已经完成。
通过这里,大家可以学到一点:以后在编码过程中,要按照规律来定义自己使用的状态,尤其是涉及到业务中有频繁的状态变更的操作,有规律的状态可使业务处理变得事半功倍,这也是通过看别人的源码设计能够学到的,这里,建议大家还是多看别人写的优秀的开源框架的源码。
(4)取消方法
我们继续向下看源码,接下来,看到的是cancel(boolean)方法,如下所示。
接下来,拆解cancel(boolean)方法。在cancel(boolean)方法中,首先判断任务的状态和CAS的操作结果,如果任务的状态不等于NEW或者CAS的操作返回false,则直接返回false,表示任务取消失败。如下所示。
接下来,在try代码块中,首先判断是否可以中断当前任务所在的线程来取消任务的运行。如果可以中断当前任务所在的线程,则以一个Thread临时变量来指向运行任务的线程,当指向的变量不为空时,调用线程对象的interrupt()方法来中断线程的运行,最后将线程标记为被中断的状态。如下所示。
这里,发现变更任务状态使用的是UNSAFE.putOrderedInt()方法,这个方法是个什么鬼呢?点进去看一下,如下所示。
可以看到,又是一个本地方法,嘿嘿,这里先不管它,后续文章会详解这些方法的作用。
接下来,cancel(boolean)方法会进入finally代码块,如下所示。
可以看到在finallly代码块中调用了finishCompletion()方法,顾名思义,finishCompletion()方法表示结束任务的运行,接下来看看它是如何实现的。点到finishCompletion()方法中看一下,如下所示。
在finishCompletion()方法中,首先定义一个for循环,循环终止因子为waiters为null,在循环中,判断CAS操作是否成功,如果成功进行if条件中的逻辑。首先,定义一个for自旋循环,在自旋循环体中,唤醒WaitNode堆栈中的线程,使其运行完成。当WaitNode堆栈中的线程运行完成后,通过break退出外层for循环。接下来调用done()方法。done()方法又是个什么鬼呢?点进去看一下,如下所示。
可以看到,done()方法是一个空的方法体,交由子类来实现具体的业务逻辑。
当我们的具体业务中,需要在取消任务时,执行一些额外的业务逻辑,可以在子类中覆写done()方法的实现。
(5)get()方法
继续向下看FutureTask类的代码,FutureTask类中实现了两个get()方法,如下所示。
没参数的get()方法为当任务未运行完成时,会阻塞,直到返回任务结果。有参数的get()方法为当任务未运行完成,并且等待时间超出了超时时间,会TimeoutException异常。
两个get()方法的主要逻辑差不多,一个没有超时设置,一个有超时设置,这里说一下主要逻辑。判断任务的当前状态是否小于或者等于COMPLETING,也就是说,任务是NEW状态或者COMPLETING,调用awaitDone()方法,看下awaitDone()方法的实现,如下所示。
接下来,拆解awaitDone()方法。在awaitDone()方法中,最重要的就是for自旋循环,在循环中首先判断当前线程是否被中断,如果已经被中断,则调用removeWaiter()将当前线程从堆栈中移除,并且抛出InterruptedException异常,如下所示。
接下来,判断任务的当前状态是否完成,如果完成,并且堆栈句柄不为空,则将堆栈中的当前线程设置为空,返回当前任务的状态,如下所示。
当任务的状态为COMPLETING时,使当前线程让出CPU资源,如下所示。
如果堆栈为空,则创建堆栈对象,如下所示。
如果queued变量为false,通过CAS操作为queued赋值,如果awaitDone()方法传递的timed参数为true,则计算超时时间,当时间已超时,则在堆栈中移除当前线程并返回任务状态,如下所示。如果未超时,则重置超时时间,如下所示。
如果不满足上述的所有条件,则将当前线程设置为等待状态,如下所示。
接下来,回到get()方法中,当awaitDone()方法返回结果,或者任务的状态不满足条件时,都会调用report()方法,并将当前任务的状态传递到report()方法中,并返回结果,如下所示。
看来,这里还要看下report()方法啊,点进去看下report()方法的实现,如下所示。
可以看到,report()方法的实现比较简单,首先,将outcome数据赋值给x变量,接下来,主要是判断接收到的任务状态,如果状态为NORMAL,则将x强转为泛型类型返回;当任务的状态大于或者等于CANCELLED,也就是任务已经取消,则抛出CancellationException异常,其他情况则抛出ExecutionException异常。
至此,get()方法分析完成。注意:一定要理解get()方法的实现,因为get()方法是我们使用Future接口和FutureTask类时,使用的比较频繁的一个方法。
(6)set()方法与setException()方法
继续看FutureTask类的代码,接下来看到的是set()方法与setException()方法,如下所示。
通过源码可以看出,set()方法与setException()方法整体逻辑几乎一样,只是在设置任务状态时一个将状态设置为NORMAL,一个将状态设置为EXCEPTIONAL。
至于finishCompletion()方法,前面已经分析过。
(7)run()方法与runAndReset()方法
接下来,就是run()方法了,run()方法的源代码如下所示。
可以这么说,只要使用了Future和FutureTask,就必然会调用run()方法来运行任务,掌握run()方法的流程是非常有必要的。在run()方法中,如果当前状态不是NEW,或者CAS操作返回的结果为false,则直接返回,不再执行后续逻辑,如下所示。
接下来,在try代码块中,将成员变量callable赋值给一个临时变量c,判断临时变量不等于null,并且任务状态为NEW,则调用Callable接口的call()方法,并接收结果数据。并将ran变量设置为true。当程序抛出异常时,将接收结果的变量设置为null,ran变量设置为false,并且调用setException()方法将任务的状态设置为EXCEPTIONA。接下来,如果ran变量为true,则调用set()方法,如下所示。
接下来,程序会进入finally代码块中,如下所示。
这里,将runner设置为null,如果任务的当前状态大于或者等于INTERRUPTING,也就是线程被中断了。则调用()方法,接下来,看下()方法的实现。
可以看到,()方法的实现比较简单,当任务的状态为INTERRUPTING时,使用while()循环,条件为当前任务状态为INTERRUPTING,将当前线程占用的CPU资源释放,也就是说,当任务运行完成后,释放线程所占用的资源。
runAndReset()方法的逻辑与run()差不多,只是runAndReset()方法会在finally代码块中将任务状态重置为NEW。runAndReset()方法的源代码如下所示,就不重复说明了。
(8)removeWaiter()方法
removeWaiter()方法中主要是使用自旋循环的方式来移除WaitNode中的线程,比较简单,如下所示。
最后,在FutureTask类的最后,有如下代码。
关于这些代码的作用,会在后续深度解析CAS文章中详细说明,这里就不再探讨。
至此,关于Future接口和FutureTask类的源码就分析完了。
好了,今天就到这儿吧,我是冰河,我们下期见~~
② IAsyncResult异步编程
IAsyncResult 异步设计模式通过名为 BeginOperationName 和 EndOperationName 的两个方法来实现原同步方法的异步调用,如 FileStream 类提供了 BeginRead 和 EndRead 方法来从文件异步读取字节,它们是 Read 方法的异步版本。
IAsyncResult接口:
补充说明:WaitHandle用来封装等待对共享资源进行独占访问的操作系统特定的对象。它是一个抽象类,我们一般不直接用,而是用它的派生类:
下面了解一些详细使用。
BeginInvoke 方法可以使用线程异步地执行委托所指向的方法。然后通过 EndInvoke 方法获得方法的返回值(EndInvoke 方法的返回值就是被调用方法的返回值),或是确定方法已经被成功调用。
WaitOne的第一个参数表示要等待的毫秒数,在指定时间之内,WaitOne方法将一直等待,直到异步调用完成,并发出通知,WaitOne方法才返回true。当等待指定时间之后,异步调用仍未完成,WaitOne方法返回false,如果指定时间为0,表示不等待,如果为-1,表示永远等待,直到异步调用完成。
重点需要注意BeginInvoke方法的参数传递方式:
前面是其委托本身的参数。 倒数第二个参数(MethodCompleted)是回调方法委托类型,他是回调方法的委托,此委托没有返回值,有一个IAsyncResult类型的参数,当method方法执行完后,系统会自动调用MethodCompleted方法。 最后一个参数(task)需要向MethodCompleted方法中传递一些值,一般可以传递被调用方法的委托,这个值可以使用IAsyncResult.AsyncState属性获得。
③ 如何进行nodejs异步编程
更新下,我之所以让您玩一下AJAX,是希望您体验一下异步,并不是希望您了解AJAX这机制的实现方法,因为AJAX是一个特别典型且简单的异步场景,比如:
执
行某个函数 -> 执行语句A,B,C,D -> 在D语句发起异步请求,同时向引擎注册一个回调事件 -> 执行E,F,G
->退出函数块 ,引擎Loop...Loop...Loop,此时异步的请求得到了Response,之前注册的回调被执行。
@VILIC VANE
也提到了,实际上Node.js主要是为了应对主流web
app存在大量I/O等待而CPU闲置的场景所衍生的解决方案,而在架构上,它的后端有一个底层的worker封装,每当你有一个诸如addUser这样
的I/O操作时,它们都会被交由worker去执行从而达到让出尽快让出当前函数的执行权的目的,在向引擎注册完回调后,内部会通过事件轮询去检查该I
/O事件的句柄,当句柄显示该事件操作完成后,则注册的回调则被执行。
所以,假设有人(按题设,简化一下场景,有且只有2个人)同时请求
addUser(A)和userList(B),B的请求会在执行完A的请求内部所有同步代码后被执行,而哪怕worker此时仍然在进行addUser
这一 I/O操作,用户B也并不会被引擎挂起或者等待。这就是为什么Node.js单节点却一样可以拥有高负载能力的原因。
至于什么样的代码是异步的,你看看node文档里fs模块的使用方法就知道了,大概的形式就是如下这种。
mole.method( args [,callback] )
当然还有一种比较极端的情况,假设您使用的数据库是山寨的,驱动是基于同步实现的,那么B就该等多久等多久把,树荫底下喝杯茶,下个棋,和后面的C,D,E,F,G打个招呼呗~
我推荐您先去玩一下前端的AJAX了解一下 异步编程方式,体验一下异步的“感觉”,然后看一本叫《JavaScript异步编程》的书。
Node.js
是一款基于Event-driven的模型构建的Framework,它典型的特征就是通过内置的事件轮询来调度事件,通常来说node.js的数据库驱
动都是基于异步实现的,所以在实际情况中,A提交博客和B注册用户这两个请求是可以同时由Node.js
来handle,并按照实际操作的处理事件分别调度给予浏览器响应。
当然,假设您在业务代码里写了一个耗时很久的同步代码(比如直接写一
个while(true)的loop,Node就死了),由于JavaScript本身单线程的限制,所以整个App就会被block住,后续的事件/程
序只有等到该段代码执行完成之后才会被处理,这也是为什么我们通常不建议在Node.js层做大规模计算(JS本身的计算效率太低,会导致Node吞吐量
会大大降低),而倾向由C++的拓展去实现。
④ c++中的异步编程——future,promise
c++中创建线程很方便,例如下面
但是当我想异步获取线程的执行结果,就不太方便,join()并不能返回结果。
std::async,通过这个异步接口可以很方便的获取线程函数的执行结果。std::async会自动创建一个线程去调用线程函数,它返回一个std::future,这个future中存储了线程函数返回的结果,当我们需要线程函数的结果时,直接从future中获取,非常方便。
std::future提供了一种访问异步操作结果的机制。从字面意思来理解,它表示未来,这个名字非常贴切,因为一个异步操作我们是不可能马上就获取操作结果的,只能在未来某个时候获取,但是我们可以以同步等待的方式来获取结果,可以通过查询future的状态(future_status)来获取异步操作的结果。future_status有三种状态:
获取future结果有三种方式:
通过成员函数set_value可以设置std::promise中保存的值,该值最终会被与之关联的std::future::get读取到。需要注意的是:set_value只能被调用一次,多次调用会抛出std::future_error异常
输出
分桶常用,返回下界的iter
通常这么找下标
⑤ Python异步编程--greenlet
是使用一个任务调度器和一些生成器或者协程实现协作式用户空间多线程的一种伪并发机制,即所谓的微线程。
当创建一个 greenlet 时,它会获得一个最初为空的堆栈;当你第一次切换到它时,它开始运行一个指定的函数,它可能会调用其他函数,切换出greenlet等。安装 pip install greenlet。
控制台输出:
⑥ 关于generator异步编程的理解以及如何动手写
关于generator异步编程的理解以及如何动手写一个co模块
generator出现之前,想要实现对异步队列中任务的流程控制,大概有这么一下几种方式:
回调函数
事件监听
发布/订阅
promise对象
第一种方式想必大家是最常见的,其代码组织方式如下:
我们把函数放到run的执行器里面,便实现了同步操作异步代码的过程
⑦ 什么是同步编程、异步编程
同步编程:传统的同步编程是一种请求响应模型,调用一个方法,等待其响应返回。就是一个线程获得了一个任务,然后去执行这个任务, 当这个任务执行完毕后,才能执行接下来的另外一个任务。
异步编程:异步编程就是要重新考虑是否需要响应的问题,也就是缩小需要响应的地方。因为越快获得响应,就是越同步化,顺序化,事务化,性能差化,异步编程通常是通过fire and forget方式实现。
(7)异步编程实现扩展阅读:
在同步编程中,所有的操作都是顺序执行的,比如从socket中读取(请求),然后写入(回应)到socket中,每一个操作都是阻塞的。
异步编程的原则是,让进程处理多个并发执行的上下文来模拟并行处理方式 ,异步应用使用一个事件循环,当一个事件触发暂停或恢复执行上下文:
只有一个上下文处于活动状态,上下文之间进行轮替,代码中的显示指令告诉事件循环,哪里可以暂停执行,这时,进程将查找其他待处理的线程进行恢复,最终,进程将回到函数暂停的地方继续运行,从一个执行上下文移到另一个上下文称为切换。
⑧ Python异步编程7:异步迭代器
迭代器:在其内部实现yield方法和next方法的对象。可迭代对象:在类内部实现一个iter方法,并返回一个迭代器。
异步迭代器:实现了__aiter__()和__anext__()方法的对象,必须返回一个awaitable对象。async_for支持处理异步迭代器的
__anext__()方法返回的可等待对象,直到引发一个stopAsyncIteration异常,这个改动由PEP 492引入。
异步可迭代对象:可在async_for语句中被使用的对象,必须通过它的__aiter__()方法返回一个asynchronous_iterator(异步迭代器). 这个改动由PEP 492引入。
示例: 不能直接写在普通方法或者暴露在外面。必须写在协程函数,任意协程函数均可。
⑨ Async和Await如何简化异步编程,几个实例让
同步代码存在的问题
对于同步的代码,大家肯定都不陌生,因为我们平常写的代码大部分都是同步的,然而同步代码却存在一个很严重的问题,例如我们向一个Web服务器发出一个请求时,如果我们发出请求的代码是同步实现的话,这时候我们的应用程序就会处于等待状态,直到收回一个响应信息为止,然而在这个等待的状态,对于用户不能操作任何的UI界面以及也没有任何的消息,如果我们试图去操作界面时,此时我们就会看到”应用程序为响应”的信息(在应用程序的窗口旁),相信大家在平常使用桌面软件或者访问web的时候,肯定都遇到过这样类似的情况的,对于这个,大家肯定会觉得看上去非常不舒服。引起这个原因正是因为代码的实现是同步实现的,所以在没有得到一个响应消息之前,界面就成了一个”卡死”状态了,所以这对于用户来说肯定是不可接受的,因为如果我要从服务器上下载一个很大的文件时,此时我们甚至不能对窗体进行关闭的操作的。为了具体说明同步代码存在的问题(造成界面开始),下面通过一个程序让大家更形象地看下问题所在:
//单击事件
privatevoidbtnClick_Click(objectsender,EventArgse)
{
this.btnClick.Enabled=false;
longlength=AccessWeb();
this.btnClick.Enabled=true;
//这里可以做一些不依赖回复的操作
OtherWork();
this.richTextBox1.Text+=String.Format(" 回复的字节长度为:{0}. ",length);
txbMainThreadID.Text=Thread.CurrentThread.ManagedThreadId.ToString();
}
privatelongAccessWeb()
{
MemoryStreamcontent=newMemoryStream();
//对MSDN发起一个Web请求
HttpWebRequestwebRequest=WebRequest.Create("http://msdn.microsoft.com/zh-cn/")asHttpWebRequest;
if(webRequest!=null)
{
//返回回复结果
using(WebResponseresponse=webRequest.GetResponse())
{
using(StreamresponseStream=response.GetResponseStream())
{
responseStream.CopyTo(content);
}
}
}
txbAsynMethodID.Text=Thread.CurrentThread.ManagedThreadId.ToString();
returncontent.Length;
}
⑩ 什么是异步编程
传统的同步编程是一种请求响应模型,调用一个方法,等待其响应返回.
异步编程就是要重新考虑是否需要响应的问题,也就是缩小需要响应的地方。因为越快获得响应,就是越同步化,顺序化,事务化,性能差化。
异步编程通常是通过fire and forget方式实现,发射事件后即忘记,做别的事情了,无需立即等待刚才发射的响应结果了。(发射事件的地方称为生产者,而将在另外一个地方响应事件的处理者称为消费者).异步编程是一种事件驱动编程,需要完全改变思路,将“请求响应”的思路转变到“事件驱动”思路上,是一种软件编程思维的转变.下面几种你看参考一下
1、异步编程模型 (APM) 模式(也称为 IAsyncResult 模式),其中异步操作要求 Begin 和 End 方法(例如,异步写操作的 BeginWrite 和 EndWrite)。对于新的开发工作不再建议采用此模式。
2、基于事件的异步模式 (EAP) 需要一个具有 Async 后缀的方法,还需要一个或多个事件、事件处理程序、委托类型和 EventArg 派生的类型。EAP 是在 .NET Framework 2.0 版中引入的。对于新的开发工作不再建议采用此模式。
3、基于任务的异步模式 (TAP),该模式使用一个方法表示异步操作的启动和完成。.NET Framework 4 中引入了 TAP,并且是 .NET Framework 中异步编程的建议方法。