001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import org.apache.activemq.broker.Broker;
020import org.apache.activemq.broker.region.MessageReference;
021import org.apache.activemq.broker.region.Queue;
022import org.apache.activemq.command.Message;
023import org.apache.activemq.usage.SystemUsage;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027/**
028 * Store based Cursor for Queues
029 */
030public class StoreQueueCursor extends AbstractPendingMessageCursor {
031
032    private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class);
033    private final Broker broker;
034    private int pendingCount;
035    private final Queue queue;
036    private PendingMessageCursor nonPersistent;
037    private final QueueStorePrefetch persistent;
038    private boolean started;
039    private PendingMessageCursor currentCursor;
040
041    /**
042     * Construct
043     * @param broker
044     * @param queue
045     */
046    public StoreQueueCursor(Broker broker,Queue queue) {
047        super((queue != null ? queue.isPrioritizedMessages():false));
048        this.broker=broker;
049        this.queue = queue;
050        this.persistent = new QueueStorePrefetch(queue, broker);
051        currentCursor = persistent;
052    }
053
054    @Override
055    public synchronized void start() throws Exception {
056        started = true;
057        super.start();
058        if (nonPersistent == null) {
059            if (broker.getBrokerService().isPersistent()) {
060                nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages);
061            }else {
062                nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
063            }
064            nonPersistent.setMaxBatchSize(getMaxBatchSize());
065            nonPersistent.setSystemUsage(systemUsage);
066            nonPersistent.setEnableAudit(isEnableAudit());
067            nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
068            nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
069        }
070        nonPersistent.setMessageAudit(getMessageAudit());
071        nonPersistent.start();
072        persistent.setMessageAudit(getMessageAudit());
073        persistent.start();
074        pendingCount = persistent.size() + nonPersistent.size();
075    }
076
077    @Override
078    public synchronized void stop() throws Exception {
079        started = false;
080        if (nonPersistent != null) {
081//            nonPersistent.clear();
082//            nonPersistent.stop();
083//            nonPersistent.gc();
084          nonPersistent.destroy();
085        }
086        persistent.stop();
087        persistent.gc();
088        super.stop();
089        pendingCount = 0;
090    }
091
092    @Override
093    public synchronized boolean tryAddMessageLast(MessageReference node, long maxWait) throws Exception {
094        boolean result = true;
095        if (node != null) {
096            Message msg = node.getMessage();
097            if (started) {
098                pendingCount++;
099                if (!msg.isPersistent()) {
100                    result = nonPersistent.tryAddMessageLast(node, maxWait);
101                }
102            }
103            if (msg.isPersistent()) {
104                result = persistent.addMessageLast(node);
105            }
106        }
107        return result;
108    }
109
110    @Override
111    public synchronized void addMessageFirst(MessageReference node) throws Exception {
112        if (node != null) {
113            Message msg = node.getMessage();
114            if (started) {
115                pendingCount++;
116                if (!msg.isPersistent()) {
117                    nonPersistent.addMessageFirst(node);
118                }
119            }
120            if (msg.isPersistent()) {
121                persistent.addMessageFirst(node);
122            }
123        }
124    }
125
126    @Override
127    public synchronized void clear() {
128        pendingCount = 0;
129    }
130
131    @Override
132    public synchronized boolean hasNext() {
133        try {
134            getNextCursor();
135        } catch (Exception e) {
136            LOG.error("Failed to get current cursor ", e);
137            throw new RuntimeException(e);
138       }
139       return currentCursor != null ? currentCursor.hasNext() : false;
140    }
141
142    @Override
143    public synchronized MessageReference next() {
144        MessageReference result = currentCursor != null ? currentCursor.next() : null;
145        return result;
146    }
147
148    @Override
149    public synchronized void remove() {
150        if (currentCursor != null) {
151            currentCursor.remove();
152        }
153        pendingCount--;
154    }
155
156    @Override
157    public synchronized void remove(MessageReference node) {
158        if (!node.isPersistent()) {
159            nonPersistent.remove(node);
160        } else {
161            persistent.remove(node);
162        }
163        pendingCount--;
164    }
165
166    @Override
167    public synchronized void reset() {
168        nonPersistent.reset();
169        persistent.reset();
170        pendingCount = persistent.size() + nonPersistent.size();
171    }
172
173    @Override
174    public void release() {
175        nonPersistent.release();
176        persistent.release();
177    }
178
179
180    @Override
181    public synchronized int size() {
182        if (pendingCount < 0) {
183            pendingCount = persistent.size() + nonPersistent.size();
184        }
185        return pendingCount;
186    }
187
188    @Override
189    public synchronized long messageSize() {
190        return persistent.messageSize() + nonPersistent.messageSize();
191    }
192
193    @Override
194    public synchronized boolean isEmpty() {
195        // if negative, more messages arrived in store since last reset so non empty
196        return pendingCount == 0;
197    }
198
199    /**
200     * Informs the Broker if the subscription needs to intervention to recover
201     * it's state e.g. DurableTopicSubscriber may do
202     *
203     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
204     * @return true if recovery required
205     */
206    @Override
207    public boolean isRecoveryRequired() {
208        return false;
209    }
210
211    /**
212     * @return the nonPersistent Cursor
213     */
214    public PendingMessageCursor getNonPersistent() {
215        return this.nonPersistent;
216    }
217
218    /**
219     * @param nonPersistent cursor to set
220     */
221    public void setNonPersistent(PendingMessageCursor nonPersistent) {
222        this.nonPersistent = nonPersistent;
223    }
224
225    @Override
226    public void setMaxBatchSize(int maxBatchSize) {
227        persistent.setMaxBatchSize(maxBatchSize);
228        if (nonPersistent != null) {
229            nonPersistent.setMaxBatchSize(maxBatchSize);
230        }
231        super.setMaxBatchSize(maxBatchSize);
232    }
233
234
235    @Override
236    public void setMaxProducersToAudit(int maxProducersToAudit) {
237        super.setMaxProducersToAudit(maxProducersToAudit);
238        if (persistent != null) {
239            persistent.setMaxProducersToAudit(maxProducersToAudit);
240        }
241        if (nonPersistent != null) {
242            nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
243        }
244    }
245
246    @Override
247    public void setMaxAuditDepth(int maxAuditDepth) {
248        super.setMaxAuditDepth(maxAuditDepth);
249        if (persistent != null) {
250            persistent.setMaxAuditDepth(maxAuditDepth);
251        }
252        if (nonPersistent != null) {
253            nonPersistent.setMaxAuditDepth(maxAuditDepth);
254        }
255    }
256
257    @Override
258    public void setEnableAudit(boolean enableAudit) {
259        super.setEnableAudit(enableAudit);
260        if (persistent != null) {
261            persistent.setEnableAudit(enableAudit);
262        }
263        if (nonPersistent != null) {
264            nonPersistent.setEnableAudit(enableAudit);
265        }
266    }
267
268    @Override
269    public void setUseCache(boolean useCache) {
270        super.setUseCache(useCache);
271        if (persistent != null) {
272            persistent.setUseCache(useCache);
273        }
274        if (nonPersistent != null) {
275            nonPersistent.setUseCache(useCache);
276        }
277    }
278
279    @Override
280    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
281        super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
282        if (persistent != null) {
283            persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
284        }
285        if (nonPersistent != null) {
286            nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
287        }
288    }
289
290
291
292    @Override
293    public synchronized void gc() {
294        if (persistent != null) {
295            persistent.gc();
296        }
297        if (nonPersistent != null) {
298            nonPersistent.gc();
299        }
300        pendingCount = persistent.size() + nonPersistent.size();
301    }
302
303    @Override
304    public void setSystemUsage(SystemUsage usageManager) {
305        super.setSystemUsage(usageManager);
306        if (persistent != null) {
307            persistent.setSystemUsage(usageManager);
308        }
309        if (nonPersistent != null) {
310            nonPersistent.setSystemUsage(usageManager);
311        }
312    }
313
314    protected synchronized PendingMessageCursor getNextCursor() throws Exception {
315        if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) {
316            currentCursor = currentCursor == persistent ? nonPersistent : persistent;
317            // sanity check
318            if (currentCursor.isEmpty()) {
319                currentCursor = currentCursor == persistent ? nonPersistent : persistent;
320            }
321        }
322        return currentCursor;
323    }
324
325    @Override
326    public boolean isCacheEnabled() {
327        boolean cacheEnabled = isUseCache();
328        if (cacheEnabled) {
329            if (persistent != null) {
330                cacheEnabled &= persistent.isCacheEnabled();
331            }
332            if (nonPersistent != null) {
333                cacheEnabled &= nonPersistent.isCacheEnabled();
334            }
335            setCacheEnabled(cacheEnabled);
336        }
337        return cacheEnabled;
338    }
339
340    @Override
341    public void rebase() {
342        persistent.rebase();
343        reset();
344    }
345
346}