001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicInteger;
024import java.util.concurrent.atomic.AtomicLong;
025
026import javax.jms.JMSException;
027
028import org.apache.activemq.ActiveMQMessageAudit;
029import org.apache.activemq.broker.Broker;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
032import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
034import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
035import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
036import org.apache.activemq.command.ConsumerControl;
037import org.apache.activemq.command.ConsumerInfo;
038import org.apache.activemq.command.Message;
039import org.apache.activemq.command.MessageAck;
040import org.apache.activemq.command.MessageDispatch;
041import org.apache.activemq.command.MessageDispatchNotification;
042import org.apache.activemq.command.MessageId;
043import org.apache.activemq.command.MessagePull;
044import org.apache.activemq.command.Response;
045import org.apache.activemq.thread.Scheduler;
046import org.apache.activemq.transaction.Synchronization;
047import org.apache.activemq.transport.TransmitCallback;
048import org.apache.activemq.usage.SystemUsage;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052public class TopicSubscription extends AbstractSubscription {
053
054    private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
055    private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
056
057    protected PendingMessageCursor matched;
058    protected final SystemUsage usageManager;
059    boolean singleDestination = true;
060    Destination destination;
061    private final Scheduler scheduler;
062
063    private int maximumPendingMessages = -1;
064    private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
065    private int discarded;
066    private final Object matchedListMutex = new Object();
067    private final AtomicInteger prefetchExtension = new AtomicInteger(0);
068    private int memoryUsageHighWaterMark = 95;
069    // allow duplicate suppression in a ring network of brokers
070    protected int maxProducersToAudit = 1024;
071    protected int maxAuditDepth = 1000;
072    protected boolean enableAudit = false;
073    protected ActiveMQMessageAudit audit;
074    protected boolean active = false;
075    protected boolean discarding = false;
076
077    //Used for inflight message size calculations
078    protected final Object dispatchLock = new Object();
079    protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
080
081    public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
082        super(broker, context, info);
083        this.usageManager = usageManager;
084        String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
085        if (info.getDestination().isTemporary() || broker.getTempDataStore()==null ) {
086            this.matched = new VMPendingMessageCursor(false);
087        } else {
088            this.matched = new FilePendingMessageCursor(broker,matchedName,false);
089        }
090
091        this.scheduler = broker.getScheduler();
092    }
093
094    public void init() throws Exception {
095        this.matched.setSystemUsage(usageManager);
096        this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
097        this.matched.start();
098        if (enableAudit) {
099            audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
100        }
101        this.active=true;
102    }
103
104    @Override
105    public void add(MessageReference node) throws Exception {
106        if (isDuplicate(node)) {
107            return;
108        }
109        // Lets use an indirect reference so that we can associate a unique
110        // locator /w the message.
111        node = new IndirectMessageReference(node.getMessage());
112        getSubscriptionStatistics().getEnqueues().increment();
113        synchronized (matchedListMutex) {
114            // if this subscriber is already discarding a message, we don't want to add
115            // any more messages to it as those messages can only be advisories generated in the process,
116            // which can trigger the recursive call loop
117            if (discarding) return;
118
119            if (!isFull() && matched.isEmpty()) {
120                // if maximumPendingMessages is set we will only discard messages which
121                // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
122                dispatch(node);
123                setSlowConsumer(false);
124            } else {
125                if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
126                    // Slow consumers should log and set their state as such.
127                    if (!isSlowConsumer()) {
128                        LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString());
129                        setSlowConsumer(true);
130                        for (Destination dest: destinations) {
131                            dest.slowConsumer(getContext(), this);
132                        }
133                    }
134                }
135                if (maximumPendingMessages != 0) {
136                    boolean warnedAboutWait = false;
137                    while (active) {
138                        while (matched.isFull()) {
139                            if (getContext().getStopping().get()) {
140                                LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId());
141                                getSubscriptionStatistics().getEnqueues().decrement();
142                                return;
143                            }
144                            if (!warnedAboutWait) {
145                                LOG.info("{}: Pending message cursor [{}] is full, temp usag ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
146                                        new Object[]{
147                                                toString(),
148                                                matched,
149                                                matched.getSystemUsage().getTempUsage().getPercentUsage(),
150                                                matched.getSystemUsage().getMemoryUsage().getPercentUsage()
151                                        });
152                                warnedAboutWait = true;
153                            }
154                            matchedListMutex.wait(20);
155                        }
156                        // Temporary storage could be full - so just try to add the message
157                        // see https://issues.apache.org/activemq/browse/AMQ-2475
158                        if (matched.tryAddMessageLast(node, 10)) {
159                            break;
160                        }
161                    }
162                    if (maximumPendingMessages > 0) {
163                        // calculate the high water mark from which point we
164                        // will eagerly evict expired messages
165                        int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
166                        if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
167                            max = maximumPendingMessages;
168                        }
169                        if (!matched.isEmpty() && matched.size() > max) {
170                            removeExpiredMessages();
171                        }
172                        // lets discard old messages as we are a slow consumer
173                        while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
174                            int pageInSize = matched.size() - maximumPendingMessages;
175                            // only page in a 1000 at a time - else we could blow the memory
176                            pageInSize = Math.max(1000, pageInSize);
177                            LinkedList<MessageReference> list = null;
178                            MessageReference[] oldMessages=null;
179                            synchronized(matched){
180                                list = matched.pageInList(pageInSize);
181                                oldMessages = messageEvictionStrategy.evictMessages(list);
182                                for (MessageReference ref : list) {
183                                    ref.decrementReferenceCount();
184                                }
185                            }
186                            int messagesToEvict = 0;
187                            if (oldMessages != null){
188                                messagesToEvict = oldMessages.length;
189                                for (int i = 0; i < messagesToEvict; i++) {
190                                    MessageReference oldMessage = oldMessages[i];
191                                    discard(oldMessage);
192                                }
193                            }
194                            // lets avoid an infinite loop if we are given a bad eviction strategy
195                            // for a bad strategy lets just not evict
196                            if (messagesToEvict == 0) {
197                                LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates", new Object[]{
198                                        destination, messageEvictionStrategy, list.size()
199                                });
200                                break;
201                            }
202                        }
203                    }
204                    dispatchMatched();
205                }
206            }
207        }
208    }
209
210    private boolean isDuplicate(MessageReference node) {
211        boolean duplicate = false;
212        if (enableAudit && audit != null) {
213            duplicate = audit.isDuplicate(node);
214            if (LOG.isDebugEnabled()) {
215                if (duplicate) {
216                    LOG.debug("{}, ignoring duplicate add: {}", this, node.getMessageId());
217                }
218            }
219        }
220        return duplicate;
221    }
222
223    /**
224     * Discard any expired messages from the matched list. Called from a
225     * synchronized block.
226     *
227     * @throws IOException
228     */
229    protected void removeExpiredMessages() throws IOException {
230        try {
231            matched.reset();
232            while (matched.hasNext()) {
233                MessageReference node = matched.next();
234                node.decrementReferenceCount();
235                if (broker.isExpired(node)) {
236                    matched.remove();
237                    getSubscriptionStatistics().getDispatched().increment();
238                    node.decrementReferenceCount();
239                    ((Destination)node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
240                    broker.messageExpired(getContext(), node, this);
241                    break;
242                }
243            }
244        } finally {
245            matched.release();
246        }
247    }
248
249    @Override
250    public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
251        synchronized (matchedListMutex) {
252            try {
253                matched.reset();
254                while (matched.hasNext()) {
255                    MessageReference node = matched.next();
256                    node.decrementReferenceCount();
257                    if (node.getMessageId().equals(mdn.getMessageId())) {
258                        synchronized(dispatchLock) {
259                            matched.remove();
260                            getSubscriptionStatistics().getDispatched().increment();
261                            dispatched.add(node);
262                            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
263                            node.decrementReferenceCount();
264                        }
265                        break;
266                    }
267                }
268            } finally {
269                matched.release();
270            }
271        }
272    }
273
274    @Override
275    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
276        super.acknowledge(context, ack);
277
278        // Handle the standard acknowledgment case.
279        if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
280            if (context.isInTransaction()) {
281                context.getTransaction().addSynchronization(new Synchronization() {
282                    @Override
283                    public void afterCommit() throws Exception {
284                        updateStatsOnAck(ack);
285                        dispatchMatched();
286                    }
287                });
288            } else {
289                updateStatsOnAck(ack);
290            }
291            updatePrefetch(ack);
292            dispatchMatched();
293            return;
294        } else if (ack.isDeliveredAck()) {
295            // Message was delivered but not acknowledged: update pre-fetch counters.
296            prefetchExtension.addAndGet(ack.getMessageCount());
297            dispatchMatched();
298            return;
299        } else if (ack.isExpiredAck()) {
300            updateStatsOnAck(ack);
301            updatePrefetch(ack);
302            dispatchMatched();
303            return;
304        } else if (ack.isRedeliveredAck()) {
305            // nothing to do atm
306            return;
307        }
308        throw new JMSException("Invalid acknowledgment: " + ack);
309    }
310
311    @Override
312    public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
313
314        // The slave should not deliver pull messages.
315        if (getPrefetchSize() == 0) {
316
317            final long currentDispatchedCount = getSubscriptionStatistics().getDispatched().getCount();
318            prefetchExtension.set(pull.getQuantity());
319            dispatchMatched();
320
321            // If there was nothing dispatched.. we may need to setup a timeout.
322            if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) {
323
324                // immediate timeout used by receiveNoWait()
325                if (pull.getTimeout() == -1) {
326                    // Send a NULL message to signal nothing pending.
327                    dispatch(null);
328                    prefetchExtension.set(0);
329                }
330
331                if (pull.getTimeout() > 0) {
332                    scheduler.executeAfterDelay(new Runnable() {
333
334                        @Override
335                        public void run() {
336                            pullTimeout(currentDispatchedCount, pull.isAlwaysSignalDone());
337                        }
338                    }, pull.getTimeout());
339                }
340            }
341        }
342        return null;
343    }
344
345    /**
346     * Occurs when a pull times out. If nothing has been dispatched since the
347     * timeout was setup, then send the NULL message.
348     */
349    private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) {
350        synchronized (matchedListMutex) {
351            if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || alwaysSendDone) {
352                try {
353                    dispatch(null);
354                } catch (Exception e) {
355                    context.getConnection().serviceException(e);
356                } finally {
357                    prefetchExtension.set(0);
358                }
359            }
360        }
361    }
362
363    /**
364     * Update the statistics on message ack.
365     * @param ack
366     */
367    private void updateStatsOnAck(final MessageAck ack) {
368        synchronized(dispatchLock) {
369            boolean inAckRange = false;
370            List<MessageReference> removeList = new ArrayList<MessageReference>();
371            for (final MessageReference node : dispatched) {
372                MessageId messageId = node.getMessageId();
373                if (ack.getFirstMessageId() == null
374                        || ack.getFirstMessageId().equals(messageId)) {
375                    inAckRange = true;
376                }
377                if (inAckRange) {
378                    removeList.add(node);
379                    if (ack.getLastMessageId().equals(messageId)) {
380                        break;
381                    }
382                }
383            }
384
385            for (final MessageReference node : removeList) {
386                dispatched.remove(node);
387                getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
388                getSubscriptionStatistics().getDequeues().increment();
389                ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
390                ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
391                if (info.isNetworkSubscription()) {
392                    ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
393                }
394                if (ack.isExpiredAck()) {
395                    destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
396                }
397            }
398        }
399    }
400
401    private void updatePrefetch(MessageAck ack) {
402        while (true) {
403            int currentExtension = prefetchExtension.get();
404            int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
405            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
406                break;
407            }
408        }
409    }
410
411    @Override
412    public int countBeforeFull() {
413        return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize();
414    }
415
416    @Override
417    public int getPendingQueueSize() {
418        return matched();
419    }
420
421    @Override
422    public long getPendingMessageSize() {
423        synchronized (matchedListMutex) {
424            return matched.messageSize();
425        }
426    }
427
428    @Override
429    public int getDispatchedQueueSize() {
430        return (int)(getSubscriptionStatistics().getDispatched().getCount() -
431                prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount());
432    }
433
434    public int getMaximumPendingMessages() {
435        return maximumPendingMessages;
436    }
437
438    @Override
439    public long getDispatchedCounter() {
440        return getSubscriptionStatistics().getDispatched().getCount();
441    }
442
443    @Override
444    public long getEnqueueCounter() {
445        return getSubscriptionStatistics().getEnqueues().getCount();
446    }
447
448    @Override
449    public long getDequeueCounter() {
450        return getSubscriptionStatistics().getDequeues().getCount();
451    }
452
453    /**
454     * @return the number of messages discarded due to being a slow consumer
455     */
456    public int discarded() {
457        synchronized (matchedListMutex) {
458            return discarded;
459        }
460    }
461
462    /**
463     * @return the number of matched messages (messages targeted for the
464     *         subscription but not yet able to be dispatched due to the
465     *         prefetch buffer being full).
466     */
467    public int matched() {
468        synchronized (matchedListMutex) {
469            return matched.size();
470        }
471    }
472
473    /**
474     * Sets the maximum number of pending messages that can be matched against
475     * this consumer before old messages are discarded.
476     */
477    public void setMaximumPendingMessages(int maximumPendingMessages) {
478        this.maximumPendingMessages = maximumPendingMessages;
479    }
480
481    public MessageEvictionStrategy getMessageEvictionStrategy() {
482        return messageEvictionStrategy;
483    }
484
485    /**
486     * Sets the eviction strategy used to decide which message to evict when the
487     * slow consumer needs to discard messages
488     */
489    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
490        this.messageEvictionStrategy = messageEvictionStrategy;
491    }
492
493    public int getMaxProducersToAudit() {
494        return maxProducersToAudit;
495    }
496
497    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
498        this.maxProducersToAudit = maxProducersToAudit;
499        if (audit != null) {
500            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
501        }
502    }
503
504    public int getMaxAuditDepth() {
505        return maxAuditDepth;
506    }
507
508    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
509        this.maxAuditDepth = maxAuditDepth;
510        if (audit != null) {
511            audit.setAuditDepth(maxAuditDepth);
512        }
513    }
514
515    public boolean isEnableAudit() {
516        return enableAudit;
517    }
518
519    public synchronized void setEnableAudit(boolean enableAudit) {
520        this.enableAudit = enableAudit;
521        if (enableAudit && audit == null) {
522            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
523        }
524    }
525
526    // Implementation methods
527    // -------------------------------------------------------------------------
528    @Override
529    public boolean isFull() {
530        return getDispatchedQueueSize() >= info.getPrefetchSize();
531    }
532
533    @Override
534    public int getInFlightSize() {
535        return getDispatchedQueueSize();
536    }
537
538    /**
539     * @return true when 60% or more room is left for dispatching messages
540     */
541    @Override
542    public boolean isLowWaterMark() {
543        return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
544    }
545
546    /**
547     * @return true when 10% or less room is left for dispatching messages
548     */
549    @Override
550    public boolean isHighWaterMark() {
551        return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
552    }
553
554    /**
555     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
556     */
557    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
558        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
559    }
560
561    /**
562     * @return the memoryUsageHighWaterMark
563     */
564    public int getMemoryUsageHighWaterMark() {
565        return this.memoryUsageHighWaterMark;
566    }
567
568    /**
569     * @return the usageManager
570     */
571    public SystemUsage getUsageManager() {
572        return this.usageManager;
573    }
574
575    /**
576     * @return the matched
577     */
578    public PendingMessageCursor getMatched() {
579        return this.matched;
580    }
581
582    /**
583     * @param matched the matched to set
584     */
585    public void setMatched(PendingMessageCursor matched) {
586        this.matched = matched;
587    }
588
589    /**
590     * inform the MessageConsumer on the client to change it's prefetch
591     *
592     * @param newPrefetch
593     */
594    @Override
595    public void updateConsumerPrefetch(int newPrefetch) {
596        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
597            ConsumerControl cc = new ConsumerControl();
598            cc.setConsumerId(info.getConsumerId());
599            cc.setPrefetch(newPrefetch);
600            context.getConnection().dispatchAsync(cc);
601        }
602    }
603
604    private void dispatchMatched() throws IOException {
605        synchronized (matchedListMutex) {
606            if (!matched.isEmpty() && !isFull()) {
607                try {
608                    matched.reset();
609
610                    while (matched.hasNext() && !isFull()) {
611                        MessageReference message = matched.next();
612                        message.decrementReferenceCount();
613                        matched.remove();
614                        // Message may have been sitting in the matched list a while
615                        // waiting for the consumer to ak the message.
616                        if (message.isExpired()) {
617                            discard(message);
618                            continue; // just drop it.
619                        }
620                        dispatch(message);
621                    }
622                } finally {
623                    matched.release();
624                }
625            }
626        }
627    }
628
629    private void dispatch(final MessageReference node) throws IOException {
630        Message message = node != null ? node.getMessage() : null;
631        if (node != null) {
632            node.incrementReferenceCount();
633        }
634        // Make sure we can dispatch a message.
635        MessageDispatch md = new MessageDispatch();
636        md.setMessage(message);
637        md.setConsumerId(info.getConsumerId());
638        if (node != null) {
639            md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
640            synchronized(dispatchLock) {
641                getSubscriptionStatistics().getDispatched().increment();
642                dispatched.add(node);
643                getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
644            }
645
646            // Keep track if this subscription is receiving messages from a single destination.
647            if (singleDestination) {
648                if (destination == null) {
649                    destination = (Destination)node.getRegionDestination();
650                } else {
651                    if (destination != node.getRegionDestination()) {
652                        singleDestination = false;
653                    }
654                }
655            }
656        }
657        if (info.isDispatchAsync()) {
658            if (node != null) {
659                md.setTransmitCallback(new TransmitCallback() {
660
661                    @Override
662                    public void onSuccess() {
663                        Destination regionDestination = (Destination) node.getRegionDestination();
664                        regionDestination.getDestinationStatistics().getDispatched().increment();
665                        regionDestination.getDestinationStatistics().getInflight().increment();
666                        node.decrementReferenceCount();
667                    }
668
669                    @Override
670                    public void onFailure() {
671                        Destination regionDestination = (Destination) node.getRegionDestination();
672                        regionDestination.getDestinationStatistics().getDispatched().increment();
673                        regionDestination.getDestinationStatistics().getInflight().increment();
674                        node.decrementReferenceCount();
675                    }
676                });
677            }
678            context.getConnection().dispatchAsync(md);
679        } else {
680            context.getConnection().dispatchSync(md);
681            if (node != null) {
682                Destination regionDestination = (Destination) node.getRegionDestination();
683                regionDestination.getDestinationStatistics().getDispatched().increment();
684                regionDestination.getDestinationStatistics().getInflight().increment();
685                node.decrementReferenceCount();
686            }
687        }
688    }
689
690    private void discard(MessageReference message) {
691        discarding = true;
692        try {
693            message.decrementReferenceCount();
694            matched.remove(message);
695            discarded++;
696            if (destination != null) {
697                destination.getDestinationStatistics().getDequeues().increment();
698            }
699            LOG.debug("{}, discarding message {}", this, message);
700            Destination dest = (Destination) message.getRegionDestination();
701            if (dest != null) {
702                dest.messageDiscarded(getContext(), this, message);
703            }
704            broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
705        } finally {
706            discarding = false;
707        }
708    }
709
710    @Override
711    public String toString() {
712        return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
713                + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
714    }
715
716    @Override
717    public void destroy() {
718        this.active=false;
719        synchronized (matchedListMutex) {
720            try {
721                matched.destroy();
722            } catch (Exception e) {
723                LOG.warn("Failed to destroy cursor", e);
724            }
725        }
726        setSlowConsumer(false);
727        synchronized(dispatchLock) {
728            dispatched.clear();
729        }
730    }
731
732    @Override
733    public int getPrefetchSize() {
734        return info.getPrefetchSize();
735    }
736
737    @Override
738    public void setPrefetchSize(int newSize) {
739        info.setPrefetchSize(newSize);
740        try {
741            dispatchMatched();
742        } catch(Exception e) {
743            LOG.trace("Caught exception on dispatch after prefetch size change.");
744        }
745    }
746}