DoneSpeak

Spring Event初步讲解

字数: 3.6k时长: 15 min
2020/04/11 Share

写在前面

前段时间发现Spring的Event超级好用,所以已经逐步在项目中加入了Spring Event的功能。

开发环境:

Java 1.8
Spring Boot 2.1.6.RELEASE
Spring 5.1.8.RELEASE

无知与半知不解都会带来灾难。

基本开发

Event是Spring中的概念,不是Spring Event所有的。只要添加了spring-context依赖就可以引入了Spring的事件。

要使用Event只要准备三个部分:

  • 事件类:定义事件,继承ApplicationEvent的类成为一个事件类。
  • 发布者:发布事件,通过ApplicationEventPublisher发布事件。
  • 监听者:监听并处理事件,实现ApplicationListener接口或者使用@EventListener注解。

事件类

只要继承org.springframework.context.ApplicationEvent,便是一个Spring Event类。一般我们会专门为一个类型的Event写一个抽象的事件类,作为该类型的所有事件的父类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 账户相关的事件
*/
public abstract class AccountEvent extends ApplicationEvent {

/**
* 该类型事件携带的信息
*/
private AccountEventData eventData;

/**
*
* @param source 最初触发该事件的对象
* @param eventData 该类型事件携带的信息
*/
public AccountEvent(Object source, AccountEventData eventData) {
super(source);
this.eventData = eventData;
}

public AccountEventData getEventData() {
return eventData;
}
}

然后定义具体的发布事件。这里推荐使用类实现的方式来发布具体的事件,而不是在事件中使用private String eventType定义事件的类型。使用具体的类表示具体的事件,监听器只要监听具体的事件类即可,而无需再做判断,同时也不需要再另外维护事件类型列表。

1
2
3
4
5
public class AccountCreatedEvent extends AccountEvent {
public AccountCreatedEvent(Object source, AccountEventData eventData) {
super(source, eventData);
}
}

还有一种实践是,利用泛型定义一个统一的父类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public abstract class BaseEvent<T> extends ApplicationEvent {

/**
* 该类型事件携带的信息
*/
private T eventData;

/**
*
* @param source 最初触发该事件的对象
* @param eventData 该类型事件携带的信息
*/
public BaseEvent(Object source, T eventData) {
super(source);
this.eventData = eventData;
}

public T getEventData() {
return eventData;
}
}

然后再指定事件的泛型类型即可。

1
2
3
4
5
6
7
8
9
10
11
public class AccountCreatedEvent extends BaseEvent<AccountEventData> {
public AccountCreatedEvent(Object source, AccountEventData eventData) {
super(source, eventData);
}
}

public class TodoCreatedEvent extends BaseEvent<TodoEventData> {
public TodoCreatedEvent(Object source, TodoEventData eventData) {
super(source, eventData);
}
}

以上均使用了一个AccountEventData,这是为了方便拓展,如果后续需要给事件增加新的字段,可以直接在该类上增加即可,而无需修改所有的子事件类。

发布者

发布者负责发布消息,有三种实现方式。Spring容器中默认的ApplicationEventPublisherAbstractApplicationContext,同时AbstractApplicationContext也是ApplicationContext的一个子类,也就是说,Spring默认使用AbstractApplicationContext发布事件。

方式1:直接使用ApplicationEventPublisher(推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.springframework.context.ApplicationEventPublisher;

public class AccountsController {

@PostMapping("")
public Account createAccount(@RequestBody Account account) {

...

publisher.publishEvent(new AccountCreatedEvent(this, new AccountEventData()));
return account;
}
}

方式2:实现ApplicationEventPublisherAware接口(推荐)

1
2
3
4
public interface ApplicationEventPublisherAware extends Aware {

void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher);
}

这个方式其实就是注入一个ApplicationEventPublisher,然后再用ApplicationEventPublisher#publisheEvent(ApplicationEvent)方法发布事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package org.springframework.data.rest.webmvc;

@RepositoryRestController
class RepositoryEntityController extends AbstractRepositoryRestController implements ApplicationEventPublisherAware {

private ApplicationEventPublisher publisher;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}

private ResponseEntity<ResourceSupport> saveAndReturn(Object domainObject, RepositoryInvoker invoker,
HttpMethod httpMethod, PersistentEntityResourceAssembler assembler, boolean returnBody) {

publisher.publishEvent(new BeforeSaveEvent(domainObject));
Object obj = invoker.invokeSave(domainObject);
publisher.publishEvent(new AfterSaveEvent(obj));

...
}
...
}

