参考: EventBus3.0源码解析
EventBus 初始化
通常都是直接通过静态方法 getDefault()
来获得实例,代码如下:
public static EventBus getDefault() {
EventBus instance = defaultInstance;
if (instance == null) {
synchronized (EventBus.class) {
instance = EventBus.defaultInstance;
if (instance == null) {
instance = EventBus.defaultInstance = new EventBus();
}
}
}
return instance;
}
这是一个单例模式的方法,使用了双重判断的方式,防止并发的问题,还能极大的提高效率。上述方法中调用了如下的默认构造:
public EventBus() {
this(DEFAULT_BUILDER);
}
上述构造方法中又调用了带参的构造方法,并传入了默认了 Builder
对象:
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
我们也可以通过 EventBus
的 builder()
方法来构造一个新的 EventBus
对象或者修改默认的 builder
参数:
public static EventBusBuilder builder() {
return new EventBusBuilder();
}
这个方法会返回一个 EventBusBuilder
对象,也就是默认的 Builder
对象。最后,如果直接通过 build()
方法来创建 EventBus
对象的话,那就是独立配置的 EventBus
,跟 getDefault()
返回的对象不是同一个。
每个新建的 EventBus
发布和订阅事件都是相互隔离的,即一个 EventBus
对象中的发布者发布事件,另一个 EventBus
对象中的订阅者不会收到该订阅。如果通过 installDefaultEventBus()
方法来创建对象,则是修改默认对象的参数,之后通过 getDefault()
方法获取的对象都是同一个单例对象。
// 独立配置的 EventBus,跟 getDefault() 返回的对象不是同一个
public EventBus build() {
return new EventBus(this);
}
// 修改默认对象的参数,之后通过 getDefault() 方法获取的对象都是同一个单例对象
public EventBus installDefaultEventBus() {
synchronized (EventBus.class) {
if (EventBus.defaultInstance != null) {
throw new EventBusException("Default instance already exists." +
" It may be only set once before it's used the first time to ensure consistent behavior.");
}
EventBus.defaultInstance = build();
return EventBus.defaultInstance;
}
}
我们在上一篇文章配置索引文件时,调用的就是修改默认的 EventBus
单例对象:
EventBus.builder().ignoreGeneratedIndex(false).
addIndex(new MyEventBusIndex()).installDefaultEventBus();
EventBus 注册
注册方法的代码如下:
public void register(Object subscriber) {
// 拿到订阅者的运行时类型Class
Class<?> subscriberClass = subscriber.getClass();
// 利用订阅者的Class去查找类中声明的订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
//循环遍历逐个将订阅者和订阅方法订阅到EventBus
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
步骤归纳:
- 拿到订阅者对象
subscriber
。 - 根据订阅者对象查找到所有的订阅方法。
- 把订阅者和订阅方法订阅到
EventBus
。
查找订阅方法
首先来看一下 SubscriberMethod
类,该类是对我们声明的订阅方法和参数的封装。
public class SubscriberMethod {
final Method method; // 方法
final ThreadMode threadMode; // 执行线程
final Class<?> eventType; // 事件类型
final int priority; // 优先级
final boolean sticky; // 粘性事件
/** Used for efficient comparison */
String methodString;
public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
this.method = method;
this.threadMode = threadMode;
this.eventType = eventType;
this.priority = priority;
this.sticky = sticky;
}
}
在上述第二步使用了一个 SubscriberMethodFinder
实例来进行方法查找。 SubscriberMethodFinder
这个类是专门用来查找订阅方法的,findSubscriberMethods()
最后返回了一个 SubscriberMethod
集合。
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 如果subscriberClass类注册过,那么直接从METHOD_CACHE缓存中获取信息返回
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
// 是否忽略注解处理器生成的索引文件,默认 false
if (ignoreGeneratedIndex) {
// 使用反射来寻找订阅方法信息
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//先根据生成的索引文件来寻找订阅方法信息,索引文件中找不到,再使用反射寻找
subscriberMethods = findUsingInfo(subscriberClass);
}
// 没有订阅方法,直接抛异常
if (subscriberMethods.isEmpty()) {
...异常
} else {
// 将查找的方法最后都存进了内存缓存METHOD_CACHE中, 对应关系是订阅类和它的订阅方法
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
首先就是通过缓存来获取,因为查找是比较耗时的操作,缓存可以提高效率。 METHOD_CACHE
是一个以订阅者 Class
为 Key
, 订阅方法集合为 Value
的线程安全的的 HashMap
,定义如下:
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
第一次执行肯定是没有缓存的,然后会根据 ignoreGeneratedIndex
来执行不同的方法。从方法名来看,一个是使用反射去查找,另一个是使用已有的信息去查找(也就是通过 apt 技术生成的索引文件来查找)。
ignoreGeneratedIndex
这个值默认是 false
,因为反射开销大,所以默认是走 findUsingInfo()
分支,但是在 findUsingInfo()
方法中会检查本地是否有 apt
预先解析出的订阅者信息,如果没有,还是会执行反射方法findUsingReflectionInSingleClass()
。
apt
的部分涉及到了注解处理器,这里只看反射获取的情况:
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
// 循环找到有效的注册方法
while (findState.clazz != null) {
// 使用反射查找一个类的订阅方法
findUsingReflectionInSingleClass(findState);
//将父类赋给findState.clazz,往上进行查找
findState.moveToSuperclass();
}
// 此时,已经把有效的订阅信息收集到了findState的subscriberMethods变量中
return getMethodsAndRelease(findState);
}
因为子类会继承父类的方法,所以当子类找不到时,需要去查找父类。这里使用了一个 while
循环递归进行查找。查找过程使用了一个新的对象 FindState
,它是用来存储查找过程中的一些信息,方便进行迭代查找。它的类定义:
static class FindState {
//订阅方法集合
final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
final Map<Class, Object> anyMethodByEventType = new HashMap<>();
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
final StringBuilder methodKeyBuilder = new StringBuilder(128);
//当前订阅者
Class<?> subscriberClass;
//当前查找的类
Class<?> clazz;
//是否跳过父类查找
boolean skipSuperClasses;
SubscriberInfo subscriberInfo;
}
真正开始使用反射解析一个类的订阅方法如下:
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
//getDeclaredMethods在某些设备上会出现NoClassDefFoundError
// getDeclaredMethods()方法的效率要优于getMethods()
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// 如果报错了,再调用 getMethods() 方法获取
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
// 忽略父类,因为 .getMethods() 方法已经获取到了父类方法了
findState.skipSuperClasses = true;
}
for (Method method : methods) {
// // 获取方法的修饰符, 比如public, static等修饰符
int modifiers = method.getModifiers();
// 注册方法是 PUBLIC 的,而且不是抽象,静态,VOLATILE 等的
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
// 获取方法的参数信息
Class<?>[] parameterTypes = method.getParameterTypes();
// 方法只有一个参数
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
// 如果方法有@Subscribe注解信息
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
// 检查当前method是否是有效的订阅方法
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
// 保存对应的信息,包括方法、参数类型、线程类型、优先级和是否sticky
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
...异常
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
...异常
}
}
}
总的来说,上述方法有如下三步:
一. 获取到订阅者所有的方法。
二. 遍历处理方法,通过修饰符、参数个数、指定注解和 EventType
来筛选有效的订阅方法。
三. 将订阅方法添加进 findState
的 subscriberMethods
这个 List
中。
上述方法中,有一个非常重要的方法 checkAdd()
,这个方法决定了是否订阅方法可以被保存下来进而能接收到消息:
boolean checkAdd(Method method, Class<?> eventType) {
// 考虑到同一个类里面通常只会有一个方法监听同一类事件,为了效率,所以EventBus先根据事件类型去匹配同一个类中是否已经有添加过的方法监听同一类事件
// HashMap 保存数据时,如果已经存在key,会返回对应的旧的值
Object existing = anyMethodByEventType.put(eventType, method);
// 没有,则直接认为该方法为有效订阅方法
if (existing == null) {
return true;
} else {
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
// Paranoia check
throw new IllegalStateException();
}
// Put any non-Method object to "consume" the existing Method
// 因为前面会调用 checkAddWithMethodSignature 方法把 existing 消费掉了,所以这里要调用这行代码,避免重复处理
anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}
一般来说,一个类不会有对同一个 EventType
写多个方法,会直接保存方法后返回 true
,即为有效订阅方法。对于出现了同一个类中同样的 EventType
写了多个方法,还有当 findUsingReflection()
中进行下一轮循环,会进行父类查找,如果子类继承了父类的订阅方法的情况,就会使用 checkAddWithMethodSignature()
方法来进行下一步的判断了:
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
// 根据一定规则计算出方法的唯一特征名称
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());
String methodKey = methodKeyBuilder.toString();
Class<?> methodClass = method.getDeclaringClass();
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
// 如果具有该特征名称的方法之前还没有添加过 或者 之前添加过的方法所在的类是当前方法所在类的父类或者接口
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
// Only add if not already found in a sub class
return true;
} else {
// Revert the put, old class is further down the class hierarchy
// 恢复成之前的方法,即新方法不保存,通常是父类的方法被子类重写了,所以唯一特征名称一致,
// 但是父类的方法不保存,只保留子类重写的方法。
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}
对于同一类中同样的 EventType
写了多个方法,因为方法名不同,所以[方法名>eventType]的Key不同,methodClassOld
会为 null
,直接返回 true
。所以这种情况会将所有相同 EventType
的方法都进行保存。
对于子类重写父类方法的情况,则 methodClassOld
(即子类)不为 null
,并且 methodClassOld
也不是 methodClass
的父类,所以会返回 false
。即对于子类重写父类订阅方法,只会保存子类的订阅方法,忽略父类的订阅方法。
至此,findState
的查找任务就结束了,通过循环向父类查找,将订阅者的订阅方法都保存在了其内部变量 subscriberMethods
列表中。
最后,会调用 getMethodsAndRelease()
来将 findState
中的 subscriberMethods
取出并返回,然后将使用完的 findState
实例置空恢复后又放回实例池中,将实例回收利用,节省了新的开销,代码如下:
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
// 将findState中的subscriberMethods取出
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
// 回收资料
findState.recycle();
// 将使用完的findState实例置空恢复后又放回实例池中,将实例回收利用,节省了新的开销
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}
至此,查找订阅方法的流程就结束了。我们再将所有步骤归纳一下:
一. 先从订阅类获取到订阅者所有的方法。
二. 遍历处理方法,通过修饰符、参数个数、指定注解和 EventType
来筛选有效的订阅方法。
三. 将订阅方法添加进 findState
的 subscriberMethods
这个 List
中。
四. 再从订阅类的父类往上进行查找。
五. 取出所有的订阅方法返回并进行回收处理。
订阅事件
回到 register()
方法中的第三步,对上一步查找到的订阅方法集合进行了遍历调用 subscribe()
方法。来看看 subscribe()
方法做了什么事:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
// 实例一个Subscription对象,内部持有了订阅者和订阅方法
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
// 多次注册会报错
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
// 根据优先级排序Subscription
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// typesBySubscriber保存了订阅者对应的所有EventType
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// 粘性订阅方法要立即处理
if (subscriberMethod.sticky) {
// 是否考虑注册方法的参数,事件的继承关系,默认为 true
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
// 看当前EventType是否是已有的stickyEvent的父类
if (eventType.isAssignableFrom(candidateEventType)) {
// 取出缓存的Event
Object stickyEvent = entry.getValue();
// 立即投递事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
方法归纳:
一. 将订阅者和订阅方法封装到 subscriptionsByEventType
,它可以根据 EventType
拿到所有的 Subscription
对象,Subscription
对象中就有订阅者和订阅方法。这样当有 EventType
消息过来时,可以快速的传递给订阅者的订阅方法。(用来给 post
方法来分发事件的)。
二. 将订阅者封装到 typesBySubscriber
,它可以根据订阅类拿到所有的 EventType
。这样当我们调用调用 unregister(this)
时,就可以拿到 EventType
,又根据 EventType
拿到所有订阅者和方法,进行解绑了。(用来反注册的)。
三. 如果当前订阅方法是粘性方法,则立即去查找是否有本地事件,有的话就立即投递。这也就是为什么粘性事件在注册时会立即回调订阅方法的原因。
至此,注册流程就结束了。总的来说,就是把订阅者和订阅事件都缓存到了 EventBus
的成员变量 subscriptionsByEventType
和 typesBySubscriber
,接下来就要用到这两个变量进行事件的分发和反注册了。
注销事件
注册对应的就是反注册,也就是注销,可以调用 unregister
进行注销:
public synchronized void unregister(Object subscriber) {
// 通过订阅者拿到它订阅的所有的订阅事件类型
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
// 遍历事件类型集合,根据事件类型解绑
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
// 从记录中移除订阅者
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
上述方法,又会调用如下方法:
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
// 根据事件类型拿到所有的Subscription
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
// 遍历所有Subscription,符合解除条件的进行remove
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
注销其实就是把对应的注册者和对应的注册信息从记录中移除。
发送事件
EventBus
提供了两种发送方法 post()
和 postSticky()
。post()
发送的是非粘性的事件,postSticky()
发送的是粘性事件。
post 方式
public void post(Object event) {
// 从当前线程中取出PostingThreadState
PostingThreadState postingState = currentPostingThreadState.get();
// 拿到EventType队列
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
// 当前线程是否有消息正在投递
if (!postingState.isPosting) {
// 当前线程是否是主线程
postingState.isMainThread = isMainThread();
// 当前线程是否正在发送事件的标识
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 遍历队列里面的每个事件
while (!eventQueue.isEmpty()) {
// 单个事件具体处理逻辑
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
// 重置标志
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
这里面有一个 PostingThreadState
对象,我们看一下该类的定义:
final static class PostingThreadState {
// 当前线程的事件队列
final List<Object> eventQueue = new ArrayList<>();
// 是否有事件正在分发
boolean isPosting;
// post 的线程是否在主线程
boolean isMainThread;
// 订阅者和订阅方法的封装类
Subscription subscription;
// 事件
Object event;
// 是否取消分发
boolean canceled;
}
PostingThreadState
是通过 currentPostingThreadState.get()
来获取实例的:
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
ThreadLocal
保证了数据只对当前线程可见,其他线程是不可见的,这样的话当我们从不同的线程中去取数据,数据相当于是分开保存,设置和读取就会比较快。
接着会调用postSingleEvent()
对事件进行分发:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
// 如果是可继承的事件
if (eventInheritance) {
// 查找Event的所有父类、接口类以及父类的接口类(Event的父类和接口也是Event)
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
// 根据查找到的所有Class(Event),逐个寻找订阅者,进行分发event
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// 如果不是可继承的事件,则直接对事件进行分发
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
// 如果没有找到订阅者,报异常(调用了 post 方法,也写了方法,但是没有调用 register 方法)
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
这个方法的作用很简单,就是做了个分支处理:如果是可继承的事件,则查找到它的所有父事件,然后再往下分发。继续看 postSingleEventForEventType()
方法:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
// 可能同时在多个线程同时发送Event,subscriptionsByEventType是共有常量,所以需要加锁
synchronized (this) {
// 根据Event的类型拿到所有订阅者和订阅方法的封装类列表
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 逐个通知订阅者
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
这个方法很简单,就是根据 Event
的类型拿到所有订阅者和订阅方法的封装类列表,然后遍历通知,关键方法在于postToSubscription()
:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 根据订阅线程模式进行分支处理
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
// 直接调用在本线程
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
// 如果就在主线程,则直接调用
invokeSubscriber(subscription, event);
} else {
// 如果不在主线程,则使用mainThreadPoster
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
// 如果在主线程,使用backgroundPoster
backgroundPoster.enqueue(subscription, event);
} else {
// 如果不在主线程,则直接在当前线程调用
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
// 启动新的线程调用,asyncPoster
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
这里就是 EventBus
的线程切换了,根据注解定义的线程在不同的分支进行方法回调,方法执行是通过反射来实现的,反射方法如下:
void invokeSubscriber(Subscription subscription, Object event) {
try {
// 直接反射调用
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
分发流程归纳:
一. 获取当前线程的 PostingThreadState
对象从而获取到当前线程的事件队列。
二. 通过事件类型获取到所有订阅者集合。
三. 通过反射执行订阅者中的订阅方法。
接下来,我们分析一下线程切换用到的几个 Poster
。
mainThreadPoster
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
mainThreadPoster
是个自定义的类 HandlerPoster
,它的目的是在主线程中调用订阅方法,而 EventBus
使用的就是我们熟悉的 Handler
:
public class HandlerPoster extends Handler implements Poster {
// 一个待处理消息的队列
private final PendingPostQueue queue;
// 最大处理时间
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
// 获取一个PendingPost实例
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 入队
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
// 主线程的handler发送消息,发送到主线程
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
// 从队列中取出一个pendingPost
PendingPost pendingPost = queue.poll();
// 如果队列里的消息处理完毕,就直接跳出循环。
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
// 调用订阅方法并会回收pendingPost
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
// 如果方法的执行时间超过最大执行时间(默认10毫秒)
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
HandlerPoster
是自定义的 Handler
,发送消息使用的是 Looper.getMainLooper()
即主线程的 Handler
。 内部定义了一个最大处理消息时间,默认是 10
毫秒,所以说我们一定不要在订阅方法中做耗时操作。还维护了一个 PendingPostQueue
,它是自定义的一个基于链表实现的队列。
队列维护的是 PendingPost
类,它封装了订阅者 subscription
实例和事件 event
实例;它内部又维护了一个大小为 10000
的 PendingPost
(数组集合)池,用来重复利用 PendingPost
实例。然后将 PendingPost
实例入队到上一步说的 PendingPostQueue
队列中。接着使用主线程的 Handler
发送一个消息。
接下来就是在 handleMessage()
中处理消息了,使用了 while
循环,不断从队列中去取 PendingPost
处理,但是加了个最大执行时间处理,因为是在主线程调用,所以一旦超时,就退出队列,并重新尝试去再进入队列。
BackgroundPoster
BackgroundPoster
的作用是将 UI
线程的订阅方法调度在非 UI
线程中。即它是要执行在新的 Thread
中的,而开启线程我们最常用的就是 Runnable
, 来看看源码:
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
// 从消息池中构建一个PendingPost
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 入队
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
// 线程池调度执行,尽可能使用一个后台线程去依次排队执行订阅方法
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
里面通过一个可缓存的线程池去执行 BackgroundPoster
这个 Runnable
,总体跟 HandlerPoster
的流程类似,但是有两点不一样的地方,分别为:
一. 使用了 poll(int maxMillisToWait)
方法,这个设计很巧妙,当取到最后发现队列为空后,会 wait 1000
毫秒,当有有新的信息来临时就会唤醒线程,poll
出消息。这样设计就减少了发送消息的次数,节省了资源。
二. 因为是在子线程执行,所以就没有方法执行时间的限制了。
AsyncPoster
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
// 直接开启新的线程执行,每条消息都直接开启新的后台线程立即执行
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
// 直接取出消息执行
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
可以看到 AsyncPoster
和 BackgroundPoster
非常的相似,因为它们的功能也非常相似。但是不同之处在于: BackgroundPoster
是尽可能使用一个后台线层去依次排队执行订阅方法;而 AsyncPoster
则是每条消息都直接开启新的后台线程立即执行。
postSticky 方式
当注册了粘性事件后,立即能收到还没有注册时系统发出的最后一个事件。
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
postSticky()
方法用来发送一个粘性事件,在这个方法中,直接将粘性事件保存在了一个 Map
集合中,而 key
就是 Event的Class
对象。接着就调用正常的 post()
方法了。
那为什么我们后注册的方法也能接收到之前发出的粘性事件呢,答案就在上面提到的注册方法 subscribe()
中的最后一段:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
...
// 如果是粘性事件,则直接发送出去
if (subscriberMethod.sticky) {
if (eventInheritance) {
// 从stickyEvents取出粘性事件的Class对象
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
// 如果订阅的事件是保存的粘性事件Class或它的父类
if (eventType.isAssignableFrom(candidateEventType)) {
// 取出缓存的Event
Object stickyEvent = entry.getValue();
// 将缓存的Event发送出去
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
在我们注册订阅方法和事件时,如果是粘性事件,就直接会将事件发送给注册了相同 Event
的订阅者,方法中调用了checkPostStickyEventToSubscription(newSubscription, stickyEvent)
方法:
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());
}
}
直接又调用了postToSubscription()
方法,根据指定线程分别进行分发。