001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Set;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.FutureTask;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.Semaphore;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.concurrent.atomic.AtomicInteger;
042
043import org.apache.activemq.broker.ConnectionContext;
044import org.apache.activemq.broker.region.BaseDestination;
045import org.apache.activemq.broker.scheduler.JobSchedulerStore;
046import org.apache.activemq.command.ActiveMQDestination;
047import org.apache.activemq.command.ActiveMQQueue;
048import org.apache.activemq.command.ActiveMQTempQueue;
049import org.apache.activemq.command.ActiveMQTempTopic;
050import org.apache.activemq.command.ActiveMQTopic;
051import org.apache.activemq.command.Message;
052import org.apache.activemq.command.MessageAck;
053import org.apache.activemq.command.MessageId;
054import org.apache.activemq.command.ProducerId;
055import org.apache.activemq.command.SubscriptionInfo;
056import org.apache.activemq.command.TransactionId;
057import org.apache.activemq.openwire.OpenWireFormat;
058import org.apache.activemq.protobuf.Buffer;
059import org.apache.activemq.store.AbstractMessageStore;
060import org.apache.activemq.store.IndexListener;
061import org.apache.activemq.store.ListenableFuture;
062import org.apache.activemq.store.MessageRecoveryListener;
063import org.apache.activemq.store.MessageStore;
064import org.apache.activemq.store.MessageStoreStatistics;
065import org.apache.activemq.store.PersistenceAdapter;
066import org.apache.activemq.store.TopicMessageStore;
067import org.apache.activemq.store.TransactionIdTransformer;
068import org.apache.activemq.store.TransactionStore;
069import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
070import org.apache.activemq.store.kahadb.data.KahaDestination;
071import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
072import org.apache.activemq.store.kahadb.data.KahaLocation;
073import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
074import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
075import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
076import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
077import org.apache.activemq.store.kahadb.disk.journal.Location;
078import org.apache.activemq.store.kahadb.disk.page.Transaction;
079import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
080import org.apache.activemq.usage.MemoryUsage;
081import org.apache.activemq.usage.SystemUsage;
082import org.apache.activemq.util.ServiceStopper;
083import org.apache.activemq.util.ThreadPoolUtils;
084import org.apache.activemq.wireformat.WireFormat;
085import org.slf4j.Logger;
086import org.slf4j.LoggerFactory;
087
088public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
089    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
090    private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;
091
092    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
093    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
094            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
095    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
096    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
097            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
098
099    protected ExecutorService queueExecutor;
100    protected ExecutorService topicExecutor;
101    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
102    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
103    final WireFormat wireFormat = new OpenWireFormat();
104    private SystemUsage usageManager;
105    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
106    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
107    Semaphore globalQueueSemaphore;
108    Semaphore globalTopicSemaphore;
109    private boolean concurrentStoreAndDispatchQueues = true;
110    // when true, message order may be compromised when cache is exhausted if store is out
111    // or order w.r.t cache
112    private boolean concurrentStoreAndDispatchTopics = false;
113    private final boolean concurrentStoreAndDispatchTransactions = false;
114    private int maxAsyncJobs = MAX_ASYNC_JOBS;
115    private final KahaDBTransactionStore transactionStore;
116    private TransactionIdTransformer transactionIdTransformer;
117
118    public KahaDBStore() {
119        this.transactionStore = new KahaDBTransactionStore(this);
120        this.transactionIdTransformer = new TransactionIdTransformer() {
121            @Override
122            public TransactionId transform(TransactionId txid) {
123                return txid;
124            }
125        };
126    }
127
128    @Override
129    public String toString() {
130        return "KahaDB:[" + directory.getAbsolutePath() + "]";
131    }
132
133    @Override
134    public void setBrokerName(String brokerName) {
135    }
136
137    @Override
138    public void setUsageManager(SystemUsage usageManager) {
139        this.usageManager = usageManager;
140    }
141
142    public SystemUsage getUsageManager() {
143        return this.usageManager;
144    }
145
146    /**
147     * @return the concurrentStoreAndDispatch
148     */
149    public boolean isConcurrentStoreAndDispatchQueues() {
150        return this.concurrentStoreAndDispatchQueues;
151    }
152
153    /**
154     * @param concurrentStoreAndDispatch
155     *            the concurrentStoreAndDispatch to set
156     */
157    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
158        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
159    }
160
161    /**
162     * @return the concurrentStoreAndDispatch
163     */
164    public boolean isConcurrentStoreAndDispatchTopics() {
165        return this.concurrentStoreAndDispatchTopics;
166    }
167
168    /**
169     * @param concurrentStoreAndDispatch
170     *            the concurrentStoreAndDispatch to set
171     */
172    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
173        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
174    }
175
176    public boolean isConcurrentStoreAndDispatchTransactions() {
177        return this.concurrentStoreAndDispatchTransactions;
178    }
179
180    /**
181     * @return the maxAsyncJobs
182     */
183    public int getMaxAsyncJobs() {
184        return this.maxAsyncJobs;
185    }
186
187    /**
188     * @param maxAsyncJobs
189     *            the maxAsyncJobs to set
190     */
191    public void setMaxAsyncJobs(int maxAsyncJobs) {
192        this.maxAsyncJobs = maxAsyncJobs;
193    }
194
195
196    @Override
197    protected void configureMetadata() {
198        if (brokerService != null) {
199            metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
200            wireFormat.setVersion(metadata.openwireVersion);
201
202            if (LOG.isDebugEnabled()) {
203                LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion);
204            }
205
206        }
207    }
208
209    @Override
210    public void doStart() throws Exception {
211        //configure the metadata before start, right now
212        //this is just the open wire version
213        configureMetadata();
214
215        super.doStart();
216
217        if (brokerService != null) {
218            // In case the recovered store used a different OpenWire version log a warning
219            // to assist in determining why journal reads fail.
220            if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
221                LOG.warn("Existing Store uses a different OpenWire version[{}] " +
222                         "than the version configured[{}] reverting to the version " +
223                         "used by this store, some newer broker features may not work" +
224                         "as expected.",
225                         metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
226
227                // Update the broker service instance to the actual version in use.
228                wireFormat.setVersion(metadata.openwireVersion);
229                brokerService.setStoreOpenWireVersion(metadata.openwireVersion);
230            }
231        }
232
233        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
234        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
235        this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
236        this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
237        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
238            asyncQueueJobQueue, new ThreadFactory() {
239                @Override
240                public Thread newThread(Runnable runnable) {
241                    Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
242                    thread.setDaemon(true);
243                    return thread;
244                }
245            });
246        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
247            asyncTopicJobQueue, new ThreadFactory() {
248                @Override
249                public Thread newThread(Runnable runnable) {
250                    Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
251                    thread.setDaemon(true);
252                    return thread;
253                }
254            });
255    }
256
257    @Override
258    public void doStop(ServiceStopper stopper) throws Exception {
259        // drain down async jobs
260        LOG.info("Stopping async queue tasks");
261        if (this.globalQueueSemaphore != null) {
262            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
263        }
264        synchronized (this.asyncQueueMaps) {
265            for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
266                synchronized (m) {
267                    for (StoreTask task : m.values()) {
268                        task.cancel();
269                    }
270                }
271            }
272            this.asyncQueueMaps.clear();
273        }
274        LOG.info("Stopping async topic tasks");
275        if (this.globalTopicSemaphore != null) {
276            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
277        }
278        synchronized (this.asyncTopicMaps) {
279            for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
280                synchronized (m) {
281                    for (StoreTask task : m.values()) {
282                        task.cancel();
283                    }
284                }
285            }
286            this.asyncTopicMaps.clear();
287        }
288        if (this.globalQueueSemaphore != null) {
289            this.globalQueueSemaphore.drainPermits();
290        }
291        if (this.globalTopicSemaphore != null) {
292            this.globalTopicSemaphore.drainPermits();
293        }
294        if (this.queueExecutor != null) {
295            ThreadPoolUtils.shutdownNow(queueExecutor);
296            queueExecutor = null;
297        }
298        if (this.topicExecutor != null) {
299            ThreadPoolUtils.shutdownNow(topicExecutor);
300            topicExecutor = null;
301        }
302        LOG.info("Stopped KahaDB");
303        super.doStop(stopper);
304    }
305
306    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
307        return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
308            @Override
309            public Location execute(Transaction tx) throws IOException {
310                StoredDestination sd = getStoredDestination(destination, tx);
311                Long sequence = sd.messageIdIndex.get(tx, key);
312                if (sequence == null) {
313                    return null;
314                }
315                return sd.orderIndex.get(tx, sequence).location;
316            }
317        });
318    }
319
320    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
321        StoreQueueTask task = null;
322        synchronized (store.asyncTaskMap) {
323            task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
324        }
325        return task;
326    }
327
328    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
329        synchronized (store.asyncTaskMap) {
330            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
331        }
332        this.queueExecutor.execute(task);
333    }
334
335    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
336        StoreTopicTask task = null;
337        synchronized (store.asyncTaskMap) {
338            task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
339        }
340        return task;
341    }
342
343    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
344        synchronized (store.asyncTaskMap) {
345            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
346        }
347        this.topicExecutor.execute(task);
348    }
349
350    @Override
351    public TransactionStore createTransactionStore() throws IOException {
352        return this.transactionStore;
353    }
354
355    public boolean getForceRecoverIndex() {
356        return this.forceRecoverIndex;
357    }
358
359    public void setForceRecoverIndex(boolean forceRecoverIndex) {
360        this.forceRecoverIndex = forceRecoverIndex;
361    }
362
363    public class KahaDBMessageStore extends AbstractMessageStore {
364        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
365        protected KahaDestination dest;
366        private final int maxAsyncJobs;
367        private final Semaphore localDestinationSemaphore;
368
369        double doneTasks, canceledTasks = 0;
370
371        public KahaDBMessageStore(ActiveMQDestination destination) {
372            super(destination);
373            this.dest = convert(destination);
374            this.maxAsyncJobs = getMaxAsyncJobs();
375            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
376        }
377
378        @Override
379        public ActiveMQDestination getDestination() {
380            return destination;
381        }
382
383        @Override
384        public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
385                throws IOException {
386            if (isConcurrentStoreAndDispatchQueues()) {
387                message.beforeMarshall(wireFormat);
388                StoreQueueTask result = new StoreQueueTask(this, context, message);
389                ListenableFuture<Object> future = result.getFuture();
390                message.getMessageId().setFutureOrSequenceLong(future);
391                message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
392                result.aquireLocks();
393                addQueueTask(this, result);
394                if (indexListener != null) {
395                    indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
396                }
397                return future;
398            } else {
399                return super.asyncAddQueueMessage(context, message);
400            }
401        }
402
403        @Override
404        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
405            if (isConcurrentStoreAndDispatchQueues()) {
406                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
407                StoreQueueTask task = null;
408                synchronized (asyncTaskMap) {
409                    task = (StoreQueueTask) asyncTaskMap.get(key);
410                }
411                if (task != null) {
412                    if (ack.isInTransaction() || !task.cancel()) {
413                        try {
414                            task.future.get();
415                        } catch (InterruptedException e) {
416                            throw new InterruptedIOException(e.toString());
417                        } catch (Exception ignored) {
418                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
419                        }
420                        removeMessage(context, ack);
421                    } else {
422                        synchronized (asyncTaskMap) {
423                            asyncTaskMap.remove(key);
424                        }
425                    }
426                } else {
427                    removeMessage(context, ack);
428                }
429            } else {
430                removeMessage(context, ack);
431            }
432        }
433
434        @Override
435        public void addMessage(final ConnectionContext context, final Message message) throws IOException {
436            final KahaAddMessageCommand command = new KahaAddMessageCommand();
437            command.setDestination(dest);
438            command.setMessageId(message.getMessageId().toProducerKey());
439            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
440            command.setPriority(message.getPriority());
441            command.setPrioritySupported(isPrioritizedMessages());
442            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
443            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
444            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
445                // sync add? (for async, future present from getFutureOrSequenceLong)
446                Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
447
448                @Override
449                public void sequenceAssignedWithIndexLocked(final long sequence) {
450                    message.getMessageId().setFutureOrSequenceLong(sequence);
451                    if (indexListener != null) {
452                        if (possibleFuture == null) {
453                            trackPendingAdd(dest, sequence);
454                            indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
455                                @Override
456                                public void run() {
457                                    trackPendingAddComplete(dest, sequence);
458                                }
459                            }));
460                        }
461                    }
462                }
463            }, null);
464        }
465
466        @Override
467        public void updateMessage(Message message) throws IOException {
468            if (LOG.isTraceEnabled()) {
469                LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
470            }
471            KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
472            KahaAddMessageCommand command = new KahaAddMessageCommand();
473            command.setDestination(dest);
474            command.setMessageId(message.getMessageId().toProducerKey());
475            command.setPriority(message.getPriority());
476            command.setPrioritySupported(prioritizedMessages);
477            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
478            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
479            updateMessageCommand.setMessage(command);
480            store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null);
481        }
482
483        @Override
484        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
485            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
486            command.setDestination(dest);
487            command.setMessageId(ack.getLastMessageId().toProducerKey());
488            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
489
490            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
491            command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
492            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
493        }
494
495        @Override
496        public void removeAllMessages(ConnectionContext context) throws IOException {
497            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
498            command.setDestination(dest);
499            store(command, true, null, null);
500        }
501
502        @Override
503        public Message getMessage(MessageId identity) throws IOException {
504            final String key = identity.toProducerKey();
505
506            // Hopefully one day the page file supports concurrent read
507            // operations... but for now we must
508            // externally synchronize...
509            Location location;
510            indexLock.writeLock().lock();
511            try {
512                location = findMessageLocation(key, dest);
513            } finally {
514                indexLock.writeLock().unlock();
515            }
516            if (location == null) {
517                return null;
518            }
519
520            return loadMessage(location);
521        }
522
523        @Override
524        public boolean isEmpty() throws IOException {
525            indexLock.writeLock().lock();
526            try {
527                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
528                    @Override
529                    public Boolean execute(Transaction tx) throws IOException {
530                        // Iterate through all index entries to get a count of
531                        // messages in the destination.
532                        StoredDestination sd = getStoredDestination(dest, tx);
533                        return sd.locationIndex.isEmpty(tx);
534                    }
535                });
536            } finally {
537                indexLock.writeLock().unlock();
538            }
539        }
540
541        @Override
542        public void recover(final MessageRecoveryListener listener) throws Exception {
543            // recovery may involve expiry which will modify
544            indexLock.writeLock().lock();
545            try {
546                pageFile.tx().execute(new Transaction.Closure<Exception>() {
547                    @Override
548                    public void execute(Transaction tx) throws Exception {
549                        StoredDestination sd = getStoredDestination(dest, tx);
550                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
551                        sd.orderIndex.resetCursorPosition();
552                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
553                                .hasNext(); ) {
554                            Entry<Long, MessageKeys> entry = iterator.next();
555                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
556                                continue;
557                            }
558                            Message msg = loadMessage(entry.getValue().location);
559                            listener.recoverMessage(msg);
560                        }
561                    }
562                });
563            } finally {
564                indexLock.writeLock().unlock();
565            }
566        }
567
568        @Override
569        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
570            indexLock.writeLock().lock();
571            try {
572                pageFile.tx().execute(new Transaction.Closure<Exception>() {
573                    @Override
574                    public void execute(Transaction tx) throws Exception {
575                        StoredDestination sd = getStoredDestination(dest, tx);
576                        Entry<Long, MessageKeys> entry = null;
577                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
578                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
579                            entry = iterator.next();
580                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
581                                continue;
582                            }
583                            Message msg = loadMessage(entry.getValue().location);
584                            msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
585                            listener.recoverMessage(msg);
586                            counter++;
587                            if (counter >= maxReturned) {
588                                break;
589                            }
590                        }
591                        sd.orderIndex.stoppedIterating();
592                    }
593                });
594            } finally {
595                indexLock.writeLock().unlock();
596            }
597        }
598
599        protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
600            int counter = 0;
601            String id;
602            for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) {
603                id = iterator.next();
604                iterator.remove();
605                Long sequence = sd.messageIdIndex.get(tx, id);
606                if (sequence != null) {
607                    if (sd.orderIndex.alreadyDispatched(sequence)) {
608                        listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location));
609                        counter++;
610                        if (counter >= maxReturned) {
611                            break;
612                        }
613                    } else {
614                        LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
615                    }
616                } else {
617                    LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
618                }
619            }
620            return counter;
621        }
622
623
624        @Override
625        public void resetBatching() {
626            if (pageFile.isLoaded()) {
627                indexLock.writeLock().lock();
628                try {
629                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
630                        @Override
631                        public void execute(Transaction tx) throws Exception {
632                            StoredDestination sd = getExistingStoredDestination(dest, tx);
633                            if (sd != null) {
634                                sd.orderIndex.resetCursorPosition();}
635                            }
636                        });
637                } catch (Exception e) {
638                    LOG.error("Failed to reset batching",e);
639                } finally {
640                    indexLock.writeLock().unlock();
641                }
642            }
643        }
644
645        @Override
646        public void setBatch(final MessageId identity) throws IOException {
647            indexLock.writeLock().lock();
648            try {
649                pageFile.tx().execute(new Transaction.Closure<IOException>() {
650                    @Override
651                    public void execute(Transaction tx) throws IOException {
652                        StoredDestination sd = getStoredDestination(dest, tx);
653                        Long location = (Long) identity.getFutureOrSequenceLong();
654                        Long pending = sd.orderIndex.minPendingAdd();
655                        if (pending != null) {
656                            location = Math.min(location, pending-1);
657                        }
658                        sd.orderIndex.setBatch(tx, location);
659                    }
660                });
661            } finally {
662                indexLock.writeLock().unlock();
663            }
664        }
665
666        @Override
667        public void setMemoryUsage(MemoryUsage memoryUsage) {
668        }
669        @Override
670        public void start() throws Exception {
671            super.start();
672        }
673        @Override
674        public void stop() throws Exception {
675            super.stop();
676        }
677
678        protected void lockAsyncJobQueue() {
679            try {
680                if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
681                    throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore);
682                }
683            } catch (Exception e) {
684                LOG.error("Failed to lock async jobs for " + this.destination, e);
685            }
686        }
687
        /**
         * Returns {@code maxAsyncJobs} permits to the local destination semaphore.
         * The release is unconditional: this method does not check whether the
         * permits were actually obtained beforehand.
         */
        protected void unlockAsyncJobQueue() {
            this.localDestinationSemaphore.release(this.maxAsyncJobs);
        }
