/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * File appender that performs batched writes in the thread where the write is
 * queued.  This appender does not honor the journal's maxFileLength value, as the
 * files created here are out-of-band logs used for other purposes, such as journal
 * level compaction.
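 * <p>
 * A minimal usage sketch (illustrative only; assumes an already configured and
 * started {@link Journal} plus a target {@link DataFile} obtained from it elsewhere,
 * e.g. by the compaction code):
 *
 * <pre>
 * Journal journal = ...  // configured and started elsewhere
 * DataFile target = ...  // the out-of-band file the records should land in
 *
 * TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, target);
 * try {
 *     ByteSequence payload = new ByteSequence("payload".getBytes());
 *     Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, true);
 * } finally {
 *     appender.close();  // flushes any still pending batch
 * }
 * </pre>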
 */
public class TargetedDataFileAppender implements FileAppender {

    private static final Logger LOG = LoggerFactory.getLogger(TargetedDataFileAppender.class);

    private final Journal journal;
    private final DataFile target;
    private final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
    private final int maxWriteBatchSize;

    private boolean closed;
    private boolean preallocate;
    private WriteBatch nextWriteBatch;
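    // Simple rolling window of batch sizes used to trace the average write
    // size when trace level logging is enabled (see writePendingBatch).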
    private int statIdx = 0;
    private int[] stats = new int[maxStat];

    public class WriteBatch {

        protected final int offset;

        public final DataFile dataFile;
        public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
        public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
        public AtomicReference<IOException> exception = new AtomicReference<IOException>();

        public WriteBatch(DataFile dataFile, int offset) {
            this.dataFile = dataFile;
            this.offset = offset;
            this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
            journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
        }

        public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
            this(dataFile, offset);
            append(write);
        }

        public boolean canAppend(Journal.WriteCommand write) {
            return size + write.location.getSize() < maxWriteBatchSize;
        }

        public void append(Journal.WriteCommand write) throws IOException {
            this.writes.addLast(write);
            write.location.setDataFileId(dataFile.getDataFileId());
            write.location.setOffset(offset + size);
            int s = write.location.getSize();
            size += s;
            dataFile.incrementLength(s);
            journal.addToTotalLength(s);
        }
    }

    /**
     * Construct an appender that writes all records to the supplied target DataFile,
     * sharing the Journal's in-flight write map and write batch size.
     */
    public TargetedDataFileAppender(Journal journal, DataFile target) {
        this.journal = journal;
        this.target = target;
        this.inflightWrites = this.journal.getInflightWrites();
        this.maxWriteBatchSize = this.journal.getWriteBatchSize();
    }

    @Override
    public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
        checkClosed();

        // Write the packet to our internal buffer.
        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;

        final Location location = new Location();
        location.setSize(size);
        location.setType(type);

        Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);

        enqueueWrite(write);

        if (sync) {
            writePendingBatch();
        }

        return location;
    }

    @Override
    public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
        checkClosed();

        // Write the packet to our internal buffer.
        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;

        final Location location = new Location();
        location.setSize(size);
        location.setType(type);

        Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);

        enqueueWrite(write);

        return location;
    }

    @Override
    public void close() throws IOException {
        if (!closed) {
            if (nextWriteBatch != null) {
                // Force a sync of the current in-progress batched write.
                LOG.debug("Close of targeted appender flushing last batch.");
                writePendingBatch();
            }

            closed = true;
        }
    }

    //----- Appender Configuration -------------------------------------------//

    public boolean isPreallocate() {
        return preallocate;
    }

    public void setPreallocate(boolean preallocate) {
        this.preallocate = preallocate;
    }

    //----- Internal Implementation ------------------------------------------//

    private void checkClosed() throws IOException {
        if (closed) {
            throw new IOException("The appender is closed");
        }
    }

    private WriteBatch enqueueWrite(Journal.WriteCommand write) throws IOException {
        while (true) {
            if (nextWriteBatch == null) {
                nextWriteBatch = new WriteBatch(target, target.getLength(), write);
                break;
            } else {
                // Append to the current batch if possible..
                if (nextWriteBatch.canAppend(write)) {
                    nextWriteBatch.append(write);
                    break;
                } else {
                    // Flush the current batch and loop around to start a new one;
                    // writePendingBatch clears nextWriteBatch when it completes.
                    writePendingBatch();
                }
            }
        }

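        // Async writes are tracked in the Journal's in-flight map so concurrent
        // readers can be served from memory until the batch reaches disk; they
        // are removed again in signalDone once the data has been synced.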
        if (!write.sync) {
            inflightWrites.put(new Journal.WriteKey(write.location), write);
        }

        return nextWriteBatch;
    }

    private void writePendingBatch() throws IOException {
        DataFile dataFile = nextWriteBatch.dataFile;

        try (RecoverableRandomAccessFile file = dataFile.openRandomAccessFile();
             DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize)) {

            // Preallocate on the first open of a new file (length == 0) if configured to do so.
            // NOTE: dataFile.length cannot be used because it is updated in enqueue.
            if (file.length() == 0L && isPreallocate()) {
                journal.preallocateEntireJournalDataFile(file);
            }

            Journal.WriteCommand write = nextWriteBatch.writes.getHead();

            // Write an empty batch control record.
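            // Control record layout, with the size and checksum fields patched in
            // below once the full batch has been buffered:
            //   4 byte record size | 1 byte record type | magic bytes |
            //   4 byte batch payload size | 8 byte Adler32 checksum.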
            buff.reset();
            buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
            buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
            buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
            buff.writeInt(0);
            buff.writeLong(0);

            while (write != null) {
                buff.writeInt(write.location.getSize());
                buff.writeByte(write.location.getType());
                buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
                write = write.getNext();
            }

            // Append an 'unset' next batch header (5 bytes) so a reader can always find EOF.
            buff.writeInt(0);
            buff.writeByte(0);

            ByteSequence sequence = buff.toByteSequence();

            // Now we can fill in the batch control record properly.
            buff.reset();
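            // Skip the record header (4 byte size + 1 byte type) and the magic bytes
            // to land on the payload size field written as a placeholder above.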
            buff.skip(5 + Journal.BATCH_CONTROL_RECORD_MAGIC.length);
            buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
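            // The checksum covers only the batch payload: everything after the control
            // record, excluding the trailing 5 byte EOF marker appended above.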
            if (journal.isChecksum()) {
                Checksum checksum = new Adler32();
                checksum.update(sequence.getData(),
                                sequence.getOffset() + Journal.BATCH_CONTROL_RECORD_SIZE,
                                sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
                buff.writeLong(checksum.getValue());
            }

            // Now do the one big write.
            file.seek(nextWriteBatch.offset);
            if (maxStat > 0) {
                if (statIdx < maxStat) {
                    stats[statIdx++] = sequence.getLength();
                } else {
                    long all = 0;
                    while (statIdx > 0) {
                        all += stats[--statIdx];
                    }
                    LOG.trace("Ave writeSize: {}", all / maxStat);
                }
            }

            file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());

            ReplicationTarget replicationTarget = journal.getReplicationTarget();
            if (replicationTarget != null) {
                replicationTarget.replicate(nextWriteBatch.writes.getHead().location, sequence, true);
            }

            file.sync();

            signalDone(nextWriteBatch);
        } catch (IOException e) {
            LOG.info("Journal failed while writing at: {}", nextWriteBatch.offset);
            throw e;
        } finally {
            // Always clear the pending batch so subsequent writes start a fresh
            // batch instead of re-flushing one that has already been written.
            nextWriteBatch = null;
        }
    }

    private void signalDone(WriteBatch writeBatch) {
        // Now that the data is on disk, remove the writes from the in-flight
        // cache and signal any onComplete requests.
        Journal.WriteCommand write = writeBatch.writes.getHead();
        while (write != null) {
            if (!write.sync) {
                inflightWrites.remove(new Journal.WriteKey(write.location));
            }

            if (write.onComplete != null) {
                try {
                    write.onComplete.run();
                } catch (Throwable e) {
                    LOG.info("An exception was raised while executing the onComplete Runnable", e);
                }
            }

            write = write.getNext();
        }
    }
}