/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.memory;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageStoreStatistics;

/**
 * An implementation of {@link org.apache.activemq.store.MessageStore} which
 * uses an in-memory {@link Map} keyed by {@link MessageId} to hold messages.
 * <p>
 * The map handed to the constructor is wrapped with
 * {@link Collections#synchronizedMap(Map)}; compound operations and every
 * iteration additionally synchronize on the wrapped map, as that wrapper
 * requires. The default backing map is a {@link LinkedHashMap}, so messages
 * are iterated in insertion order.
 */
public class MemoryMessageStore extends AbstractMessageStore {

    /** Synchronized view of the backing map; also used as the lock object. */
    protected final Map<MessageId, Message> messageTable;

    /** Id of the last message handed out by {@link #recoverNextMessages}, or {@code null} to start from the beginning. */
    protected MessageId lastBatchId;

    /** Next sequence value assigned to an added message's id. */
    protected long sequenceId;

    /**
     * Creates a store backed by a new {@link LinkedHashMap}.
     *
     * @param destination the destination this store belongs to
     */
    public MemoryMessageStore(ActiveMQDestination destination) {
        this(destination, new LinkedHashMap<MessageId, Message>());
    }

    /**
     * Creates a store backed by the supplied map, wrapped in a synchronized view.
     *
     * @param destination the destination this store belongs to
     * @param messageTable the map that will hold the messages
     */
    public MemoryMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable) {
        super(destination);
        this.messageTable = Collections.synchronizedMap(messageTable);
    }

    /**
     * Stores the message, updates the statistics, bumps the message's
     * reference count, assigns it the next sequence id and notifies the
     * index listener when one is set.
     *
     * @param context the connection context passed on to the index listener
     * @param message the message to store
     * @throws IOException declared by the store contract
     */
    @Override
    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
        synchronized (messageTable) {
            messageTable.put(message.getMessageId(), message);
            incMessageStoreStatistics(getMessageStoreStatistics(), message);
        }
        message.incrementReferenceCount();
        message.getMessageId().setFutureOrSequenceLong(sequenceId++);
        if (indexListener != null) {
            indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
        }
    }

    // public void addMessageReference(ConnectionContext context,MessageId
    // messageId,long expirationTime,String messageRef)
    // throws IOException{
    // synchronized(messageTable){
    // messageTable.put(messageId,messageRef);
    // }
    // }

    /**
     * Looks up a message by id.
     *
     * @param identity the id of the message to fetch
     * @return the stored message, or {@code null} if the table has no entry for the id
     * @throws IOException declared by the store contract
     */
    @Override
    public Message getMessage(MessageId identity) throws IOException {
        return messageTable.get(identity);
    }

    // public String getMessageReference(MessageId identity) throws IOException{
    // return (String)messageTable.get(identity);
    // }

    /**
     * Removes the message identified by the ack's last message id.
     *
     * @param context unused by this implementation
     * @param ack the acknowledgement naming the message to remove
     * @throws IOException declared by the store contract
     */
    @Override
    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
        removeMessage(ack.getLastMessageId());
    }

    /**
     * Removes the message with the given id, if present, releasing its
     * reference and adjusting the statistics. The batch marker is cleared
     * when it pointed at the removed id or when the table becomes empty.
     *
     * @param msgId the id of the message to remove
     * @throws IOException declared by the store contract
     */
    public void removeMessage(MessageId msgId) throws IOException {
        synchronized (messageTable) {
            Message removed = messageTable.remove(msgId);
            if (removed != null) {
                removed.decrementReferenceCount();
                decMessageStoreStatistics(getMessageStoreStatistics(), removed);
            }
            if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
                lastBatchId = null;
            }
        }
    }

    /**
     * Passes every stored message to the listener in map iteration order.
     *
     * @param listener receives each message
     * @throws Exception whatever the listener throws
     */
    @Override
    public void recover(MessageRecoveryListener listener) throws Exception {
        // the message table is a synchronizedMap - iteration still has to be
        // guarded manually by synchronizing on the map itself
        synchronized (messageTable) {
            for (Message msg : messageTable.values()) {
                listener.recoverMessage(msg);
            }
        }
    }

    /**
     * Removes every message and resets the statistics and the batch marker.
     *
     * @param context unused by this implementation
     * @throws IOException declared by the store contract
     */
    @Override
    public void removeAllMessages(ConnectionContext context) throws IOException {
        clearAll();
    }

    /**
     * Removes every message and resets the statistics and the batch marker.
     */
    public void delete() {
        clearAll();
    }

    /**
     * Empties the table, resets the statistics and clears the batch marker.
     */
    private void clearAll() {
        synchronized (messageTable) {
            messageTable.clear();
            getMessageStoreStatistics().reset();
            // Same invariant removeMessage keeps: an empty table has no batch
            // position. A stale id would make recoverNextMessages skip every
            // message added afterwards, because it would never find the marker.
            lastBatchId = null;
        }
    }

    /**
     * Passes to the listener every message stored after the current batch
     * marker (or all messages when no marker is set), advancing the marker
     * to each message as it is delivered.
     * <p>
     * NOTE: {@code maxReturned} is not enforced by this implementation; all
     * remaining messages are offered to the listener in one call.
     *
     * @param maxReturned requested batch size (currently ignored)
     * @param listener receives each recovered message
     * @throws Exception whatever the listener throws
     */
    @Override
    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
        synchronized (messageTable) {
            boolean pastLastBatch = lastBatchId == null;
            for (Entry<MessageId, Message> entry : messageTable.entrySet()) {
                if (pastLastBatch) {
                    // Held as Object so the legacy "value is a MessageId
                    // reference" check below still compiles and behaves as before.
                    Object msg = entry.getValue();
                    lastBatchId = entry.getKey();
                    if (msg.getClass() == MessageId.class) {
                        listener.recoverMessageReference((MessageId) msg);
                    } else {
                        listener.recoverMessage((Message) msg);
                    }
                } else {
                    // Skip entries until the marker itself has been seen.
                    pastLastBatch = entry.getKey().equals(lastBatchId);
                }
            }
        }
    }

    /**
     * Clears the batch marker so the next batch recovery starts from the beginning.
     */
    @Override
    public void resetBatching() {
        lastBatchId = null;
    }

    /**
     * Sets the batch marker; the next batch recovery resumes after this id.
     *
     * @param messageId the id to resume after
     */
    @Override
    public void setBatch(MessageId messageId) {
        lastBatchId = messageId;
    }

    /**
     * Replaces (or inserts) the stored message with the same id and keeps
     * the size and count statistics in step.
     *
     * @param message the new version of the message
     */
    @Override
    public void updateMessage(Message message) {
        synchronized (messageTable) {
            Message original = messageTable.get(message.getMessageId());

            // if can't be found then increment count, else remove old size
            if (original == null) {
                getMessageStoreStatistics().getMessageCount().increment();
            } else {
                getMessageStoreStatistics().getMessageSize().addSize(-original.getSize());
            }
            messageTable.put(message.getMessageId(), message);
            getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
        }
    }

    /**
     * Rebuilds the statistics from the current table contents: the message
     * count and the sum of the message sizes.
     *
     * @throws IOException declared by the store contract
     */
    @Override
    public void recoverMessageStoreStatistics() throws IOException {
        synchronized (messageTable) {
            long size = 0;
            int count = 0;
            for (Message msg : messageTable.values()) {
                size += msg.getSize();
                // Count alongside the size so the two statistics agree.
                count++;
            }

            getMessageStoreStatistics().reset();
            getMessageStoreStatistics().getMessageCount().setCount(count);
            getMessageStoreStatistics().getMessageSize().setTotalSize(size);
        }
    }

    /**
     * Adds one message to the statistics; does nothing if either argument is {@code null}.
     *
     * @param stats the statistics to update
     * @param message the message being added
     */
    protected static final void incMessageStoreStatistics(final MessageStoreStatistics stats, final Message message) {
        if (stats != null && message != null) {
            stats.getMessageCount().increment();
            stats.getMessageSize().addSize(message.getSize());
        }
    }

    /**
     * Removes one message from the statistics; does nothing if either argument is {@code null}.
     *
     * @param stats the statistics to update
     * @param message the message being removed
     */
    protected static final void decMessageStoreStatistics(final MessageStoreStatistics stats, final Message message) {
        if (stats != null && message != null) {
            stats.getMessageCount().decrement();
            stats.getMessageSize().addSize(-message.getSize());
        }
    }

}