里程碑 版本 功能点 作者 完成 1.0.0 支持PulsarTemplate发送消息&支持自定义注解实例化Consumer监听消息 howinfun ✅ 1.1.0 支持动态开启/关闭Consumer消费线程池、支持自定义配置Consuemr消费线程池参数 howinfun ✅ 1.2.0 支持Spring容器停止时,释放Pulsar所有相关资源 howinfun TODO 1.3.0 支持多Pulsar数据源 howinfun TODO一、背景
Pulsar 作为新生代云原生消息队列,越来越受到开发者的热爱;而我们现在基本上的项目都是基于 SpringBoot 上开发的,但是我们可以发现,至今都没有比较大众和成熟的关于 Pulsar 的 Starter,所以我们需要自己整一个,从而避免常规使用 Pulsar API 时产生大量的重复代码。
二、设计思路
由于是第一版的设计,所以我们是从简单开始,不会一开始就设计得很复杂,尽量保留 Pulsar API 原生的功能。
2.1、PulsarClient
我们都知道,不管是 Producer 还是 Consumer,都是由 PulsarClient 创建的。
当然了,PulsarClient 可以根据业务需要自定义很多参数,但是第一版的设计只会支持比较常用的参数。
我们这个组件支持下面功能点:
支持 PulsarClient 参数配置外部化,参数可配置在 applicatin.properties 中。 支持 applicatin.properties 提供配置提示信息。 读取外部配置文件,根据参数实例化 PulsarClient,并注入到 IOC 容器中。 2.2、Producer
Producer是发送消息的组件。
这里我们提供一个模版类,可以根据需求创建对应的 Producer 实例。 支持将 TopicProducer 关系缓存起来,避免重复创建 Producer 实例。 支持同步/异步发送消息。 2.3、Consumer
Consumer是消费消息的组件。
这里我们提供一个抽象类,开发者只需要集成此实现类并实现 doReceive 方法即可,即消费消息的逻辑方法。 接着还提供一个自定义注解,自定义注解支持自定义 Consmuer 配置,例如Topic、Tenant、Namespace等。 实现类加入上述自定义注解后,组件将会自动识别并且生成对应的 Consumer 实例。 支持同步/线程池异步消费。 三、使用例子 3.1、引入依赖 io.github.howinfun winfun-pulsar-spring-boot-starter 1.1.0 3.2、加入配置 pulsar.service-url=pulsar://127.0.0.1:6650 pulsar.tenant=winfun pulsar.namespace=study pulsar.operation-timeout=30 pulsar.io-threads=10 pulsar.listener-threads=10 3.3、发送消息 /** * 发送消息 * @author: winfun **/ @RestController @RequestMapping(“msg”) public class MessageController { @Autowired private PulsarTemplate pulsarTemplate; @Autowired private PulsarProperties pulsarProperties; /*** * 往指定topic发送消息 * @author winfun * @param topic topic * @param msg msg * @return {@link String } **/ @GetMapping(“/{topic}/{msg}”) public String send(@PathVariable(“topic”) String topic,@PathVariable(“msg”) String msg) throws Exception { this.pulsarTemplate.createBuilder().persistent(Boolean.TRUE) .tenant(this.pulsarProperties.getTenant()) .namespace(this.pulsarProperties.getNamespace()) .topic(topic) .send(msg); return “success”; } } 3.4、消费消息 /** * @author: winfun * @date: 2021/8/20 8:13 下午 **/ @Slf4j @PulsarListener(topics = {“test-topic2”}, threadPool = @ThreadPool( coreThreads = 2, maxCoreThreads = 3, threadPoolName = “test-thread-pool”)) public class ConsumerListener extends BaseMessageListener { /** * 消费消息 * @param consumer 消费者 * @param msg 消息 */ @Override protected void doReceived(Consumer consumer, Message msg) { log.info(“成功消费消息:{}”,msg.getValue()); try { consumer.acknowledge(msg); } catch (PulsarClientException e) { e.printStackTrace(); } } /*** * 是否开启异步消费 * @return {@link Boolean } **/ @Override public Boolean enableAsync() { return Boolean.TRUE; } } 四、源码
源码就不放在这里分析了,大家可到Github上看看,如果有什么代码上面的建议或意见,欢迎大家提MR。
Ubuntu是一个以桌面应用为主的Linux操作系统。它是一个开放源代码的自由软件,提供了一个健壮、功能丰富的计算环境,既适合家庭使用又适用于商业环境。Ubuntu将为全球数百个公司提供商业支持。 ...
查看全文Docker采取了一种保守的方法来清理未使用的对象(通常称为“垃圾收集”),例如图像,容器,卷和网络:除非您明确要求Docker这样做,否则通常不会删除这些对象。这可能会导致Docker使用额外的磁盘空...
查看全文新浪科技讯 北京时间5月27日晚间消息,据报道,四位知情人士今日透露,亚马逊、微软和谷歌这三大云计算服务提供商,正在竞争波音公司(Boeing)价值10亿美元的云服务合同。 这些...
查看全文新浪科技讯 北京时间5月27日晚间消息,据报道,多位知情人士今日称,继加州、纽约州和华盛顿州之后,马萨诸塞州和宾夕法尼亚州的总检察长也加入到对亚马逊的反垄断调查中。 如今,越来越...
查看全文
您好!请登录