691
692        protected void acquireLocalAsyncLock() {
693            try {
694                this.localDestinationSemaphore.acquire();
695            } catch (InterruptedException e) {
696                LOG.error("Failed to aquire async lock for " + this.destination, e);
697            }
698        }
699
        /**
         * Returns a single permit to this destination's local semaphore.
         */
        protected void releaseLocalAsyncLock() {
            this.localDestinationSemaphore.release();
        }
703
        /**
         * Describes this store as the number of currently available permits of the
         * local semaphore followed by the entry cached in {@code storedDestinations}
         * for this destination's key.
         *
         * @return a diagnostic string of the form {@code permits:N,sd=...}
         */
        @Override
        public String toString(){
            return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
        }
708
709        @Override
710        protected void recoverMessageStoreStatistics() throws IOException {
711            try {
712                MessageStoreStatistics recoveredStatistics;
713                lockAsyncJobQueue();
714                indexLock.writeLock().lock();
715                try {
716                    recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
717                        @Override
718                        public MessageStoreStatistics execute(Transaction tx) throws IOException {
719                            MessageStoreStatistics statistics = new MessageStoreStatistics();
720
721                            // Iterate through all index entries to get the size of each message
722                            StoredDestination sd = getStoredDestination(dest, tx);
723                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
724                                int locationSize = iterator.next().getKey().getSize();
725                                statistics.getMessageCount().increment();
726                                statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
727                            }
728                           return statistics;
729                        }
730                    });
731                    getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
732                    getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
733                } finally {
734                    indexLock.writeLock().unlock();
735                }
736            } finally {
737                unlockAsyncJobQueue();
738            }
739        }
740    }
741
742    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
743        private final AtomicInteger subscriptionCount = new AtomicInteger();
744        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
745            super(destination);
746            this.subscriptionCount.set(getAllSubscriptions().length);
747            if (isConcurrentStoreAndDispatchTopics()) {
748                asyncTopicMaps.add(asyncTaskMap);
749            }
750        }
751
752        @Override
753        public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
754                throws IOException {
755            if (isConcurrentStoreAndDispatchTopics()) {
756                message.beforeMarshall(wireFormat);
757                StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
758                result.aquireLocks();
759                addTopicTask(this, result);
760                return result.getFuture();
761            } else {
762                return super.asyncAddTopicMessage(context, message);
763            }
764        }
765
766        @Override
767        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
768                                MessageId messageId, MessageAck ack) throws IOException {
769            String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
770            if (isConcurrentStoreAndDispatchTopics()) {
771                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
772                StoreTopicTask task = null;
773                synchronized (asyncTaskMap) {
774                    task = (StoreTopicTask) asyncTaskMap.get(key);
775                }
776                if (task != null) {
777                    if (task.addSubscriptionKey(subscriptionKey)) {
778                        removeTopicTask(this, messageId);
779                        if (task.cancel()) {
780                            synchronized (asyncTaskMap) {
781                                asyncTaskMap.remove(key);
782                            }
783                        }
784                    }
785                } else {
786                    doAcknowledge(context, subscriptionKey, messageId, ack);
787                }
788            } else {
789                doAcknowledge(context, subscriptionKey, messageId, ack);
790            }
791        }
792
793        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
794                throws IOException {
795            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
796            command.setDestination(dest);
797            command.setSubscriptionKey(subscriptionKey);
798            command.setMessageId(messageId.toProducerKey());
799            command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
800            if (ack != null && ack.isUnmatchedAck()) {
801                command.setAck(UNMATCHED);
802            } else {
803                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
804                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
805            }
806            store(command, false, null, null);
807        }
808
809        @Override
810        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
811            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
812                    .getSubscriptionName());
813            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
814            command.setDestination(dest);
815            command.setSubscriptionKey(subscriptionKey.toString());
816            command.setRetroactive(retroactive);
817            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
818            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
819            store(command, isEnableJournalDiskSyncs() && true, null, null);
820            this.subscriptionCount.incrementAndGet();
821        }
822
823        @Override
824        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
825            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
826            command.setDestination(dest);
827            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
828            store(command, isEnableJournalDiskSyncs() && true, null, null);
829            this.subscriptionCount.decrementAndGet();
830        }
831
832        @Override
833        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
834
835            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
836            indexLock.writeLock().lock();
837            try {
838                pageFile.tx().execute(new Transaction.Closure<IOException>() {
839                    @Override
840                    public void execute(Transaction tx) throws IOException {
841                        StoredDestination sd = getStoredDestination(dest, tx);
842                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
843                                .hasNext();) {
844                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
845                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
846                                    .getValue().getSubscriptionInfo().newInput()));
847                            subscriptions.add(info);
848
849                        }
850                    }
851                });
852            } finally {
853                indexLock.writeLock().unlock();
854            }
855
856            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
857            subscriptions.toArray(rc);
858            return rc;
859        }
860
861        @Override
862        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
863            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
864            indexLock.writeLock().lock();
865            try {
866                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
867                    @Override
868                    public SubscriptionInfo execute(Transaction tx) throws IOException {
869                        StoredDestination sd = getStoredDestination(dest, tx);
870                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
871                        if (command == null) {
872                            return null;
873                        }
874                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
875                                .getSubscriptionInfo().newInput()));
876                    }
877                });
878            } finally {
879                indexLock.writeLock().unlock();
880            }
881        }
882
883        @Override
884        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
885            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
886            indexLock.writeLock().lock();
887            try {
888                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
889                    @Override
890                    public Integer execute(Transaction tx) throws IOException {
891                        StoredDestination sd = getStoredDestination(dest, tx);
892                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
893                        if (cursorPos == null) {
894                            // The subscription might not exist.
895                            return 0;
896                        }
897
898                        return (int) getStoredMessageCount(tx, sd, subscriptionKey);
899                    }
900                });
901            } finally {
902                indexLock.writeLock().unlock();
903            }
904        }
905
906
907        @Override
908        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
909            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
910            indexLock.writeLock().lock();
911            try {
912                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
913                    @Override
914                    public Integer execute(Transaction tx) throws IOException {
915                        StoredDestination sd = getStoredDestination(dest, tx);
916                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
917                        if (cursorPos == null) {
918                            // The subscription might not exist.
919                            return 0;
920                        }
921
922                        return (int) getStoredMessageSize(tx, sd, subscriptionKey);
923                    }
924                });
925            } finally {
926                indexLock.writeLock().unlock();
927            }
928        }
929
930        @Override
931        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
932                throws Exception {
933            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
934            @SuppressWarnings("unused")
935            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
936            indexLock.writeLock().lock();
937            try {
938                pageFile.tx().execute(new Transaction.Closure<Exception>() {
939                    @Override
940                    public void execute(Transaction tx) throws Exception {
941                        StoredDestination sd = getStoredDestination(dest, tx);
942                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
943                        sd.orderIndex.setBatch(tx, cursorPos);
944                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
945                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
946                                .hasNext();) {
947                            Entry<Long, MessageKeys> entry = iterator.next();
948                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
949                                continue;
950                            }
951                            listener.recoverMessage(loadMessage(entry.getValue().location));
952                        }
953                        sd.orderIndex.resetCursorPosition();
954                    }
955                });
956            } finally {
957                indexLock.writeLock().unlock();
958            }
959        }
960
        /**
         * Recovers the next batch of messages for one subscription, resuming from the
         * cursor cached for that subscription (or from its last ack when no cursor is
         * cached) and caching the resulting cursor position afterwards.
         *
         * @param clientId client id of the subscription
         * @param subscriptionName name of the subscription
         * @param maxReturned iteration stops once the recovered counter reaches this value
         * @param listener receives each loaded message and is asked {@code hasSpace()}
         *            after every entry
         * @throws Exception if the lookup, the index transaction or the listener fails
         */
        @Override
        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
                final MessageRecoveryListener listener) throws Exception {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            // The lookup result is not used below; only the call itself takes place.
            @SuppressWarnings("unused")
            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
            indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    @Override
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        sd.orderIndex.resetCursorPosition();
                        // Resume from the cursor cached for this subscription, if any.
                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                        if (moc == null) {
                            // No cached cursor: start from the subscription's last ack.
                            LastAck pos = getLastAck(tx, sd, subscriptionKey);
                            if (pos == null) {
                                // sub deleted
                                return;
                            }
                            sd.orderIndex.setBatch(tx, pos);
                            moc = sd.orderIndex.cursor;
                        } else {
                            sd.orderIndex.cursor.sync(moc);
                        }

                        Entry<Long, MessageKeys> entry = null;
                        // The counter starts at whatever recoverRolledBackAcks returns.
                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
                                .hasNext();) {
                            entry = iterator.next();
                            // Entries listed in ackedAndPrepared are skipped without being loaded.
                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
                                continue;
                            }
                            // Only messages the listener reports as recovered count towards maxReturned.
                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
                                counter++;
                            }
                            if (counter >= maxReturned || listener.hasSpace() == false) {
                                break;
                            }
                        }
                        sd.orderIndex.stoppedIterating();
                        // Cache a copy of the cursor only if at least one entry was visited.
                        if (entry != null) {
                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
                            sd.subscriptionCursors.put(subscriptionKey, copy);
                        }
                    }
                });
            } finally {
                indexLock.writeLock().unlock();
            }
        }
