如何实现消息过滤?

有两种方案:

  • 一种是在 Broker 端按照 Consumer 的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。
  • 另一种是在 Consumer 端过滤,比如按照消息设置的 tag 去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。

一般采用Cosumer端过滤,如果希望提高吞吐量,可以采用Broker过滤。

对消息的过滤有三种方式:

weixin-mianznxrocketmqessw-f2c8bf50-dc51-44c8-9d71-b8a22af199c4.png

  • 根据Tag过滤:这是最常见的一种,用起来高效简单
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
  • SQL 表达式过滤:SQL表达式过滤更加灵活
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();
  • Filter Server 方式:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤

标签: java, Java面试题, RocketMQ, Java问题合集, Java编程, Java问题精选, Java常见问题, RocketMQ面试题