参考:
spring 中的事件机制:https://www.cnblogs.com/rickiyang/p/12001524.html
观察者模式 : http://c.biancheng.net/view/1390.html
观察者模式的定义:指多个对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。这种模式有时又称作发布-订阅模式、模型-视图模式,它是对象行为型模式。
观察者模式主要优点如下。
它的主要缺点如下。
实现观察者模式时要注意具体目标对象和具体观察者对象之间不能直接调用,否则将使两者之间紧密耦合起来,这违反了面向对象的设计原则。
以一个小栗子说明,发工资了,老婆孩子都有自己的花费
public class MyObserverDesign {
public static void main(String[] args) {
// 发九月工资了
MyMon myMon = new OctberMon();
myMon.addMyRelative(new MyWife());
myMon.addMyRelative(new MySon());
myMon.addMyRelative(new MyDaughter());
myMon.payOff();
}
// 抽象目标: 我的工资
public static abstract class MyMon {
protected List<MyRelative> myRelatives = new ArrayList<>();
public void addMyRelative(MyRelative myRelative) {
myRelatives.add(myRelative);
}
public void removeMyRelative(MyRelative myRelative) {
myRelatives.remove(myRelative);
}
// 发工资了
public abstract void payOff();
}
// 具体目标:11 月的工资
public static class OctberMon extends MyMon{
@Override
public void payOff() {
System.out.println("宝贝们,发工资了,你们想要什么");
for (MyRelative obs : myRelatives) {
obs.spendMoney();
}
}
}
// 抽象观察者 我的亲人
public interface MyRelative {
// 等着花钱
public void spendMoney();
}
// 具体观察者 我的老婆
public static class MyWife implements MyRelative {
@Override
public void spendMoney() {
System.out.println("老公我们去旅游吧~");
}
}
public static class MySon implements MyRelative {
@Override
public void spendMoney() {
System.out.println("爸爸,我要蜘蛛侠~");
}
}
public static class MyDaughter implements MyRelative {
@Override
public void spendMoney() {
System.out.println("爸爸,我要漂亮娃娃~");
}
}
}
执行结果
宝贝们,发工资了,你们想要什么
老公我们去旅游吧~
爸爸,我要蜘蛛侠~
爸爸,我要漂亮娃娃~
Java中提供了基本的事件处理基类:
同样的将上面的观察者模型场景进行改写
创建发工资的事件继承自 EventObject
/**
* 发工资事件
*/
public class PayOffEvent extends EventObject {
/**
* Constructs a prototypical Event.
*
* @param source The object on which the Event initially occurred.
* @throws IllegalArgumentException if source is null.
*/
public PayOffEvent(Object source) {
super(source);
}
}
然后创建监听器,继承EventListener
,老婆孩子等着用钱呢 -_-
public interface MyRelative extends EventListener { void spendMoney();}
具体监听的实现类1
public class MyWife implements MyRelative {
@Override
public void spendMoney() {
System.out.println("老公我们去旅游吧~");
}
}
具体监听的实现类2
public class MySon implements MyRelative {
@Override
public void spendMoney() {
System.out.println("爸爸,我要蜘蛛侠~");
}
}
source
public class Source {
Set<MyRelative> listeners = new HashSet<>();
public void addStateChangeListener(MyRelative listener) {
listeners.add(listener);
}
public void removeStateChangeListener(MyRelative listener) {
listeners.remove(listener);
}
public void notifyListener(){
for (MyRelative listener : listeners) {
listener.spendMoney();
}
}
}
测试
public static void main(String[] args) {
Source source = new Source();
source.addStateChangeListener(new MyWife());
source.addStateChangeListener(new MySon());
source.notifyListener();
}
搜索了很多关于 java事件 的都和该模式相同,但是个人感觉 没有看到该事件接口起到的具体作用,有点类型方法的直接调用,没有体现事件的传播特性。去掉 两个标记接口 EventListener 都没有影响
上面讲了这么多都是为了补充前置知识。
在 Spring 容器中通过 ApplicationEvent
类和 ApplicationListener
接口来处理事件,如果某个 bean
实现 ApplicationListener
接口并被部署到容器中,那么每次对应的 ApplicationEvent
被发布到容器中都会通知该 bean
,这是典型的观察者模式。
Spring 的事件默认是同步的,即调用 publishEvent
方法发布事件后,它会处于阻塞状态,直到 onApplicationEvent
接收到事件并处理返回之后才继续执行下去,这种单线程同步的好处是可以进行事务管理。
定义下单成功事件
/**
* 定义下单成功事件
*/
public class OrderSuccessEvent extends ApplicationEvent {
/**
* Create a new {@code ApplicationEvent}.
*
* @param source the object on which the event initially occurred or with
* which the event is associated (never {@code null})
*/
public OrderSuccessEvent(Object source) {
super(source);
}
}
定义监听器一 当订单下单成功后 发送短信
@Service
public class SmsListener implements ApplicationListener<OrderSuccessEvent> {
@Override
public void onApplicationEvent(OrderSuccessEvent event) {
sendSms();
}
private void sendSms() {
System.out.println("发送订单成功邮件");
}
}
定义监听器二 当订单下单成功后通知 发车
@Service
public class CarService implements ApplicationListener<OrderSuccessEvent> {
@Override
public void onApplicationEvent(OrderSuccessEvent event) {
sendCarInfo();
}
private void sendCarInfo() {
System.out.println("老王該發車了");
}
}
通过Spring 提供的 applicationContext 进行事件发布
@Service
public class OrderService {
@Autowired
private ApplicationContext applicationContext;
public void order(){
System.out.println("下单完成。。。");
applicationContext.publishEvent(new OrderSuccessEvent("126625874"));
System.out.println("流程结束");
}
}
通过web 方式触发 测试
@RestController
public class OrderController {
@Autowired
private OrderService orderService;
@GetMapping(value = "/test/event")
public void testEvent(){
orderService.order();
}
}
访问后结果
下单完成。。。
老王該發車了
发送订单成功邮件
流程结束
可以看到 是等待监听器 全部成功后才会 流程结束,是同步执行的,且当其中出现异常时不会继续往下传播
将发车修改为
@Service
public class CarService implements ApplicationListener<OrderSuccessEvent> {
@Override
public void onApplicationEvent(OrderSuccessEvent event) {
sendCarInfo();
}
private void sendCarInfo() {
System.out.println("老王該發車了");
if (1/0 == 1){
}
}
}
异常后不会执行 发送短信的任务
可以通过将任务修改为异步执行,在源码 SimpleApplicationEventMulticaster
类中在执行传播事件任务时会判断 是否给 Executor
线程执行器 若给了就 通过线程池的方式执行,否则同步执行
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
所以最简单的方法是通过 自己构建 SimpleApplicationEventMulticaster
bean 然后给传入一个执行器来实现监听器的异步执行
@Configuration
public class AsyncEventConfig {
@Bean(name = "applicationEventMulticaster")
public ApplicationEventMulticaster getSimpleApplicationEventMultCaster() {
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
simpleApplicationEventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
return simpleApplicationEventMulticaster;
}
}
在配置后 在运行结果
下单完成。。。
流程结束
发送订单成功邮件
老王該發車了
Exception in thread "SimpleAsyncTaskExecutor-19" java.lang.ArithmeticException: / by zero
at com.sun.event.demo.CarService.sendCarInfo(CarService.java:15)
at com.sun.event.demo.CarService.onApplicationEvent(CarService.java:11)
at com.sun.event.demo.CarService.onApplicationEvent(CarService.java:6)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
at org.springframework.context.event.SimpleApplicationEventMulticaster.lambda$multicastEvent$0(SimpleApplicationEventMulticaster.java:136)
at java.lang.Thread.run(Thread.java:745)
可以看到就算老王 生病了对 发短信的 监听是没有影响的并且 主流程不需要等待 发邮件和发车完成
在多个监听器之间若多个监听器之间需要保持执行的顺序的场景可以通过实现 ApplicationListener
的子类 SmartApplicationListener
来实现
将上面两个 监听器修改为
发车监听器
@Service
public class CarServiceWithOrder implements SmartApplicationListener {
@Override
public void onApplicationEvent(ApplicationEvent event) {
System.out.println("发车了 订单号" + event.getSource());
}
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return eventType == OrderSuccessEvent.class;
}
@Override
public boolean supportsSourceType(final Class<?> sourceType) {
return sourceType == String.class;
}
@Override
public int getOrder() {
return 2;
}
}
发送短信监听器
@Service
public class SmsListenerWithOrder implements SmartApplicationListener {
@Override
public void onApplicationEvent(ApplicationEvent event) {
System.out.println("发送短信 订单号" + event.getSource());
}
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return eventType == OrderSuccessEvent.class;
}
@Override
public boolean supportsSourceType(final Class<?> sourceType) {
return sourceType == String.class;
}
@Override
public int getOrder() {
return 1;
}
}
最终执行结果,order 数越大越先执行
下单完成。。。
流程结束
发车了 订单号126625874
发送短信 订单号126625874
从事件发布的方法开始看起,是在这里出发的事件发布,看看监听器是如何获取到事件的
// 发布 下单成功事件
applicationContext.publishEvent(new OrderSuccessEvent("126625874"));
可以在 AbstractApplicationContext
类看到 具体的 执行 publishEvent
的方法的核心逻辑主要在
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
中,其中 getApplicationEventMulticaster()
方法返回的是 SimpleApplicationEventMulticaster
对象 可以直接查看其中的实现多播的方法
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
// 此处 type 就是 com.sun.event.demo.OrderSuccessEvent 类型的事件
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 判断是否有注入 线程池
Executor executor = getTaskExecutor();
// 获取所有的监听器
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
// 若注入有线程池 则放入线程池中异步执行
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
执行listener 的包装方法判断是否有注入 异常处理的 ErrorHandler 当监听异常的处理
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
最终的 监听执行的方法
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
String msg = ex.getMessage();
if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
Log logger = LogFactory.getLog(getClass());
if (logger.isTraceEnabled()) {
logger.trace("Non-matching event type for listener: " + listener, ex);
}
}
else {
throw ex;
}
}
}
由于所有的自定义的监听器都会实现ApplicationListener
接口这样就可以通过该方法执行 各个监听器中的具体的逻辑。
在上面有个很重要的方法有漏掉 getApplicationListeners(event, type)
这个方法用来查找所有的listener类
protected Collection<ApplicationListener<?>> getApplicationListeners(
ApplicationEvent event, ResolvableType eventType) {
// 获取事件发布时的传入参数
Object source = event.getSource();
Class<?> sourceType = (source != null ? source.getClass() : null);
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
// Quick check for existing entry on ConcurrentHashMap...
ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
if (this.beanClassLoader == null ||
(ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
(sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
// this.retrievalMutex 为所有的 spring bean 此处需要加 重量级锁
synchronized (this.retrievalMutex) {
retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
retriever = new ListenerRetriever(true);
Collection<ApplicationListener<?>> listeners =
retrieveApplicationListeners(eventType, sourceType, retriever);
this.retrieverCache.put(cacheKey, retriever);
return listeners;
}
}
else {
// No ListenerRetriever caching -> no synchronization necessary
return retrieveApplicationListeners(eventType, sourceType, null);
}
}
在进行debug 过程中发现,spring 在启动过程中会发布事件的顺序,也能大致了解spring 的启动过程
org.springframework.boot.context.event.ApplicationStartingEvent--->
org.springframework.boot.context.event.ApplicationEnvironmentPreparedEvent -->
org.springframework.boot.context.event.ApplicationContextInitializedEvent -->
org.springframework.boot.context.event.ApplicationPreparedEvent -->
org.springframework.boot.web.servlet.context.ServletWebServerInitializedEvent-->
org.springframework.context.event.ContextRefreshedEvent -->
org.springframework.boot.context.event.ApplicationStartedEvent -->
org.springframework.boot.context.event.ApplicationReadyEvent -->
当请求过来时
com.sun.event.demo.OrderSuccessEvent[source=126625874] -->
org.springframework.web.context.support.ServletRequestHandledEvent -->