/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * File Appender instance that performs batched writes in the thread where the write is
 * queued. This appender does not honor the maxFileLength value in the journal as the
 * files created here are out-of-band logs used for other purposes such as journal level
 * compaction.
 */
public class TargetedDataFileAppender implements FileAppender {

    private static final Logger LOG = LoggerFactory.getLogger(TargetedDataFileAppender.class);

    private final Journal journal;
    private final DataFile target;
    private final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
    private final int maxWriteBatchSize;

    private boolean closed;
    private boolean preallocate;
    private WriteBatch nextWriteBatch;
    private int statIdx = 0;
    private int[] stats = new int[maxStat];

    public class WriteBatch {

        protected final int offset;

        public final DataFile dataFile;
        public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
        public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
        public AtomicReference<IOException> exception = new AtomicReference<IOException>();

        public WriteBatch(DataFile dataFile, int offset) {
            this.dataFile = dataFile;
            this.offset = offset;
            this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
            this.size = Journal.BATCH_CONTROL_RECORD_SIZE;
            journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
        }

        public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
            this(dataFile, offset);
            append(write);
        }

        public boolean canAppend(Journal.WriteCommand write) {
            int newSize = size + write.location.getSize();
            if (newSize >= maxWriteBatchSize) {
                return false;
            }
            return true;
        }

        public void append(Journal.WriteCommand write) throws IOException {
            this.writes.addLast(write);
            write.location.setDataFileId(dataFile.getDataFileId());
            write.location.setOffset(offset + size);
            int s = write.location.getSize();
            size += s;
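            // Grow the data file and journal length accounting eagerly so that the
            // next batch computes its offset correctly; the file on disk only grows
            // once the batch is actually written out.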
            dataFile.incrementLength(s);
            journal.addToTotalLength(s);
        }
    }

    /**
     * Construct a Store writer that appends all writes to the given target DataFile.
     */
    public TargetedDataFileAppender(Journal journal, DataFile target) {
        this.journal = journal;
        this.target = target;
        this.inflightWrites = this.journal.getInflightWrites();
        this.maxWriteBatchSize = this.journal.getWriteBatchSize();
    }

    @Override
    public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
        checkClosed();

        // Write the packet to our internal buffer.
        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;

        final Location location = new Location();
        location.setSize(size);
        location.setType(type);

        Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);

        enqueueWrite(write);

        if (sync) {
            writePendingBatch();
        }

        return location;
    }

    @Override
    public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
        checkClosed();

        // Write the packet to our internal buffer.
        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;

        final Location location = new Location();
        location.setSize(size);
        location.setType(type);

        Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);

        enqueueWrite(write);

        return location;
    }

    @Override
    public void close() throws IOException {
        if (!closed) {
            if (nextWriteBatch != null) {
                // force sync of current in-progress batched write.
                LOG.debug("Close of targeted appender flushing last batch.");
                writePendingBatch();
            }

            closed = true;
        }
    }

    //----- Appender Configuration -------------------------------------------//

    public boolean isPreallocate() {
        return preallocate;
    }

    public void setPreallocate(boolean preallocate) {
        this.preallocate = preallocate;
    }

    //----- Internal Implementation ------------------------------------------//

    private void checkClosed() throws IOException {
        if (closed) {
            throw new IOException("The appender is closed");
        }
    }

    private WriteBatch enqueueWrite(Journal.WriteCommand write) throws IOException {
        while (true) {
            if (nextWriteBatch == null) {
                nextWriteBatch = new WriteBatch(target, target.getLength(), write);
                break;
            } else {
                // Append to current batch if possible..
                if (nextWriteBatch.canAppend(write)) {
                    nextWriteBatch.append(write);
                    break;
                } else {
                    // Flush current batch and start a new one.
                    writePendingBatch();
                    nextWriteBatch = null;
                }
            }
        }

        if (!write.sync) {
            inflightWrites.put(new Journal.WriteKey(write.location), write);
        }

        return nextWriteBatch;
    }

    private void writePendingBatch() throws IOException {
        DataFile dataFile = nextWriteBatch.dataFile;

        try (RecoverableRandomAccessFile file = dataFile.openRandomAccessFile();
             DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize)) {

            // preallocate on first open of new file (length == 0) if configured to do so.
            // NOTE: dataFile.length cannot be used because it is updated in enqueue
            if (file.length() == 0L && isPreallocate()) {
                journal.preallocateEntireJournalDataFile(file);
            }

            Journal.WriteCommand write = nextWriteBatch.writes.getHead();

            // Write an empty batch control record.
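            // The control record layout is: record size (int), record type (byte),
            // magic bytes, batch payload length (int) and Adler32 checksum (long).
            // The length and checksum fields are written as zero here and patched
            // in below once the full batch has been buffered.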
            buff.reset();
            buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
            buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
            buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
            buff.writeInt(0);
            buff.writeLong(0);

            while (write != null) {
                buff.writeInt(write.location.getSize());
                buff.writeByte(write.location.getType());
                buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
                write = write.getNext();
            }

            // append 'unset' next batch (5 bytes) so read can always find eof
            buff.writeInt(0);
            buff.writeByte(0);

            ByteSequence sequence = buff.toByteSequence();

            // Now we can fill in the batch control record properly.
            buff.reset();
            buff.skip(5 + Journal.BATCH_CONTROL_RECORD_MAGIC.length);
            buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
            if (journal.isChecksum()) {
                Checksum checksum = new Adler32();
                checksum.update(sequence.getData(),
                                sequence.getOffset() + Journal.BATCH_CONTROL_RECORD_SIZE,
                                sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
                buff.writeLong(checksum.getValue());
            }

            // Now do the 1 big write.
            file.seek(nextWriteBatch.offset);
            if (maxStat > 0) {
                if (statIdx < maxStat) {
                    stats[statIdx++] = sequence.getLength();
                } else {
                    long all = 0;
                    while (statIdx > 0) {
                        all += stats[--statIdx];
                    }
                    LOG.trace("Avg writeSize: {}", all / maxStat);
                }
            }

            file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());

            ReplicationTarget replicationTarget = journal.getReplicationTarget();
            if (replicationTarget != null) {
                replicationTarget.replicate(nextWriteBatch.writes.getHead().location, sequence, true);
            }

            file.sync();

            signalDone(nextWriteBatch);
        } catch (IOException e) {
            LOG.info("Journal failed while writing at: {}", nextWriteBatch.offset);
            throw e;
        }
    }

    private void signalDone(WriteBatch writeBatch) {
        // Now that the data is on disk, remove the writes from the in
        // flight cache and signal any onComplete requests.
        Journal.WriteCommand write = writeBatch.writes.getHead();
        while (write != null) {
            if (!write.sync) {
                inflightWrites.remove(new Journal.WriteKey(write.location));
            }

            if (write.onComplete != null) {
                try {
                    write.onComplete.run();
                } catch (Throwable e) {
                    LOG.info("An exception was raised while executing the run command for onComplete", e);
                }
            }

            write = write.getNext();
        }
    }
}