001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.Iterator; 023import java.util.List; 024import java.util.Map; 025 026import org.apache.activemq.broker.region.MessageReference; 027import org.apache.activemq.command.MessageId; 028import org.apache.activemq.management.SizeStatisticImpl; 029 030public class OrderedPendingList implements PendingList { 031 032 private PendingNode root = null; 033 private PendingNode tail = null; 034 private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>(); 035 private final SizeStatisticImpl messageSize; 036 private final PendingMessageHelper pendingMessageHelper; 037 038 public OrderedPendingList() { 039 messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages"); 040 messageSize.setEnabled(true); 041 pendingMessageHelper = new PendingMessageHelper(map, messageSize); 042 } 043 044 @Override 045 public PendingNode addMessageFirst(MessageReference message) { 046 PendingNode node = new PendingNode(this, message); 047 if (root == null) { 048 root = node; 049 tail = node; 050 } else { 051 root.linkBefore(node); 052 root = node; 053 } 054 pendingMessageHelper.addToMap(message, node); 055 return node; 056 } 057 058 @Override 059 public PendingNode addMessageLast(MessageReference message) { 060 PendingNode node = new PendingNode(this, message); 061 if (root == null) { 062 root = node; 063 } else { 064 tail.linkAfter(node); 065 } 066 tail = node; 067 pendingMessageHelper.addToMap(message, node); 068 return node; 069 } 070 071 @Override 072 public void clear() { 073 this.root = null; 074 this.tail = null; 075 this.map.clear(); 076 this.messageSize.reset(); 077 } 078 079 @Override 080 public boolean isEmpty() { 081 return this.map.isEmpty(); 082 } 083 084 @Override 085 public Iterator<MessageReference> iterator() { 086 return new Iterator<MessageReference>() { 087 private PendingNode current = null; 088 private PendingNode next = root; 089 090 @Override 091 public boolean hasNext() { 092 return next != null; 093 } 094 095 @Override 096 public MessageReference next() { 097 MessageReference result = null; 098 this.current = this.next; 099 result = this.current.getMessage(); 100 this.next = (PendingNode) this.next.getNext(); 101 return result; 102 } 103 104 @Override 105 public void remove() { 106 if (this.current != null && this.current.getMessage() != null) { 107 pendingMessageHelper.removeFromMap(this.current.getMessage()); 108 } 109 removeNode(this.current); 110 } 111 }; 112 } 113 114 @Override 115 public PendingNode remove(MessageReference message) { 116 PendingNode node = null; 117 if (message != null) { 118 node = pendingMessageHelper.removeFromMap(message); 119 removeNode(node); 120 } 121 return node; 122 } 123 124 @Override 125 public int size() { 126 return this.map.size(); 127 } 128 129 @Override 130 public long messageSize() { 131 return this.messageSize.getTotalSize(); 132 } 133 134 void removeNode(PendingNode node) { 135 if (node != null) { 136 pendingMessageHelper.removeFromMap(node.getMessage()); 137 if (root == node) { 138 root = (PendingNode) node.getNext(); 139 } 140 if (tail == node) { 141 tail = (PendingNode) node.getPrevious(); 142 } 143 node.unlink(); 144 } 145 } 146 147 List<PendingNode> getAsList() { 148 List<PendingNode> result = new ArrayList<PendingNode>(size()); 149 PendingNode node = root; 150 while (node != null) { 151 result.add(node); 152 node = (PendingNode) node.getNext(); 153 } 154 return result; 155 } 156 157 @Override 158 public String toString() { 159 return "OrderedPendingList(" + System.identityHashCode(this) + ")"; 160 } 161 162 @Override 163 public boolean contains(MessageReference message) { 164 if (message != null) { 165 return this.map.containsKey(message.getMessageId()); 166 } 167 return false; 168 } 169 170 @Override 171 public Collection<MessageReference> values() { 172 List<MessageReference> messageReferences = new ArrayList<MessageReference>(); 173 Iterator<MessageReference> iterator = iterator(); 174 while (iterator.hasNext()) { 175 messageReferences.add(iterator.next()); 176 } 177 return messageReferences; 178 } 179 180 @Override 181 public void addAll(PendingList pendingList) { 182 if (pendingList != null) { 183 for(MessageReference messageReference : pendingList) { 184 addMessageLast(messageReference); 185 } 186 } 187 } 188 189 @Override 190 public MessageReference get(MessageId messageId) { 191 PendingNode node = map.get(messageId); 192 if (node != null) { 193 return node.getMessage(); 194 } 195 return null; 196 } 197}