001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.Iterator; 022import java.util.List; 023 024import org.apache.activemq.broker.region.MessageReference; 025import org.apache.activemq.broker.region.QueueMessageReference; 026import org.apache.activemq.command.MessageId; 027 028/** 029 * An abstraction that keeps the correct order of messages that need to be dispatched 030 * to consumers, but also hides the fact that there might be redelivered messages that 031 * should be dispatched ahead of any other paged in messages. 032 * 033 * Direct usage of this class is recommended as you can control when redeliveries need 034 * to be added vs regular pending messages (the next set of messages that can be dispatched) 035 * 036 * Created by ceposta 037 * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>. 038 */ 039public class QueueDispatchPendingList implements PendingList { 040 041 private PendingList pagedInPendingDispatch = new OrderedPendingList(); 042 private PendingList redeliveredWaitingDispatch = new OrderedPendingList(); 043 private boolean prioritized = false; 044 045 046 @Override 047 public boolean isEmpty() { 048 return pagedInPendingDispatch.isEmpty() && redeliveredWaitingDispatch.isEmpty(); 049 } 050 051 @Override 052 public void clear() { 053 pagedInPendingDispatch.clear(); 054 redeliveredWaitingDispatch.clear(); 055 } 056 057 /** 058 * Messages added are added directly to the pagedInPendingDispatch set of messages. If 059 * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery() 060 * method 061 * @param message 062 * The MessageReference that is to be added to this list. 063 * 064 * @return the pending node. 065 */ 066 @Override 067 public PendingNode addMessageFirst(MessageReference message) { 068 return pagedInPendingDispatch.addMessageFirst(message); 069 } 070 071 /** 072 * Messages added are added directly to the pagedInPendingDispatch set of messages. If 073 * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery() 074 * method 075 * @param message 076 * The MessageReference that is to be added to this list. 077 * 078 * @return the pending node. 079 */ 080 @Override 081 public PendingNode addMessageLast(MessageReference message) { 082 return pagedInPendingDispatch.addMessageLast(message); 083 } 084 085 @Override 086 public PendingNode remove(MessageReference message) { 087 if (pagedInPendingDispatch.contains(message)) { 088 return pagedInPendingDispatch.remove(message); 089 } else if (redeliveredWaitingDispatch.contains(message)) { 090 return redeliveredWaitingDispatch.remove(message); 091 } 092 return null; 093 } 094 095 @Override 096 public int size() { 097 return pagedInPendingDispatch.size() + redeliveredWaitingDispatch.size(); 098 } 099 100 @Override 101 public long messageSize() { 102 return pagedInPendingDispatch.messageSize() + redeliveredWaitingDispatch.messageSize(); 103 } 104 105 @Override 106 public Iterator<MessageReference> iterator() { 107 if (prioritized && hasRedeliveries()) { 108 final QueueDispatchPendingList delegate = this; 109 final PrioritizedPendingList priorityOrderedRedeliveredAndPending = new PrioritizedPendingList(); 110 priorityOrderedRedeliveredAndPending.addAll(redeliveredWaitingDispatch); 111 priorityOrderedRedeliveredAndPending.addAll(pagedInPendingDispatch); 112 113 return new Iterator<MessageReference>() { 114 115 Iterator<MessageReference> combinedIterator = priorityOrderedRedeliveredAndPending.iterator(); 116 MessageReference current = null; 117 118 @Override 119 public boolean hasNext() { 120 return combinedIterator.hasNext(); 121 } 122 123 @Override 124 public MessageReference next() { 125 current = combinedIterator.next(); 126 return current; 127 } 128 129 @Override 130 public void remove() { 131 if (current!=null) { 132 delegate.remove(current); 133 } 134 } 135 }; 136 137 } else { 138 139 return new Iterator<MessageReference>() { 140 141 Iterator<MessageReference> redeliveries = redeliveredWaitingDispatch.iterator(); 142 Iterator<MessageReference> pendingDispatch = pagedInPendingDispatch.iterator(); 143 Iterator<MessageReference> current = redeliveries; 144 145 146 @Override 147 public boolean hasNext() { 148 if (!redeliveries.hasNext() && (current == redeliveries)) { 149 current = pendingDispatch; 150 } 151 return current.hasNext(); 152 } 153 154 @Override 155 public MessageReference next() { 156 return current.next(); 157 } 158 159 @Override 160 public void remove() { 161 current.remove(); 162 } 163 }; 164 } 165 } 166 167 @Override 168 public boolean contains(MessageReference message) { 169 return pagedInPendingDispatch.contains(message) || redeliveredWaitingDispatch.contains(message); 170 } 171 172 @Override 173 public Collection<MessageReference> values() { 174 List<MessageReference> messageReferences = new ArrayList<MessageReference>(); 175 Iterator<MessageReference> iterator = iterator(); 176 while (iterator.hasNext()) { 177 messageReferences.add(iterator.next()); 178 } 179 return messageReferences; 180 } 181 182 @Override 183 public void addAll(PendingList pendingList) { 184 pagedInPendingDispatch.addAll(pendingList); 185 } 186 187 @Override 188 public MessageReference get(MessageId messageId) { 189 MessageReference rc = pagedInPendingDispatch.get(messageId); 190 if (rc == null) { 191 return redeliveredWaitingDispatch.get(messageId); 192 } 193 return rc; 194 } 195 196 public void setPrioritizedMessages(boolean prioritizedMessages) { 197 prioritized = prioritizedMessages; 198 if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) { 199 pagedInPendingDispatch = new PrioritizedPendingList(); 200 redeliveredWaitingDispatch = new PrioritizedPendingList(); 201 } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) { 202 pagedInPendingDispatch = new OrderedPendingList(); 203 redeliveredWaitingDispatch = new OrderedPendingList(); 204 } 205 } 206 207 public void addMessageForRedelivery(QueueMessageReference qmr) { 208 redeliveredWaitingDispatch.addMessageLast(qmr); 209 } 210 211 public boolean hasRedeliveries(){ 212 return !redeliveredWaitingDispatch.isEmpty(); 213 } 214}