1013
1014        @Override
1015        public void resetBatching(String clientId, String subscriptionName) {
1016            try {
1017                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1018                indexLock.writeLock().lock();
1019                try {
1020                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1021                        @Override
1022                        public void execute(Transaction tx) throws IOException {
1023                            StoredDestination sd = getStoredDestination(dest, tx);
1024                            sd.subscriptionCursors.remove(subscriptionKey);
1025                        }
1026                    });
1027                }finally {
1028                    indexLock.writeLock().unlock();
1029                }
1030            } catch (IOException e) {
1031                throw new RuntimeException(e);
1032            }
1033        }
1034    }
1035
1036    String subscriptionKey(String clientId, String subscriptionName) {
1037        return clientId + ":" + subscriptionName;
1038    }
1039
1040    @Override
1041    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
1042        String key = key(convert(destination));
1043        MessageStore store = storeCache.get(key(convert(destination)));
1044        if (store == null) {
1045            final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination));
1046            store = storeCache.putIfAbsent(key, queueStore);
1047            if (store == null) {
1048                store = queueStore;
1049            }
1050        }
1051
1052        return store;
1053    }
1054
1055    @Override
1056    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
1057        String key = key(convert(destination));
1058        MessageStore store = storeCache.get(key(convert(destination)));
1059        if (store == null) {
1060            final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
1061            store = storeCache.putIfAbsent(key, topicStore);
1062            if (store == null) {
1063                store = topicStore;
1064            }
1065        }
1066
1067        return (TopicMessageStore) store;
1068    }
1069
    /**
     * Cleanup hook for state associated with the given destination.
     * This method does not stop the message store (it might not be cached).
     * <p>
     * The body is currently empty: calling it changes nothing, and in particular
     * no entry is removed from the store cache here.
     *
     * @param destination
     *            Destination to forget
     */
    @Override
    public void removeQueueMessageStore(ActiveMQQueue destination) {
    }
