001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.Iterator;
022import java.util.List;
023
024import org.apache.activemq.broker.region.MessageReference;
025import org.apache.activemq.broker.region.QueueMessageReference;
026import org.apache.activemq.command.MessageId;
027
028/**
029 * An abstraction that keeps the correct order of messages that need to be dispatched
030 * to consumers, but also hides the fact that there might be redelivered messages that
031 * should be dispatched ahead of any other paged in messages.
032 *
033 * Direct usage of this class is recommended as you can control when redeliveries need
034 * to be added vs regular pending messages (the next set of messages that can be dispatched)
035 *
036 * Created by ceposta
037 * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
038 */
039public class QueueDispatchPendingList implements PendingList {
040
041    private PendingList pagedInPendingDispatch = new OrderedPendingList();
042    private PendingList redeliveredWaitingDispatch = new OrderedPendingList();
043    private boolean prioritized = false;
044
045
046    @Override
047    public boolean isEmpty() {
048        return pagedInPendingDispatch.isEmpty() && redeliveredWaitingDispatch.isEmpty();
049    }
050
051    @Override
052    public void clear() {
053        pagedInPendingDispatch.clear();
054        redeliveredWaitingDispatch.clear();
055    }
056
057    /**
058     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
059     * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery()
060     * method
061     * @param message
062     *      The MessageReference that is to be added to this list.
063     *
064     * @return the pending node.
065     */
066    @Override
067    public PendingNode addMessageFirst(MessageReference message) {
068        return pagedInPendingDispatch.addMessageFirst(message);
069    }
070
071    /**
072     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
073     * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery()
074     * method
075     * @param message
076     *      The MessageReference that is to be added to this list.
077     *
078     * @return the pending node.
079     */
080    @Override
081    public PendingNode addMessageLast(MessageReference message) {
082        return pagedInPendingDispatch.addMessageLast(message);
083    }
084
085    @Override
086    public PendingNode remove(MessageReference message) {
087        if (pagedInPendingDispatch.contains(message)) {
088            return pagedInPendingDispatch.remove(message);
089        } else if (redeliveredWaitingDispatch.contains(message)) {
090            return redeliveredWaitingDispatch.remove(message);
091        }
092        return null;
093    }
094
095    @Override
096    public int size() {
097        return pagedInPendingDispatch.size() + redeliveredWaitingDispatch.size();
098    }
099
100    @Override
101    public long messageSize() {
102        return pagedInPendingDispatch.messageSize() + redeliveredWaitingDispatch.messageSize();
103    }
104
105    @Override
106    public Iterator<MessageReference> iterator() {
107        if (prioritized && hasRedeliveries()) {
108            final QueueDispatchPendingList delegate = this;
109            final PrioritizedPendingList  priorityOrderedRedeliveredAndPending = new PrioritizedPendingList();
110            priorityOrderedRedeliveredAndPending.addAll(redeliveredWaitingDispatch);
111            priorityOrderedRedeliveredAndPending.addAll(pagedInPendingDispatch);
112
113            return new Iterator<MessageReference>() {
114
115                Iterator<MessageReference> combinedIterator = priorityOrderedRedeliveredAndPending.iterator();
116                MessageReference current = null;
117
118                @Override
119                public boolean hasNext() {
120                    return combinedIterator.hasNext();
121                }
122
123                @Override
124                public MessageReference next() {
125                    current = combinedIterator.next();
126                    return current;
127                }
128
129                @Override
130                public void remove() {
131                    if (current!=null) {
132                        delegate.remove(current);
133                    }
134                }
135            };
136
137        } else {
138
139            return new Iterator<MessageReference>() {
140
141                Iterator<MessageReference> redeliveries = redeliveredWaitingDispatch.iterator();
142                Iterator<MessageReference> pendingDispatch = pagedInPendingDispatch.iterator();
143                Iterator<MessageReference> current = redeliveries;
144
145
146                @Override
147                public boolean hasNext() {
148                    if (!redeliveries.hasNext() && (current == redeliveries)) {
149                        current = pendingDispatch;
150                    }
151                    return current.hasNext();
152                }
153
154                @Override
155                public MessageReference next() {
156                    return current.next();
157                }
158
159                @Override
160                public void remove() {
161                    current.remove();
162                }
163            };
164        }
165    }
166
167    @Override
168    public boolean contains(MessageReference message) {
169        return pagedInPendingDispatch.contains(message) || redeliveredWaitingDispatch.contains(message);
170    }
171
172    @Override
173    public Collection<MessageReference> values() {
174        List<MessageReference> messageReferences = new ArrayList<MessageReference>();
175        Iterator<MessageReference> iterator = iterator();
176        while (iterator.hasNext()) {
177            messageReferences.add(iterator.next());
178        }
179        return messageReferences;
180    }
181
182    @Override
183    public void addAll(PendingList pendingList) {
184        pagedInPendingDispatch.addAll(pendingList);
185    }
186
187    @Override
188    public MessageReference get(MessageId messageId) {
189        MessageReference rc = pagedInPendingDispatch.get(messageId);
190        if (rc == null) {
191            return redeliveredWaitingDispatch.get(messageId);
192        }
193        return rc;
194    }
195
196    public void setPrioritizedMessages(boolean prioritizedMessages) {
197        prioritized = prioritizedMessages;
198        if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
199            pagedInPendingDispatch = new PrioritizedPendingList();
200            redeliveredWaitingDispatch = new PrioritizedPendingList();
201        } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
202            pagedInPendingDispatch = new OrderedPendingList();
203            redeliveredWaitingDispatch = new OrderedPendingList();
204        }
205    }
206
207    public void addMessageForRedelivery(QueueMessageReference qmr) {
208        redeliveredWaitingDispatch.addMessageLast(qmr);
209    }
210
211    public boolean hasRedeliveries(){
212        return !redeliveredWaitingDispatch.isEmpty();
213    }
214}