如果你希望你的Service类能够发布事件,可以实现这个接口ApplicationEventPublisherAware

方式3:使用ApplicationContext#publishEvent(ApplicationEvent)发布。

1
2
3
4
5
6
7
8
9
10
11
12
public class AccountEventPublisher {

private final ApplicationContext applicationContext;

public AccountEventPublisher(ApplicationContext context) {
this.applicationContext = context;
}

public void publish(TodoEvent ev) {
applicationContext.publishEvent(ev);
}
}

ApplicationContextApplicationEventPublisher的一个实现,在有前面的两种方案之后,其实就不需要这个重复封装实现方案了。当然,你也可以直接使用ApplicationContext

监听器

监听器负责接收和处理事件。

基本用法

有两种实现方法:实现ApplicationListener接口或者使用@EventListener注解。

实现ApplicationListener接口:

1
2
3
4
5
6
public class TodoFinishedListener implements ApplicationListener<TodoEvent.TodoFinishedEvent> {
@Override
public void onApplicationEvent(TodoEvent.TodoFinishedEvent event) {
// do something
}
}

使用@EventListener注解(推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
@Component
public class SyncAccountListener {

/**
* 异步发送邮件
* @param event
*/
@EventListener
public void doOnNormalEvent(NormalAccountEvent event) {
try {
log.debug("befor");
Thread.sleep(1000);
log.debug("after");
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}

可以使用@EventListener在同一个类中用不同的方法监听多个不同的事件。相对于实现ApplicationListener接口,使用@EventListener会更加灵活。

@EventListener 注解的说明

1
2
3
4
@EventListener(value = {AccountCreatedEvent.class, AccountUpdatedEvent.class}, condition = "#event.account.age > 10")
public void doSomethingOnAccountEvent(AccountEvent event) {
// TODO
}
  • value: 监听的事件(组),用于支持同一父类的事件
  • class: 同value
  • condition: SpEL,使得Event Handler变得conditional
    • #root.event, Application的引用
    • #root.args, 表示方法参数,#root.args[0]表示第0个方法参数
    • #<name>, 如上面代码中的#event表示以参数名关联参数

ApplicationEventMulticaster 事件广播器

事件广播器负责将ApplicationEventPublisher发布的事件广播给所有的监听器。如果没有提供事件广播器,Spring会自动使用SimpleApplicationEventMulticaster作为默认的事件广播器。

构建事件基础

AbstractApplicationContext.java中的refresh()方法中构建了完整的事件基础。AbstractApplicationContext#initApplicationEventMulticaster()初始化了事件广播器,AbstractApplicationContext#registerListeners()则负责添加Spring容器中的事件监听器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Initialize the ApplicationEventMulticaster.
* Uses SimpleApplicationEventMulticaster if none defined in the context.
* @see org.springframework.context.event.SimpleApplicationEventMulticaster
*/
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Add beans that implement ApplicationListener as listeners.
* Doesn't affect other listeners, which can be added without being beans.
*/
protected void registerListeners() {
// Register statically specified listeners first.
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}

// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let post-processors apply to them!
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}

// Publish early application events now that we finally have a multicaster...
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (earlyEventsToProcess != null) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}

事件发布

我们可以在AbstractApplicationContext.java中找到事件的直接发布方法。AbstractApplicationContext#publish(Object, ResolvableType)中,事件的发布是通过ApplicationEventMulticaster做的广播发布。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");

// Decorate event as an ApplicationEvent if necessary
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
}
}

// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}

// Publish event via parent context as well...
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}

在前面我们可以知道,SimpleApplicationEventMulticasterApplicationEventMulticaster在Spring容器中的默认实现。理所当然地可以从中找到事件发布的真实方式。multicastEvent方法会找到监听当前事件的所有监听器,然后再执行执行监听方法。