1080
    /**
     * Cleanup hook for state associated with the given destination.
     * This method does not stop the message store (it might not be cached).
     * <p>
     * The body is currently empty: calling it changes nothing, and in particular
     * no entry is removed from the store cache here.
     *
     * @param destination
     *            Destination to forget
     */
    @Override
    public void removeTopicMessageStore(ActiveMQTopic destination) {
    }
1091
    /**
     * Sets the {@code deleteAllMessages} flag. Nothing is deleted by this method
     * itself; whatever acts on the flag lives outside this method.
     *
     * @throws IOException declared by the overridden method; not thrown here
     */
    @Override
    public void deleteAllMessages() throws IOException {
        deleteAllMessages = true;
    }
1096
1097    @Override
1098    public Set<ActiveMQDestination> getDestinations() {
1099        try {
1100            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
1101            indexLock.writeLock().lock();
1102            try {
1103                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1104                    @Override
1105                    public void execute(Transaction tx) throws IOException {
1106                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
1107                                .hasNext();) {
1108                            Entry<String, StoredDestination> entry = iterator.next();
1109                            //Removing isEmpty topic check - see AMQ-5875
1110                            rc.add(convert(entry.getKey()));
1111                        }
1112                    }
1113                });
1114            }finally {
1115                indexLock.writeLock().unlock();
1116            }
1117            return rc;
1118        } catch (IOException e) {
1119            throw new RuntimeException(e);
1120        }
1121    }
1122
    /**
     * Always reports 0; this implementation does not look anything up.
     *
     * @return the constant 0
     * @throws IOException declared by the overridden method; not thrown here
     */
    @Override
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0;
    }
