实战应用 maven依赖
1 2 3 4 5 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-client</artifactId > <version > 4.9.1</version > </dependency >
普通消息 消息发送 1、Producer端发送同步消息 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static void syncProducer () throws Exception { DefaultMQProducer producer = new DefaultMQProducer("Group_A" ); producer.setNamesrvAddr("localhost:9876" ); producer.start(); for (int i = 0 ; i < 100 ; i++) { Message message = new Message("TopicTest" , "TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message); System.out.println(sendResult); } producer.shutdown(); }
2、发送异步消息 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public static void asyncProducer () throws Exception { DefaultMQProducer producer = new DefaultMQProducer("Group_A" ); producer.setNamesrvAddr("localhost:9876" ); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0 ); final CountDownLatch2 countDownLatch2 = new CountDownLatch2(messageCount); for (int i = 0 ; i < messageCount; i++){ final int index = i; Message message = new Message("TopicTest" , "TagB" , "OrderID909" , "Hello World" .getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(message, new SendCallback() { @Override public void onSuccess (SendResult sendResult) { System.out.printf("%-10d OK %s %n" , index, sendResult.getMsgId()); } @Override public void onException (Throwable throwable) { System.out.printf("%-10d Exception %s %n" , index, throwable); throwable.printStackTrace(); } }); } countDownLatch2.await(5 , TimeUnit.SECONDS); producer.shutdown(); }
3、发送单向消息 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
1 2 3 4 5 6 7 8 9 10 11 12 public static void onewayProducer () throws Exception { DefaultMQProducer producer = new DefaultMQProducer("Group_A" ); producer.setNamesrvAddr("localhost:9876" ); producer.start(); for (int i = 0 ; i < messageCount; i++) { Message message = new Message("TopicTest" , "TagC" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendOneway(message); } producer.shutdown(); }
消费消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void consumer () throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer_GroupA" ); consumer.setNamesrvAddr("localhost:9876" ); consumer.subscribe("TopicTest" , "*" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgList, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n" , Thread.currentThread().getName(), msgList); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started" ); }
发送延时消息 只需要给消息设置 DelayTimeLevel
延时等级即可
1 2 3 4 5 6 7 for (int i = 0 ; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic" , ("Hello scheduled message " + i).getBytes()); message.setDelayTimeLevel(3 ); producer.send(message); }
顺序消息 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取 ,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
在局部有序中:一个MessageQueue只能由一个消费者消费,且只能单线程消费。但是这个消费者可以开启多线程,同时消费多个MessageQueue。
顺序消息生产 下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。
生产消息时,订单号相同的消息会被先后发送到同一个队列 MessageQueue
中;消费时,同一个OrderId获取到的肯定是同一个队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public static void orderProducer () throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerA" ); producer.setNamesrvAddr("localhost:9876" ); producer.start(); String[] tags = new String[]{"TagA" , "TagC" , "TagD" }; List<OrderStep> orderList = new Producer().buildOrders(); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" ); String dateStr = sdf.format(date); for (int i = 0 ; i < 10 ; i++) { String body = dateStr + " Hello RocketMQ " + orderList.get(i); Message msg = new Message("TopicTest" , tags[i % tags.length], "KEY" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select (List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; long index = id % mqs.size(); return mqs.get((int ) index); } }, orderList.get(i).getOrderId()); System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s" , sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body)); } producer.shutdown(); }
顺序消息消费 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public static void orderConsumer () throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerA" ); consumer.setNamesrvAddr("localhost:9876" ); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest" , "TagA || TagC || TagD" ); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage (List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true ); for (MessageExt msg : msgs) { System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(random.nextInt(10 )); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started." ); }
重点:每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
从 consumer.registerMessageListener(new MessageListenerOrderly()
代码可以知道,顺序消息使用 MessageListenerOrderly
来告诉消费者进行顺序消费消息,并且只能单线程去消费同一个queue。而普通消息使用 MessageListenerConcurrently
进行并发消费消息。
事务消息 事务消息共有三种状态,提交状态、回滚状态、中间状态:
TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
创建事务型生产者 使用 TransactionMQProducer
类创建生产者,并指定唯一的 ProducerGroup
,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public static void transactionProducer () throws Exception { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("TransactionProducer" ); ExecutorService executorService = new ThreadPoolExecutor(2 , 5 , 100 , TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000 ), new ThreadFactory() { @Override public Thread newThread (Runnable runnable) { Thread thread = new Thread(runnable); thread.setName("client-transaction-msg-check-thread" ); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA" , "TagB" , "TagC" , "TagD" , "TagE" }; for (int i = 0 ; i < 10 ; i++) { try { Message message = new Message("MyTopic" , tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null ); System.out.println(sendResult); Thread.sleep(10 ); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0 ; i < 100000 ; i++) { Thread.sleep(1000 ); } producer.shutdown(); }
实现事务的监听接口 当发送半消息成功时,我们使用 executeLocalTransaction
方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTransaction
方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0 ); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction (Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3 ; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction (MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status){ switch (status){ case 0 : return LocalTransactionState.UNKNOW; case 1 : return LocalTransactionState.COMMIT_MESSAGE; case 2 : return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
事务消息流程
分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1、事务消息发送及提交:
(1) 发送消息(half半消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2、补偿流程(回查):
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
最佳实践 一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags(“TagA”)。
Keys的使用 每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
事务消息 执行本地事务 executeLocalTransaction
时,如果业务执行失败,可明确告知回滚,直接返回 Rollback
;如果业务成功,不建议直接返回 Commit
,而是建议返回 UNKNOW
。
然后在进行事务回查 checkLocalTransaction
时如果能明确事务成功,才返回 Commit
。如果不能明确本地事务成功,返回 UNKNOW
,服务端默认回查15次。
消息发送失败处理方式 Producer的send方法本身支持内部重试,重试逻辑如下:
至多重试2次。
如果同步模式发送失败,则轮转到下一个Broker,如果异步模式发送失败,则只会在当前Broker进行重试。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
如果本身向broker发送消息产生超时异常,就不会再重试。
以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。
参考资料 rocketmq/docs/cn at master · apache/rocketmq (github.com)