公告:“业余草”微信公众号提供免费CSDN下载服务(只下Java资源),关注业余草微信公众号,添加作者微信:xttblog2,发送下载链接帮助你免费下载!
本博客日IP超过2000,PV 3000 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:xttblog2,之前的微信号好友位已满,备注:返现
受密码保护的文章请关注“业余草”公众号,回复关键字“0”获得密码
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序
腾讯云】1核2G5M轻量应用服务器50元首年,高性价比,助您轻松上云
本博客日IP超过2000,PV 3000 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:xttblog2,之前的微信号好友位已满,备注:返现
受密码保护的文章请关注“业余草”公众号,回复关键字“0”获得密码
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序
腾讯云】1核2G5M轻量应用服务器50元首年,高性价比,助您轻松上云
目前网上关于 OpenMessaging 的技术文章还很少,我个人预测它将来会很火。而阿里捐献给 Apache 的 RocketMQ 对 OpenMessaging 提供了部分实现。本文将讲解他们之间的开发实战。
前面我已经介绍过 《OpenMessaging 的架构原理》,没阅读过的可以看看,加深理解。
OpenMessaging包括建立行业准则和消息传递,流媒体规范为金融,电子商务,物联网和大数据区域提供了一个共同的框架。设计原则是分布式异构环境中面向云,简单,灵活和语言无关。符合这些规范将有可能在所有主要平台和操作系统上开发异构消息应用程序。
以下示例显示如何在同步,异步或单向传输中向RocketMQ代理发送消息。ProducerApp 生产者:
package io.openmessaging.samples.producer; import io.openmessaging.Future; import io.openmessaging.FutureListener; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.MessagingAccessPointFactory; import io.openmessaging.producer.Producer; import io.openmessaging.producer.SendResult; import java.nio.charset.Charset; public class ProducerApp { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory .getMessagingAccessPoint("openmessaging:rocketmq://localhost:10911/namespace"); // 业余草:www.xttblog.com final Producer producer = messagingAccessPoint.createProducer(); messagingAccessPoint.startup(); System.out.println("MessagingAccessPoint startup OK"); producer.startup(); System.out.println("Producer startup OK"); //Add a shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { producer.shutdown(); messagingAccessPoint.shutdown(); } })); //Sync www.xttblog.com { SendResult sendResult = producer.send(producer.createTopicBytesMessage( "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); System.out.println("Send sync message OK, message id is: " + sendResult.messageId()); } //Async with Promise { final Future<SendResult> result = producer.sendAsync(producer.createTopicBytesMessage( "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); final SendResult sendResult = result.get(3000L); System.out.println("Send async message OK, message id is: " + sendResult.messageId()); } //Async with FutureListener { final Future<SendResult> result = producer.sendAsync(producer.createTopicBytesMessage( "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); result.addListener(new FutureListener<SendResult>() { @Override public void operationSucceeded(Future<SendResult> promise) { System.out.println("Send async message OK, message id is: " + promise.get().messageId()); } @Override public void operationFailed(Future<SendResult> promise) { System.out.println("Send async message Failed, cause is: " + promise.getThrowable().getMessage()); } }); } //Oneway { producer.sendOneway(producer.createTopicBytesMessage( "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); System.out.println("Send oneway message OK"); } } }
使用PullConsumerApp轮询来自指定队列的消息。PullConsumerApp 消费者:
package io.openmessaging.samples.consumer; import io.openmessaging.ResourceManager; import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.MessagingAccessPointFactory; import io.openmessaging.OMS; import io.openmessaging.consumer.PullConsumer; public class PullConsumerApp { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory .getMessagingAccessPoint("openmessaging:rocketmq://localhost:10911/namespace"); messagingAccessPoint.startup(); System.out.println("MessagingAccessPoint startup OK"); ResourceManager resourceManager = messagingAccessPoint.getResourceManager(); // www.xttblog.com resourceManager.createQueue("NS1", "HELLO_QUEUE", OMS.newKeyValue()); //PullConsumer only can pull messages from one queue. final PullConsumer pullConsumer = messagingAccessPoint.createPullConsumer("HELLO_QUEUE"); pullConsumer.startup(); //Poll one message from queue. Message message = pullConsumer.poll(); //Acknowledges the consumed message pullConsumer.ack(message.sysHeaders().getString(Message.BuiltinKeys.MessageId)); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { pullConsumer.shutdown(); messagingAccessPoint.shutdown(); } })); } }
将PushConsumer附加到指定的队列,并通过MessageListener消费消息。
package io.openmessaging.samples.consumer; import io.openmessaging.ResourceManager; import io.openmessaging.Message; import io.openmessaging.consumer.MessageListener; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.MessagingAccessPointFactory; import io.openmessaging.OMS; import io.openmessaging.consumer.PushConsumer; import io.openmessaging.exception.OMSResourceNotExistException; public class PushConsumerApp { public static void main(String[] args) throws OMSResourceNotExistException { final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory .getMessagingAccessPoint("openmessaging:rocketmq://localhost:10911/namespace"); messagingAccessPoint.startup(); System.out.println("MessagingAccessPoint startup OK"); ResourceManager resourceManager = messagingAccessPoint.getResourceManager(); final PushConsumer consumer = messagingAccessPoint.createPushConsumer(); // Consume messages from a simple queue. { // 业余草:www.xttblog.com String simpleQueue = "HELLO_QUEUE"; resourceManager.createQueue("NS1", simpleQueue, OMS.newKeyValue()); //This queue doesn't has a source topic, so only the message delivered to the queue directly can //be consumed by this consumer. consumer.attachQueue(simpleQueue, new MessageListener() { @Override public void onReceived(Message message, Context context) { System.out.println("Received one message: " + message); context.ack(); } }); consumer.startup(); System.out.println("Consumer startup OK"); } //Consume messages from a complex queue. final PushConsumer anotherConsumer = messagingAccessPoint.createPushConsumer(); { String complexQueue = "QUEUE_HAS_SOURCE_TOPIC"; String sourceTopic = "SOURCE_TOPIC"; //Create the complex queue. resourceManager.createQueue("NS_01", complexQueue, OMS.newKeyValue()); //Create the source topic. resourceManager.createTopic("NS_01", sourceTopic, OMS.newKeyValue()); } Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { consumer.shutdown(); anotherConsumer.shutdown(); messagingAccessPoint.shutdown(); } })); } }
相关源代码,请一步阅读:https://github.com/openmessaging/openmessaging-java。
最后,欢迎关注我的个人微信公众号:业余草(yyucao)!可加作者微信号:xttblog2。备注:“1”,添加博主微信拉你进微信群。备注错误不会同意好友申请。再次感谢您的关注!后续有精彩内容会第一时间发给您!原创文章投稿请发送至532009913@qq.com邮箱。商务合作也可添加作者微信进行联系!
本文原文出处:业余草: » OpenMessaging访问RocketMQ教程(开发实战)