查看原文
其他

类型化消息的一种设计模式

The following article is from QMQ开源消息队列 Author 余昭辉

使用消息的时候,我们经常会碰到一种场景:Producer 将消息格式升级了,如果没有通知到 Consumer 方,Consumer 就会获取到不兼容格式的消息,导致应用报错。


鉴于大部分公司上下游联调基本靠吼,所以这种不兼容导致的问题还不少。有没有办法解决这个问题?

业界知名的 Kafka 通过引入 schema registry 来解决这个问题:Kafka 有个配套的 schema registry,可以预先将发送的消息格式 (avro schema) 记录到 schema registry,每次升级时,schema registry 会检查这个 topic 所有的消费者是不是都兼容,如果不兼容,那么就不能升级,你要么修改消息格式,保持兼容;要么联系所有的 consumer 让他们进行升级改造。

好多年前我就想在 QMQ 里引入类似 schema registry 的想法,但是受限于精力和要保持 API 的兼容性,一直没有付诸实践。

另外,Kafka 的 Schema Registry 虽然是一个绝妙的好点子,但还是有点问题:

  1. Kafka 的 Schema registry 必须提前去注册,这个过程有一点点繁琐,然后很多人就不愿意提前去配置,然后干脆不用这种方式,这真是遗憾。所以公共组件一定要做到 API 简洁,使用一定要简单,千万不要繁琐,这些开发人员懒得要命。
  2. Kafka 的检查是运行时发送的时候检查的,这个时候检查虽然能够避免将不兼容的消息发送出去,但是还是有点晚了。

那么有没有什么办法解决这两个问题呢?

前一阵在高可用架构群,@翁伟老板一直在“鼓吹” micronaut 这个框架,翁老板说的是这样的一个例子:

import io.micronaut.http.annotation.Get;import io.micronaut.http.client.Client;import io.reactivex.Single;
@Client("/hello")public interface HelloClient {
@Get("/") Single hello();}

这种通过 annotation 的方式提供强类型的 API 让人眼前一亮(后来经过群友@皆浩的提醒,有个 feign 的框架也提供类似的 API)。

有了这种方式,那 Kafka 的两个问题就迎刃而解了,那么我就可以在 QMQ 里提供类似下面的 API:

//发送消息,定义API,这个接口是自动实现的,不需要y用户实现@QMQProducerpublic interface Producer {
  @QMQTopic(topic="order.changed")  void orderChagned(OrderEvent event);}
//使用发送消息的API@Resourceprivate Producer producer;
public void pay(Order order){ OrderEvent e = buildEvent(order); producer.orderChanged(e);}
//消费消息@QMQConsumer(topic = "order.changed")public void onMessage(Message<OrderEvent> msg){  OrderEvent e = msg.Data();  //process}

通过这种 API,配合 Java 里的 annotation processor,我们就可以在编译时对这个 API 进行检查了。

  1.  在编译时我们就可以提取 OrderEvent 这个类型,将其自动的转换成 schema,然后注册到 schema registry
  2. 编译时,我们就可以将 OrderEvent 与 schema registry 里的进行兼容性对比,然后编译时就可以确定是否兼容,不兼容编译时报错,而不是等到发送消息的时候再报错,然后紧急修复。

上述方案,除了解决 Kafka 的 Schema Registry 存在的一些问题外,是不是也得到了一套更好的 API ?欢迎留言讨论。


参考阅读:


本文授权转载自公众号 QMQ 开源消息队列,技术原创及架构实践文章,欢迎通过公众号菜单「联系我们」进行投稿。


高可用架构
改变互联网的构建方式

长按二维码 关注「高可用架构」公众号

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存