1127
    /**
     * Asks the metadata's producer sequence id tracker for the last sequence id
     * of the given producer, holding the index write lock during the query.
     *
     * @param id producer to look up
     * @return whatever {@code producerSequenceIdTracker.getLastSeqId(id)} returns
     */
    @Override
    public long getLastProducerSequenceId(ProducerId id) {
        indexLock.writeLock().lock();
        try {
            return metadata.producerSequenceIdTracker.getLastSeqId(id);
        } finally {
            indexLock.writeLock().unlock();
        }
    }
1137
    /**
     * Reports the size of this store as the current {@code journalSize} value plus
     * the disk size of the page file.
     *
     * @return journal size plus page file disk size
     * @throws RuntimeException wrapping any {@link IOException} raised while
     *             obtaining the page file or its disk size
     */
    @Override
    public long size() {
        try {
            return journalSize.get() + getPageFile().getDiskSize();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
1146
    /**
     * Not supported by this implementation.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void beginTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported by this implementation.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void commitTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported by this implementation.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
1159
    /**
     * Runs a checkpoint by delegating to the superclass {@code checkpointCleanup}.
     *
     * @param sync passed through unchanged to {@code checkpointCleanup}
     * @throws IOException if the superclass checkpoint fails
     */
    @Override
    public void checkpoint(boolean sync) throws IOException {
        super.checkpointCleanup(sync);
    }
1164
1165    // /////////////////////////////////////////////////////////////////
1166    // Internal helper methods.
1167    // /////////////////////////////////////////////////////////////////
1168
1169    /**
1170     * @param location
1171     * @return
1172     * @throws IOException
1173     */
1174    Message loadMessage(Location location) throws IOException {
1175        JournalCommand<?> command = load(location);
1176        KahaAddMessageCommand addMessage = null;
1177        switch (command.type()) {
1178            case KAHA_UPDATE_MESSAGE_COMMAND:
1179                addMessage = ((KahaUpdateMessageCommand)command).getMessage();
1180                break;
1181            default:
1182                addMessage = (KahaAddMessageCommand) command;
1183        }
1184        Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1185        return msg;
1186    }
1187
1188    // /////////////////////////////////////////////////////////////////
1189    // Internal conversion methods.
1190    // /////////////////////////////////////////////////////////////////
1191
1192    KahaLocation convert(Location location) {
1193        KahaLocation rc = new KahaLocation();
1194        rc.setLogId(location.getDataFileId());
1195        rc.setOffset(location.getOffset());
1196        return rc;
1197    }
1198
1199    KahaDestination convert(ActiveMQDestination dest) {
1200        KahaDestination rc = new KahaDestination();
1201        rc.setName(dest.getPhysicalName());
1202        switch (dest.getDestinationType()) {
1203        case ActiveMQDestination.QUEUE_TYPE:
1204            rc.setType(DestinationType.QUEUE);
1205            return rc;
1206        case ActiveMQDestination.TOPIC_TYPE:
1207            rc.setType(DestinationType.TOPIC);
1208            return rc;
1209        case ActiveMQDestination.TEMP_QUEUE_TYPE:
1210            rc.setType(DestinationType.TEMP_QUEUE);
1211            return rc;
1212        case ActiveMQDestination.TEMP_TOPIC_TYPE:
1213            rc.setType(DestinationType.TEMP_TOPIC);
1214            return rc;
1215        default:
1216            return null;
1217        }
1218    }
1219
1220    ActiveMQDestination convert(String dest) {
1221        int p = dest.indexOf(":");
1222        if (p < 0) {
1223            throw new IllegalArgumentException("Not in the valid destination format");
1224        }
1225        int type = Integer.parseInt(dest.substring(0, p));
1226        String name = dest.substring(p + 1);
1227        return convert(type, name);
1228    }
1229
    /**
     * Converts a {@code KahaDestination} back into a broker destination using its
     * type number and name.
     *
     * @param commandDestination destination taken from a journal command
     * @return the matching broker destination
     */
    private ActiveMQDestination convert(KahaDestination commandDestination) {
        return convert(commandDestination.getType().getNumber(), commandDestination.getName());
    }
1233
1234    private ActiveMQDestination convert(int type, String name) {
1235        switch (KahaDestination.DestinationType.valueOf(type)) {
1236        case QUEUE:
1237            return new ActiveMQQueue(name);
1238        case TOPIC:
1239            return new ActiveMQTopic(name);
1240        case TEMP_QUEUE:
1241            return new ActiveMQTempQueue(name);
1242        case TEMP_TOPIC:
1243            return new ActiveMQTempTopic(name);
1244        default:
1245            throw new IllegalArgumentException("Not in the valid destination format");
1246        }
1247    }
1248
    /**
     * @return the transformer currently held in the {@code transactionIdTransformer} field
     */
    public TransactionIdTransformer getTransactionIdTransformer() {
        return transactionIdTransformer;
    }
1252
    /**
     * Replaces the transformer held in the {@code transactionIdTransformer} field.
     *
     * @param transactionIdTransformer transformer to use from now on
     */
    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
        this.transactionIdTransformer = transactionIdTransformer;
    }
1256
1257    static class AsyncJobKey {
1258        MessageId id;
1259        ActiveMQDestination destination;
1260
1261        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1262            this.id = id;
1263            this.destination = destination;
1264        }
1265
1266        @Override
1267        public boolean equals(Object obj) {
1268            if (obj == this) {
1269                return true;
1270            }
1271            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1272                    && destination.equals(((AsyncJobKey) obj).destination);
1273        }
1274
1275        @Override
1276        public int hashCode() {
1277            return id.hashCode() + destination.hashCode();
1278        }
1279
1280        @Override
1281        public String toString() {
1282            return destination.getPhysicalName() + "-" + id;
1283        }
1284    }
1285
1286    public interface StoreTask {
1287        public boolean cancel();
1288
1289        public void aquireLocks();
1290
1291        public void releaseLocks();
1292    }
1293
1294    class StoreQueueTask implements Runnable, StoreTask {
1295        protected final Message message;
1296        protected final ConnectionContext context;
1297        protected final KahaDBMessageStore store;
1298        protected final InnerFutureTask future;
1299        protected final AtomicBoolean done = new AtomicBoolean();
1300        protected final AtomicBoolean locked = new AtomicBoolean();
1301
1302        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1303            this.store = store;
1304            this.context = context;
1305            this.message = message;
1306            this.future = new InnerFutureTask(this);
1307        }
1308
1309        public ListenableFuture<Object> getFuture() {
1310            return this.future;
1311        }
1312
1313        @Override
1314        public boolean cancel() {
1315            if (this.done.compareAndSet(false, true)) {
1316                return this.future.cancel(false);
1317            }
1318            return false;
1319        }
1320
1321        @Override
1322        public void aquireLocks() {
1323            if (this.locked.compareAndSet(false, true)) {
1324                try {
1325                    globalQueueSemaphore.acquire();
1326                    store.acquireLocalAsyncLock();
1327                    message.incrementReferenceCount();
1328                } catch (InterruptedException e) {
1329                    LOG.warn("Failed to aquire lock", e);
1330                }
1331            }
1332
1333        }
1334
1335        @Override
1336        public void releaseLocks() {
1337            if (this.locked.compareAndSet(true, false)) {
1338                store.releaseLocalAsyncLock();
1339                globalQueueSemaphore.release();
1340                message.decrementReferenceCount();
1341            }
1342        }
1343
1344        @Override
1345        public void run() {
1346            this.store.doneTasks++;
1347            try {
1348                if (this.done.compareAndSet(false, true)) {
1349                    this.store.addMessage(context, message);
1350                    removeQueueTask(this.store, this.message.getMessageId());
1351                    this.future.complete();
1352                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1353                    System.err.println(this.store.dest.getName() + " cancelled: "
1354                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1355                    this.store.canceledTasks = this.store.doneTasks = 0;
1356                }
1357            } catch (Exception e) {
1358                this.future.setException(e);
1359            }
1360        }
1361
1362        protected Message getMessage() {
1363            return this.message;
1364        }
1365
1366        private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object>  {
1367
1368            private Runnable listener;
1369            public InnerFutureTask(Runnable runnable) {
1370                super(runnable, null);
1371
1372            }
1373
1374            public void setException(final Exception e) {
1375                super.setException(e);
1376            }
1377
1378            public void complete() {
1379                super.set(null);
1380            }
1381
1382            @Override
1383            public void done() {
1384                fireListener();
1385            }
1386
1387            @Override
1388            public void addListener(Runnable listener) {
1389                this.listener = listener;
1390                if (isDone()) {
1391                    fireListener();
1392                }
1393            }
1394
1395            private void fireListener() {
1396                if (listener != null) {
1397                    try {
1398                        listener.run();
1399                    } catch (Exception ignored) {
1400                        LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
1401                    }
1402                }
1403            }
1404        }
1405    }
1406
1407    class StoreTopicTask extends StoreQueueTask {
1408        private final int subscriptionCount;
1409        private final List<String> subscriptionKeys = new ArrayList<String>(1);
1410        private final KahaDBTopicMessageStore topicStore;
1411        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1412                int subscriptionCount) {
1413            super(store, context, message);
1414            this.topicStore = store;
1415            this.subscriptionCount = subscriptionCount;
1416
1417        }
1418
1419        @Override
1420        public void aquireLocks() {
1421            if (this.locked.compareAndSet(false, true)) {
1422                try {
1423                    globalTopicSemaphore.acquire();
1424                    store.acquireLocalAsyncLock();
1425                    message.incrementReferenceCount();
1426                } catch (InterruptedException e) {
1427                    LOG.warn("Failed to aquire lock", e);
1428                }
1429            }
1430        }
1431
1432        @Override
1433        public void releaseLocks() {
1434            if (this.locked.compareAndSet(true, false)) {
1435                message.decrementReferenceCount();
1436                store.releaseLocalAsyncLock();
1437                globalTopicSemaphore.release();
1438            }
1439        }
1440
1441        /**
1442         * add a key
1443         *
1444         * @param key
1445         * @return true if all acknowledgements received
1446         */
1447        public boolean addSubscriptionKey(String key) {
1448            synchronized (this.subscriptionKeys) {
1449                this.subscriptionKeys.add(key);
1450            }
1451            return this.subscriptionKeys.size() >= this.subscriptionCount;
1452        }
1453
1454        @Override
1455        public void run() {
1456            this.store.doneTasks++;
1457            try {
1458                if (this.done.compareAndSet(false, true)) {
1459                    this.topicStore.addMessage(context, message);
1460                    // apply any acks we have
1461                    synchronized (this.subscriptionKeys) {
1462                        for (String key : this.subscriptionKeys) {
1463                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1464
1465                        }
1466                    }
1467                    removeTopicTask(this.topicStore, this.message.getMessageId());
1468                    this.future.complete();
1469                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1470                    System.err.println(this.store.dest.getName() + " cancelled: "
1471                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1472                    this.store.canceledTasks = this.store.doneTasks = 0;
1473                }
1474            } catch (Exception e) {
1475                this.future.setException(e);
1476            }
1477        }
1478    }
1479
1480    public class StoreTaskExecutor extends ThreadPoolExecutor {
1481
1482        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1483            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1484        }
1485
1486        @Override
1487        protected void afterExecute(Runnable runnable, Throwable throwable) {
1488            super.afterExecute(runnable, throwable);
1489
1490            if (runnable instanceof StoreTask) {
1491               ((StoreTask)runnable).releaseLocks();
1492            }
1493        }
1494    }
1495
1496    @Override
1497    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
1498        return new JobSchedulerStoreImpl();
1499    }
1500}