SimpleApplicationEventMulticaster中有两个属性,Executor taskExecutorErrorHandler errorHandler。前者可以定义所有监听器是否异步执行,默认为null,等价于同步执行的SyncTaskExecutor,你也可以使用SimpleAsyncTaskExecutor将所有监听器设置为异步执行。但这里有一点非常重要,如果你设置了Executor为异步的,那么所有的监听器都会异步执行,监听器和调用类会处于不同的上下文,不同的事务中,除非你有办法让TaskExecutor支持。其实我们完全不用通过修改广播器taskExecutor的方式来让监听器异步,可以通过@EnableAsync启动异步,并@Async将监听器设置为异步执行。通过@Async的方式,可以自由地决定任意一个监听器是否为异步,而非暴力地让所有的监听器都异步化。

1
2
3
4
5
6
7
8
9
10
11
12
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}

ErrorHandler errorHandler则定义了在监听器发生异常之后的行为,在这里你可以看到,如果没有定义errorHandler的话,会直接抛到上一层。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}

private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
String msg = ex.getMessage();
if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
// Possibly a lambda-defined listener which we could not resolve the generic event type for
// -> let's suppress the exception and just log a debug message.
Log logger = LogFactory.getLog(getClass());
if (logger.isTraceEnabled()) {
logger.trace("Non-matching event type for listener: " + listener, ex);
}
}
else {
throw ex;
}
}
}

可以修改默认的ApplicationEventMulticaster,或者直接继承/实现AbstractApplicationEventMulticaster/ApplicationEventMulticaster

1
2
3
4
5
6
7
8
9
10
// 是谁,是谁让我所有的监听器都编程异步了。-》原来是有人修改了事件广播器的taskExecutor为异步的了。
public class AsyncConfig {

@Bean
public ApplicationEventMulticaster simpleApplicationEventMulticaster() {
SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
return eventMulticaster;
}
}
  • @EnableAspectJAutoProxy(proxyTargetClass = true) 作用

同步

Listener默认都是同步的

1
2
3
4
5
6
7
8
9
10
11
public Account createAccount(Account account) {
...
try {
publisher.publishEvent(new ThrowExceptionAccountEvent(this, new AccountEventData()));
} catch (Exception e) {
// 捕获同步Listener抛出的异常
throw new ServiceException(e.getMessage(), e);
}
...
return account;
}

以上的方法中,publishEvent会执行完所有的同步监听器之后才返回。既然是同步,那么你可以通过@Order注解来指定执行顺序。使用同步的监听器,可以让事件参与到publisher所在的事务中。从上面对ApplicationEventMulticaster的讲解中可以知道,同步的执行其实就是简单的方法调用罢了。

异常与事务

上面已经讲到,同步的listener如果发生异常,而且没有被ErrorHandler拦截的话,是会往上抛出的,可以直接在publishEvent方法调用处捕获。

在同步的场景下,listener的执行,其实就是普通方法的调用。那么事务的控制也是和普通的方法调用是一样的。如果想要让监听器在事务中,那么就在监听器方法上添加事务注解@Transational就可以了。具体的分析见Spring的事务传播行为。

异步

既然前面讲到,监听器的执行其实就是一个普通方法的执行,那么将监听器声明为异步的方法也会和将一个普通方法声明为异步的方法一样,使用@Async

需要明确的一点是,当监听器设置为异步之后,会导致该监听器方法和调用publishEvent的方法处于不同的事务中。其实就和普通异步没有太多区别啦。

使用@Async实现异步

