程序员阿沛
发布于 2026-06-27 / 0 阅读
0
0

面向对象和设计模式十八观察者模式轻松搞定对象间的联动关系

面向对象和设计模式(十八) 观察者模式,轻松搞定对象间的联动关系

观察者模式又叫做发布订阅模式,观察者模式的定义如下:在对象之间定义一个一对多的关联,当一个对象状态改变的时候,所有下游关联的对象都会自动收到通知并做出相应改变。下游对象就是观察者,而上游对象就是被观察者。

使用场景

当一个对象与其他对象有联动关系时,上游对象的数据或状态改变需要同时改变其关联的下游对象的数据和状态,以保持整个链路的数据一致性,此时就可以使用观察者模式。

类图和角色

下面给出观察者模式的模板代码:

<?php

interface Subject { // 被观察者接口 public function registerObserver($key, Observer $observer); // 添加观察者 public function removeObserver($key); // 移除观察者 public function notifyObservers(Message $message); // 通知观察者}
interface Observer { // 观察者接口 public function update(Message $message); // 观察者更新}
class ConcreteSubject implements Subject { // 具体被观察者 private $observers = [];
public function registerObserver($key, Observer $observer) { $this->observers[$key] = $observer; }
public function removeObserver($key) { unset($this->observers[$key]); }
public function notifyObservers(Message $message) { foreach ($this->observers as $observer) { $observer->update($message); } }
}
class ConcreteObserverOne implements Observer { // 具体观察者 public function update(Message $message) { //TODO: 获取消息通知,执行自己的逻辑… }}
class ConcreteObserverTwo implements Observer { public function update(Message $message) { //TODO: 获取消息通知,执行自己的逻辑… }}
class Demo { public static function main() { $subject = new ConcreteSubject(); $subject->registerObserver(‘observer1’, new ConcreteObserverOne()); $subject->registerObserver(‘observer2’, new ConcreteObserverTwo()); $subject->notifyObservers(new Message()); }}

下面我们看一个具体的例子,在开发一个 P2P 投资理财系统,用户注册成功之后,系统会给用户发放投资体验金。代码实现大致是下面这个样子的:

<?php

class UserController { private $userService; // 依赖注入 private $promotionService; // 依赖注入
public function register($telephone, $password) { $userId = $this->userService->register($telephone, $password); $this->promotionService->issueNewUserExperienceCash($userId); // 发放体验金 return $userId; }}

实际上,发放体验金的行为是促销模块(promotionService)的功能,将促销模块的行为直接硬塞到用户模块的方法中并不是一个合适的做法,因为如果之后用户注册后的行为发生改变或者扩展,register()方法的代码就会越写越多或者频繁修改,违反开闭原则,且可读性和可维护性也会降低。比如现在,用户注册成功之后,不再发放体验金,而是改为发放优惠券,并且还要给用户发送一封“欢迎注册成功”的站内信,就需要再在register()中写逻辑。

此时我们就可以用观察者模式对其进行优化重构。

<?php
interface RegObserver {  function handleRegSuccess($userId);}

class RegPromotionObserver implements RegObserver { // 注册行为与促销系统相关行为的观察者 private $promotionService; // 依赖注入
public function handleRegSuccess($userId) { $this->promotionService->issueNewUserExperienceCash($userId); }}
class RegNotificationObserver implements RegObserver { // 注册行为与通知系统相关行为的观察者 private $notificationService;
public function handleRegSuccess($userId) { $this->notificationService->sendInboxMessage($userId, “Welcome…”); }}
class UserController { private $userService; // 依赖注入 private $regObservers = [];
// 一次性设置好 public function setRegObservers($observers) { $this->regObservers = $observers; }
public function register($telephone, $password) { $userId = $this->userService->register($telephone, $password);
foreach ($this->regObservers as $observer) { $observer->handleRegSuccess($userId); }
return $userId; }}

值得一提的是,观察者类并不直接是下游对象,而是介于上游对象和下游对象的一个中间层,上游对象要关联触发的下游逻辑都由观察者类委托下游对象去调用。通过添加这样的一个中间层可以很好的解耦上下游对象间的联动行为。

然而上述代码实现仍有缺陷,可以进一步完善优化。

1.从观察者类的定义和命名来说,上面的观察者类与上游对象耦合,其扩展性和复用性不高;正确的做法应该是让观察者类只与下游对象耦合,与上游对象解耦,这样一来不仅是用户注册可以发放体验金,用户的其他行为(比如下订单,分享到朋友圈等)也可以发放体验金。

RegPromotionObserver 类这个名字一看就知道是用户注册时与促销模块行为相关的观察者,只能用于用户注册register()中。

假如,现在对需求进行扩展,不仅用户注册会发放体验金,用户通过分享app到朋友圈时,也会发放注册金。那么上面的 RegPromotionObserver
类就不适合用作分享逻辑的观察者了,开发者不得不再新写一个 SharePromotionObserver。

说的具体一点就是,上面的RegPromotionObserver类不应该叫RegPromotionObserver,而应该叫做
PromotionObserver
类,handleRegSuccess()方法应该改名叫做issueExperienceCash(),这样PromotionObserver就彻底和register()解耦,也和用户模块解耦,告诉使用者这个观察者类是做和促销模块相关的操作而不仅仅是用户注册时可以使用。

此外,观察者接口也不该叫做 RegObserver,而应该直接取消观察者接口,采用观察者抽象基类,抽象类的类名就叫做 Observer。

2.观察者的粒度问题

从第一点我们知道,观察者最好是只和下游对象耦合,那么问题来了,这个观察者应该定义成包含某个下游对象的所有被关联方法,还是定义成只包含某个下游对象的一个被关联方法呢?

答案是都可以。以上面的代码为例,促销模块PromotionService可能被上游对象需要的行为有两个,一个是发放体验金,一个是发放优惠券。那么观察者可以定义为下面两种形式。

形式1:以整个下游业务类作为定义观察者类的维度。

定义成的观察者类名叫做 PromotionObserver,包含 发放体验金issueExperienceCash() 和
发放优惠券issueCoupon()方法。

class PromotionObserver {   // 与促销系统相关行为的观察者  private $promotionService; // 依赖注入
  public function issueExperienceCash(Message $msg) {   // 发放体验金    // 其他逻辑    $this->promotionService->issueExperienceCash($msg->userId);    // 其他逻辑  }
  public function issueCoupon(Message $msg) {   // 发放优惠券    $this->promotionService->issueCoupon($msg->userId);  }}
class UserController {  private $userService; // 依赖注入  private $observers = [];
  public function addObservers($observer, array $methods) {    $this->observers = ['observer' => $observer, 'methods'=>$methods];  }
  public function register($telephone, $password) {    $userId = $this->userService->register($telephone, $password);        $message = new Message($userId);    foreach ($this->observers as $observerInfo) {  // 请注意下面调用观察者方法的方式      $observer = $observerInfo['observer'];      foreach($observerInfo['methods'] as $method){        $observer->$method($message);      }    }
    return $userId;  }}

形式2:以下游业务类的某一个方法作为定义观察者类的维度。

定义成2个观察者类,一个负责发放体验金,一个负责发放优惠券。

interface Observer{  public function update(Message $msg);   // 执行下游对象的逻辑}
abstract class PromotionActionObserver implements Observer{  protected $promotionService; // 依赖注入
  // 省略其他公共方法}
// 该观察者只负责发放体验金class issueExperienceCashObserver extends PromotionActionObserver {  public function update(Message $msg) {   // 发放体验金    // 其他逻辑    $this->promotionService->issueExperienceCash($msg->userId);    // 其他逻辑  }}
// 该观察者只负责发放优惠券class issueCouponObserver extends PromotionActionObserver {  public function update(Message $msg) {   // 发放优惠券    // 其他逻辑    $this->promotionService->issueCoupon($msg->userId);    // 其他逻辑  }}
class UserController { private $userService; // 依赖注入 private $observers = [];  public function addObservers($observer) {  $this->observers = $observer; }
 public function register($telephone, $password) {  $userId = $this->userService->register($telephone, $password);  $message = new Message($userId);
  /** @var Observer $observer */  foreach ($this->observers as $observer) { // 请注意下面调用观察者方法的方式,直接调用update()    $observer->update($message);  }
  return $userId; }}

这两种形式的区别在于:

形式2的粒度比形式1更细,所以定义出来的观察者类会更多,而且形式2的观察者让上游对象调用起来会方便很多,因为上游对象会明确的知道要调用 update
方法,而对于形式1来说,观察者的更新方法不只一个且方法名五花八门(即上述代码中的 issueExperienceCash() 和
issueCoupon()都可能是更新方法),必须在注册观察者时同时告诉上游对象哪个方法是它的目标更新方法。

3.未经完善的代码中,观察者的更新方法的参数各种各样,这对上游对象对观察者更新方法的调用很不友好。

解决方法是定义一个Message对象,将观察者所需的参数都归纳到Message中。由于不同的观察者对象所需的上游数据是不同的,可能观察者A的update方法需要用户的id作为参数,观察者B的update方法需要用户id和订单id,观察者C的update方法需要用户的账单明细信息等。

因此Message需要包含什么属性由不同的观察者自己决定,最好不要有上层调用做Message的实例化逻辑,而是在观察者内部创建一个Message的工厂方法,由这个工厂方法实现Message的实例化逻辑和属性赋值。

如下所示:

<?php

class ObsMessage{ protected $params = []; // 观察者所需的数据
public function setParam($name, $val){ $this->params[$name] = $val; }
public function getParam($name){ return $this->params[$name] ?? null; }}
abstract class Observer{ const MESSAGE_PARAMS = []; abstract public function update(ObsMessage $msg); // 执行下游对象的逻辑
public static function buildMessage(&$params){ // 通用的观察者Message构建方法,子类可重写 $message = new ObsMessage(); foreach(static::MESSAGE_PARAMS as $paramKey){ if(isset($params[$paramKey])){ $message->setParam($paramKey, $params[$paramKey]); } } }}
// 观察者1class ConcreteObserver1 implements Observer{ const MESSAGE_PARAMS = [‘param_1’, ‘param_2’, ‘param_3’]; // 观察者1需要’param_1’, ‘param_2’, ‘param_3’这三个参数
public function update(ObsMessage $msg){ $param1 = $msg->getParam(‘param_1’); $param2 = $msg->getParam(‘param_2’); $param3 = $msg->getParam(‘param_3’);
// …相关的处理逻辑 }}
// 观察者2class ConcreteObserver2 implements Observer{ const MESSAGE_PARAMS = [‘param2’, ‘param_4’]; // 观察者2需要’param2’, 'param_4’这两个参数
public function update(ObsMessage $msg){ $param2 = $msg->getParam(‘param_2’); $param4 = $msg->getParam(‘param_4’);
// …相关的处理逻辑 }}
// 上游对象class UpstreamService { public $observers = [];
public function handle(){ // 上游业务逻辑 // …省略UpstreamService类自身的业务逻辑代码 $params = [“param_1”, … , “param_10”];
/** @var Observer $observer */ foreach($this->observers as $observer){ $observerCls = get_class($observer); $message = $observerCls::buildMessage($params); $observer->update($message); } }}

4.一个上游对象的某个方法需要关联哪些观察者一般是已经确定好了的,我们可以在上游对象的类里面将各个上游对象方法所需的观察者提前定义好到一个静态方法或者静态属性中。同时也保留addObserver
和 removeObserver 方法,允许上游对象动态添加观察者。

5.有没有发现,上面例子中UserController->register()方法需要遍历所有的观察者,调用观察者的handleRegSuccess方法。如果上游对象的每个方法都需要写一遍遍历观察者的逻辑未免太过繁琐,遍历观察者这种流程化的东西应该控制反转交给一个单独的类负责。

6.假如上游对象包含的下游操作很多,同步操作势必会拖慢接口,对于业务复杂的接口可以异步执行下游操作,可以不要求数据的强一致性,保证数据的最终一致性即可。因此观察者类中可以多加一个async属性标识某个观察者是否要异步执行。对于异步执行的观察者,我们只需把它丢到异步队列即可。

现在,综合上面所有的优化点,可以整理出下面这套观察者模式的框架:

<?php

// 观察者所需的信息
class ObsMessage{ protected $params = []; // 观察者所需的数据
public function setParam($name, $val){ $this->params[$name] = $val; }
public function getParam($name){ return $this->params[$name] ?? null; }}
/** * 观察者抽象类 * @property ObsMessage $message /abstract class Observer{ const MESSAGE_PARAMS = []; protected $async = false; // 表示本观察者要执行的下游任务是否异步执行 protected $message;
abstract public function update(); // 执行下游对象的逻辑
public function __construct($async){ $this->async = $async; }
public function setAsync($flag){ $this->async = $flag; }
public function getAsync(){ return $this->async; }
public function setMessage(&$params){ $message = new ObsMessage(); foreach(static::MESSAGE_PARAMS as $paramKey){ if(isset($params[$paramKey])){ $message->setParam($paramKey, $params[$paramKey]); } } $this->message = $message; }}
// 负责发放体验金的观察者
class IssueExperienceCashObserver extends Observer { private $promotionService; // 依赖注入 const MESSAGE_PARAMS = [‘user_id’];
public function update() { $this->promotionService->issueNewUserExperienceCash($this->message->getParam(‘user_id’)); }} // 负责发送站内信的观察者
class SendInboxMessageObserver extends Observer { private $notificationService; const MESSAGE_PARAMS = [‘user_id’, “content”];
public function update() { $userId = $this->message->getParam(‘user_id’); $content = $this->message->getParam(‘content’); $this->notificationService->sendInboxMessage($userId, $content); }}
// 控制反转将注册观察者、通知观察者的逻辑收拢trait ObservableTrait{ // 使用ObservableTrait的类必须实现Observable接口 protected $observers = []; // 同步执行的观察者,是个二维数组,一个key表示一个上游方法对应的下游观察者 protected $asyncObservers = []; // 异步执行的观察者
public function addObserver($key, Observer $obs){ $obsCls = get_class($obs); if($obs->getAsync()){ if(!isset($asyncObservers[$key])){ $this->asyncObservers[$key] = []; } $this->asyncObservers[$key][$obsCls] = $obs; }else{ if(!isset($observers[$key])){ $this->observers[$key] = []; } $this->observers[$key][$obsCls] = $obs; } }
public function post($key, $data){ // 通知key对应的观察者 $observers = $this->observers[$key] ?? []; // 最终要通知的观察者, 键名是观察者类名,值是观察者对象 $asyncObservers = $this->asyncObservers[$key] ?? []; $obsCfgs = static::observerConfigs()[$key] ?? []; $asyncObsCfg = static::asyncObserverConfigs()[$key] ?? [];
foreach($obsCfgs as $observerCls){ if(isset($observers[$observerCls])){ // 动态添加的观察者如何和配置中的观察者重复则优先取动态添加的观察者 continue; } $obs = new $observerCls(false); $obs->setMessage($data); $observers[$observerCls] = $obs; }
foreach($asyncObsCfg as $observerCls){ if(isset($asyncObservers[$observerCls])){ continue; } $obs = new $observerCls(true); $obs->setMessage($data); $asyncObservers[$observerCls] = $obs; }
// 异步执行 if(!empty($asyncObservers)){ $mq = new MessageQueue(…); // 实例化消息队列,消息队列的参数省略 $mq->send(json_encode($asyncObservers)); // 将异步观察者序列化后再发送到队列 }
// 同步执行 if(!empty($observers)){ // 省略try-catch捕获异常 $this->runObservers($observers); } }
protected function runObservers($observers){ /
* @var Observer $obs */ foreach($observers as $obs){ $obs->update(); } }
public static function observerConfigs(){ return []; }
public static function asyncObserverConfigs(){ return []; }} // 被观察者接口interface Observable{ public function addObserver($key, Observer $obs); public function post($key, $data); public static function observerConfigs(); public static function asyncObserverConfigs();}

下面是这套框架的使用示范,在下面的示范中用户注册后会触发发放体验金和发送站内信两个行为,其中发放体验金是同步操作,发送站内信是异步操作:

class UserController implements Observable{  use ObservableTrait;  private $userService; // 依赖注入    public static function observerConfigs(){    return [      'register' => [IssueExperienceCashObserver::class],    ];  }
  public static function asyncObserverConfigs(){    return [      'register' => [SendInboxMessageObserver::class],    ];  }
  public function register($telephone, $password) {    $userId = $this->userService->register($telephone, $password);        $data = [      'user_id' => $userId,      'content' => "感谢您的注册"    ];    $this->post("register", $data);   // 通知观察者
    return $userId;  }}

EventBus框架

上面我们实现了一个简单的观察者模式框架,实际上目前开源代码中实现了有很多的观察者模式框架,其中 Google Guava EventBus
就是一个比较著名的观察者模式框架。

EventBus被称为“事件总线”,它提供了实现观察者模式的骨架代码。我们可以基于此框架,非常容易地在自己的业务场景中实现观察者模式,不需要从零开始开发。

下面我先给出 Google Guava EventBus 框架的使用示例(Java实现):

public class UserController {  private UserService userService; // 依赖注入
  private EventBus eventBus;  private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20;
  public UserController() {    //eventBus = new EventBus(); // 同步阻塞模式    eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE)); // 异步非阻塞模式  }
  public void setRegObservers(List<Object> observers) {    for (Object observer : observers) {      eventBus.register(observer);    }  }
  public Long register(String telephone, String password) {    //省略输入参数的校验代码    //省略userService.register()异常的try-catch代码    long userId = userService.register(telephone, password);
    eventBus.post(userId);
    return userId;  }}
public class RegPromotionObserver {  private PromotionService promotionService; // 依赖注入
  @Subscribe  public void handleRegSuccess(Long userId) {    promotionService.issueNewUserExperienceCash(userId);  }}
public class RegNotificationObserver {  private NotificationService notificationService;
  @Subscribe  public void handleRegSuccess(Long userId) {    notificationService.sendInboxMessage(userId, "...");  }}

EventBus 框架的相关细节:

1. EventBus不需要定义 Observer 接口,任意类型的对象都可以作为观察者对象注册到 EventBus 中,通过 @Subscribe
注解来标明类中哪个函数可以接收被观察者发送的消息。

public void register(Object object);  // 任意类型的对象都可以注册到 EventBus 中

2. EventBus框架包含 EventBus与AsyncEventBus
两种类,EventBus类实现了同步阻塞的观察者模式,AsyncEventBus 继承自
EventBus,实现了异步非阻塞的观察者模式。具体使用方式如下所示:

EventBus eventBus = new EventBus(); // 同步阻塞模式EventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(8));// 异步阻塞模式

3. post() 方法给观察者发送消息。

public void post(Object event);

post() 发送消息的时候,并非把消息发送给所有的观察者,而是发送给可匹配的观察者。所谓可匹配指的是,能接收的消息类型是发送消息(post 函数定义中的
event)类型的父类。

这个能力是通过反射和Observer注册表实现的,通过反射解析 @Subscribe
注解的方法参数中的对象类型,并将对象类型与方法名的映射保存到Observer注册表中。

4.对于同步阻塞模式,EventBus 在一个线程内依次执行相应的函数。对于异步非阻塞模式,EventBus 通过一个线程池来执行相应的函数。

如果是EventBus框架的基本实现,感兴趣的朋友可以看看:

1.Subscribe

Subscribe 是一个注解,用于标明观察者中的哪个函数可以接收消息。

@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.METHOD)@Betapublic @interface Subscribe {}

2.ObserverAction

ObserverAction 类用来表示 @Subscribe 注解的方法,其中,target 表示观察者类,method 表示方法。它主要用在
ObserverRegistry 观察者注册表中。

public class ObserverAction {  private Object target;  private Method method;
  public ObserverAction(Object target, Method method) {    this.target = Preconditions.checkNotNull(target);    this.method = method;    this.method.setAccessible(true);  }

public void execute(Object event) { // event是method方法的参数 try { method.invoke(target, event); } catch (InvocationTargetException | IllegalAccessException e) { e.printStackTrace(); } }}

3.ObserverRegistry

ObserverRegistry 类就是前面讲到的 Observer 注册表,是最复杂的一个类,框架中几乎所有的核心逻辑都在这个类中。这个类大量使用了
Java 的反射语法,不过代码整体来说都不难理解,其中,一个比较有技巧的地方是 CopyOnWriteArraySet 的使用。

CopyOnWriteArraySet 在写入数据的时候,会创建一个新的 set,并且将原始数据 clone 到新的 set 中,在新的 set
中写入数据完成之后,再用新的 set 替换老的
set。这样就能保证在写入数据的时候,不影响数据的读取操作,以此来解决读写并发问题。除此之外,CopyOnWriteSet
还通过加锁的方式,避免了并发写冲突。

public class ObserverRegistry {  private ConcurrentMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry = new ConcurrentHashMap<>();
  public void register(Object observer) {    Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);    for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {      Class<?> eventType = entry.getKey();      Collection<ObserverAction> eventActions = entry.getValue();      CopyOnWriteArraySet<ObserverAction> registeredEventActions = registry.get(eventType);      if (registeredEventActions == null) {        registry.putIfAbsent(eventType, new CopyOnWriteArraySet<>());        registeredEventActions = registry.get(eventType);      }      registeredEventActions.addAll(eventActions);    }  }
  public List<ObserverAction> getMatchedObserverActions(Object event) {    List<ObserverAction> matchedObservers = new ArrayList<>();    Class<?> postedEventType = event.getClass();    for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) {      Class<?> eventType = entry.getKey();      Collection<ObserverAction> eventActions = entry.getValue();      if (postedEventType.isAssignableFrom(eventType)) {        matchedObservers.addAll(eventActions);      }    }    return matchedObservers;  }
  private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {    Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();    Class<?> clazz = observer.getClass();    for (Method method : getAnnotatedMethods(clazz)) {      Class<?>[] parameterTypes = method.getParameterTypes();      Class<?> eventType = parameterTypes[0];      if (!observerActions.containsKey(eventType)) {        observerActions.put(eventType, new ArrayList<>());      }      observerActions.get(eventType).add(new ObserverAction(observer, method));    }    return observerActions;  }
  private List<Method> getAnnotatedMethods(Class<?> clazz) {    List<Method> annotatedMethods = new ArrayList<>();    for (Method method : clazz.getDeclaredMethods()) {      if (method.isAnnotationPresent(Subscribe.class)) {        Class<?>[] parameterTypes = method.getParameterTypes();        Preconditions.checkArgument(parameterTypes.length == 1,                "Method %s has @Subscribe annotation but has %s parameters."                        + "Subscriber methods must have exactly 1 parameter.",                method, parameterTypes.length);        annotatedMethods.add(method);      }    }    return annotatedMethods;  }}

4.EventBus

EventBus 实现的是阻塞同步的观察者模式。

public class EventBus {  private Executor executor;  private ObserverRegistry registry = new ObserverRegistry();
  public EventBus() {    this(MoreExecutors.directExecutor());  }
  protected EventBus(Executor executor) {    this.executor = executor;  }
  public void register(Object object) {    registry.register(object);  }
  public void post(Object event) {    List<ObserverAction> observerActions = registry.getMatchedObserverActions(event);    for (ObserverAction observerAction : observerActions) {      executor.execute(new Runnable() {        @Override        public void run() {          observerAction.execute(event);        }      });    }  }}

5.AsyncEventBus

public class AsyncEventBus extends EventBus {  public AsyncEventBus(Executor executor) {    super(executor);  }}

评论