启动异步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {

@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(1000);
executor.setKeepAliveSeconds(300);
executor.setThreadNamePrefix("dspk-Executor-");
// 拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}

/**
* 异常处理器
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}

默认的AsyncConfigurerAsyncConfigurerSupport,两个方法均返回了null。

设置一个监听器为异步

1
2
3
4
5
6
7
8
9
10
@Component
public class AccountListener {

@Async
@EventListener
public void sendEmailOnAccountCreatedEvent(AccountCreateEvent event) {
// 发送邮件
// do something else
}
}

使用ApplicationEventMulticaster实现异步

ApplicationEventMulticaster指定一个异步的taskExecutor,将会让所有的监听器都变成异步执行。真心不推荐这种做法。

1
2
3
4
5
6
7
8
9
public class AsyncConfig {

@Bean
public ApplicationEventMulticaster simpleApplicationEventMulticaster() {
SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
return eventMulticaster;
}
}

事件事务管理 @TransactionalEventListener

定义

使用@TransactionalEventListener@EventListener的拓展,可以指定监听器和发布事件的方法的事务隔离级别。隔离级别确保数据的有效性。@TransactionalEventListener注解会将监听器声明为一个事务管理器(这部分有机会会在其他文章中说明)。

当一个监听器方法被@TransactionalEventListener注解时,那么这个监听器只会在调用方为事务方法时执行,如果调用方是非事务方法,则无法该监听器不会被通知。值得注意的是,虽然@TransactionalEventListener带有Transaction关键字,但这个方法并没有声明监听器为Transactional的

1
2
3
4
@TransactionalEventListener
public void afterRegisterSendMail(MessageEvent event) {
mailService.send(event);
}

配置 @TransactionalEventListener

除了含有与@EventListener相同的属性(classes, condition),该注解还提供了另外的两个属性fallbackExecutionphase

@TransactionalEventListener注解属性:fallbackExecution

定义:fallbackExecution 设置Listener是否要在没有事务的情况下处理event。

默认为false,表示当publishEvent所在方法没有事务控制时,该监听器不监听事件。通过设置fallbackExecution=true,可以让Listener在任何情况都可以执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Transactional
public void txMethod() {
publisher.publishEvent(new MessageEvent());
}

public void nonTxMethod() {
publisher.publishEvent(new MessageEvent());
}

// txMethod: 执行
// nonTxMethod: 不执行
@TransactionalEventListener
public void afterRegisterSendMail(MessageEvent event) {
mailService.send(event);
}

// txMethod: 执行
// nonTxMethod: 执行
@TransactionalEventListener(fallbackExecution = true)
public void afterRegisterSendMail(MessageEvent event) {
mailService.send(event);
}

@TransactionalEventListener注解属性:phase

  • AFTER_COMMIT - (default) 在事务完成之后触发事件。此时事务已经结束,监听器方法将找不到上一个事务
  • AFTER_ROLLBACK – 在事务回滚之后触发事件
  • AFTER_COMPLETION – 在事务完成之后触发事件 (含有 AFTER_COMMITAFTER_ROLLBACK)
  • BEFORE_COMMIT - 在事务提交之前触发事件,此时调用方方法的事务还存在,监听器方法可以找到该事务

当你尝试在@TransactionEventListener方法中执行JPA的save方法时,你会得到如下错误:

1
2
3
4
5
6
7
8
9
10
11
@EventListener
@TransactionalEventListener
public void doOnNormalEvent3(NormalAccountEvent event) {
Account account = new Account();
account.setPassword("33333");
account.setFirstName("333");
account.setLastName("333");
account.setEmail("33333@gr.com");

accountRepository.save(account);
}
1
2
3
4
Participating transaction failed - marking existing transaction as rollback-only
Setting JPA transaction on EntityManager [SessionImpl(1991443937<open>)] rollback-only
...
org.springframework.dao.InvalidDataAccessApiUsageException: no transaction is in progress; nested exception is javax.persistence.TransactionRequiredException: no transaction is in progress

原因是@TransactionalEventListener默认是AFTER_COMMIT,也就是当前事务已经结束了,所以无法找到所在事务,只能执行rollback,因而无法成功将数据保存到数据库中。但如果你希望执行findAll()方法,那么会拿到调用方提交到数据库中的数据,但也拿不到该Listener中save的数据。也许你会想,我在这个方法上@Transactional不就可以了吗?目前的测试结果是也不行。具体原因会在写事务的文章中再另外讲解,这里暂不拓展。可以通过设置phase为TransactionPhase.BEFORE_COMMIT进行解决,这样的话调用方的事务就还没有结束。

@Transactional
@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.BEFORE_COMMIT)
public void doOnNormalEvent3(NormalAccountEvent event) {
    ...
}

参考

CATALOG
  1. 1. 写在前面
  2. 2. 基本开发
    1. 2.1. 事件类
    2. 2.2. 发布者
    3. 2.3. 监听器
    4. 2.4. ApplicationEventMulticaster 事件广播器
      1. 2.4.1. 构建事件基础
      2. 2.4.2. 事件发布
  3. 3. 同步
    1. 3.1. Listener默认都是同步的
    2. 3.2. 异常与事务
  4. 4. 异步
    1. 4.1. 使用@Async实现异步
    2. 4.2. 使用ApplicationEventMulticaster实现异步
  5. 5. 事件事务管理 @TransactionalEventListener
    1. 5.1. 定义
    2. 5.2. 配置 @TransactionalEventListener
  6. 6. 参考