001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.disk.journal; 018 019import java.io.*; 020import java.nio.ByteBuffer; 021import java.nio.channels.FileChannel; 022import java.util.*; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.atomic.AtomicLong; 025import java.util.concurrent.atomic.AtomicReference; 026import java.util.zip.Adler32; 027import java.util.zip.Checksum; 028import org.apache.activemq.store.kahadb.disk.util.LinkedNode; 029import org.apache.activemq.store.kahadb.disk.util.SequenceSet; 030import org.apache.activemq.util.*; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList; 034import org.apache.activemq.store.kahadb.disk.util.SchedulerTimerTask; 035import org.apache.activemq.store.kahadb.disk.util.Sequence; 036 037/** 038 * Manages DataFiles 039 * 040 * 041 */ 042public class Journal { 043 public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER"; 044 public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false")); 045 046 private static final int MAX_BATCH_SIZE = 32*1024*1024; 047 048 // ITEM_HEAD_SPACE = length + type+ reserved space + SOR 049 public static final int RECORD_HEAD_SPACE = 4 + 1; 050 051 public static final byte USER_RECORD_TYPE = 1; 052 public static final byte BATCH_CONTROL_RECORD_TYPE = 2; 053 // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch. 054 public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH"); 055 public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8; 056 public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader(); 057 058 // tackle corruption when checksum is disabled or corrupt with zeros, minimise data loss 059 public void corruptRecoveryLocation(Location recoveryPosition) throws IOException { 060 DataFile dataFile = getDataFile(recoveryPosition); 061 // with corruption on recovery we have no faith in the content - slip to the next batch record or eof 062 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 063 try { 064 int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1); 065 Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1); 066 LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence); 067 068 // skip corruption on getNextLocation 069 recoveryPosition.setOffset((int) sequence.getLast() + 1); 070 recoveryPosition.setSize(-1); 071 072 dataFile.corruptedBlocks.add(sequence); 073 074 } catch (IOException e) { 075 } finally { 076 accessorPool.closeDataFileAccessor(reader); 077 } 078 } 079 080 public enum PreallocationStrategy { 081 SPARSE_FILE, 082 OS_KERNEL_COPY, 083 ZEROS; 084 } 085 086 public enum PreallocationScope { 087 ENTIRE_JOURNAL; 088 } 089 090 private static byte[] createBatchControlRecordHeader() { 091 try { 092 DataByteArrayOutputStream os = new DataByteArrayOutputStream(); 093 os.writeInt(BATCH_CONTROL_RECORD_SIZE); 094 os.writeByte(BATCH_CONTROL_RECORD_TYPE); 095 os.write(BATCH_CONTROL_RECORD_MAGIC); 096 ByteSequence sequence = os.toByteSequence(); 097 sequence.compact(); 098 return sequence.getData(); 099 } catch (IOException e) { 100 throw new RuntimeException("Could not create batch control record header.", e); 101 } 102 } 103 104 public static final String DEFAULT_DIRECTORY = "."; 105 public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive"; 106 public static final String DEFAULT_FILE_PREFIX = "db-"; 107 public static final String DEFAULT_FILE_SUFFIX = ".log"; 108 public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; 109 public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30; 110 public static final int PREFERED_DIFF = 1024 * 512; 111 public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4; 112 113 private static final Logger LOG = LoggerFactory.getLogger(Journal.class); 114 115 protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>(); 116 117 protected File directory = new File(DEFAULT_DIRECTORY); 118 protected File directoryArchive; 119 private boolean directoryArchiveOverridden = false; 120 121 protected String filePrefix = DEFAULT_FILE_PREFIX; 122 protected String fileSuffix = DEFAULT_FILE_SUFFIX; 123 protected boolean started; 124 125 protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH; 126 protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE; 127 128 protected FileAppender appender; 129 protected DataFileAccessorPool accessorPool; 130 131 protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>(); 132 protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>(); 133 protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>(); 134 135 protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>(); 136 protected Runnable cleanupTask; 137 protected AtomicLong totalLength = new AtomicLong(); 138 protected boolean archiveDataLogs; 139 private ReplicationTarget replicationTarget; 140 protected boolean checksum; 141 protected boolean checkForCorruptionOnStartup; 142 protected boolean enableAsyncDiskSync = true; 143 private Timer timer; 144 private int nextDataFileId = 1; 145 146 protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL; 147 protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE; 148 149 public interface DataFileRemovedListener { 150 void fileRemoved(DataFile datafile); 151 } 152 153 private DataFileRemovedListener dataFileRemovedListener; 154 155 public synchronized void start() throws IOException { 156 if (started) { 157 return; 158 } 159 160 long start = System.currentTimeMillis(); 161 accessorPool = new DataFileAccessorPool(this); 162 started = true; 163 164 appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this); 165 166 File[] files = directory.listFiles(new FilenameFilter() { 167 @Override 168 public boolean accept(File dir, String n) { 169 return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix); 170 } 171 }); 172 173 if (files != null) { 174 for (File file : files) { 175 try { 176 String n = file.getName(); 177 String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length()); 178 int num = Integer.parseInt(numStr); 179 DataFile dataFile = new DataFile(file, num); 180 fileMap.put(dataFile.getDataFileId(), dataFile); 181 totalLength.addAndGet(dataFile.getLength()); 182 } catch (NumberFormatException e) { 183 // Ignore file that do not match the pattern. 184 } 185 } 186 187 // Sort the list so that we can link the DataFiles together in the 188 // right order. 189 List<DataFile> l = new ArrayList<DataFile>(fileMap.values()); 190 Collections.sort(l); 191 for (DataFile df : l) { 192 if (df.getLength() == 0) { 193 // possibly the result of a previous failed write 194 LOG.info("ignoring zero length, partially initialised journal data file: " + df); 195 continue; 196 } 197 dataFiles.addLast(df); 198 fileByFileMap.put(df.getFile(), df); 199 200 if( isCheckForCorruptionOnStartup() ) { 201 lastAppendLocation.set(recoveryCheck(df)); 202 } 203 } 204 } 205 206 nextDataFileId = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1; 207 208 getOrCreateCurrentWriteFile(); 209 210 if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) { 211 LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies."); 212 } 213 214 if( lastAppendLocation.get()==null ) { 215 DataFile df = dataFiles.getTail(); 216 lastAppendLocation.set(recoveryCheck(df)); 217 } 218 219 // ensure we don't report unused space of last journal file in size metric 220 if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) { 221 totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength); 222 } 223 224 225 cleanupTask = new Runnable() { 226 @Override 227 public void run() { 228 cleanup(); 229 } 230 }; 231 this.timer = new Timer("KahaDB Scheduler", true); 232 TimerTask task = new SchedulerTimerTask(cleanupTask); 233 this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL); 234 long end = System.currentTimeMillis(); 235 LOG.trace("Startup took: "+(end-start)+" ms"); 236 } 237 238 239 public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) { 240 241 if (PreallocationScope.ENTIRE_JOURNAL == preallocationScope) { 242 243 if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) { 244 doPreallocationKernelCopy(file); 245 246 }else if (PreallocationStrategy.ZEROS == preallocationStrategy) { 247 doPreallocationZeros(file); 248 } 249 else { 250 doPreallocationSparseFile(file); 251 } 252 }else { 253 LOG.info("Using journal preallocation scope of batch allocation"); 254 } 255 } 256 257 private void doPreallocationSparseFile(RecoverableRandomAccessFile file) { 258 try { 259 file.seek(maxFileLength - 1); 260 file.write((byte)0x00); 261 } catch (IOException e) { 262 LOG.error("Could not preallocate journal file with sparse file! Will continue without preallocation", e); 263 } 264 } 265 266 private void doPreallocationZeros(RecoverableRandomAccessFile file) { 267 ByteBuffer buffer = ByteBuffer.allocate(maxFileLength); 268 for (int i = 0; i < maxFileLength; i++) { 269 buffer.put((byte) 0x00); 270 } 271 buffer.flip(); 272 273 try { 274 FileChannel channel = file.getChannel(); 275 channel.write(buffer); 276 channel.force(false); 277 channel.position(0); 278 } catch (IOException e) { 279 LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e); 280 } 281 } 282 283 private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) { 284 285 // create a template file that will be used to pre-allocate the journal files 286 File templateFile = createJournalTemplateFile(); 287 288 RandomAccessFile templateRaf = null; 289 try { 290 templateRaf = new RandomAccessFile(templateFile, "rw"); 291 templateRaf.setLength(maxFileLength); 292 templateRaf.getChannel().force(true); 293 templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel()); 294 templateRaf.close(); 295 templateFile.delete(); 296 } catch (FileNotFoundException e) { 297 LOG.error("Could not find the template file on disk at " + templateFile.getAbsolutePath(), e); 298 } catch (IOException e) { 299 LOG.error("Could not transfer the template file to journal, transferFile=" + templateFile.getAbsolutePath(), e); 300 } 301 } 302 303 private File createJournalTemplateFile() { 304 String fileName = "db-log.template"; 305 File rc = new File(directory, fileName); 306 if (rc.exists()) { 307 System.out.println("deleting file because it already exists..."); 308 rc.delete(); 309 310 } 311 return rc; 312 } 313 314 315 private static byte[] bytes(String string) { 316 try { 317 return string.getBytes("UTF-8"); 318 } catch (UnsupportedEncodingException e) { 319 throw new RuntimeException(e); 320 } 321 } 322 323 protected Location recoveryCheck(DataFile dataFile) throws IOException { 324 Location location = new Location(); 325 location.setDataFileId(dataFile.getDataFileId()); 326 location.setOffset(0); 327 328 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 329 try { 330 while (true) { 331 int size = checkBatchRecord(reader, location.getOffset()); 332 if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) { 333 location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size); 334 } else { 335 336 // Perhaps it's just some corruption... scan through the 337 // file to find the next valid batch record. We 338 // may have subsequent valid batch records. 339 int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1); 340 if (nextOffset >= 0) { 341 Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1); 342 LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence); 343 dataFile.corruptedBlocks.add(sequence); 344 location.setOffset(nextOffset); 345 } else { 346 break; 347 } 348 } 349 } 350 351 } catch (IOException e) { 352 } finally { 353 accessorPool.closeDataFileAccessor(reader); 354 } 355 356 int existingLen = dataFile.getLength(); 357 dataFile.setLength(location.getOffset()); 358 if (existingLen > dataFile.getLength()) { 359 totalLength.addAndGet(dataFile.getLength() - existingLen); 360 } 361 362 if (!dataFile.corruptedBlocks.isEmpty()) { 363 // Is the end of the data file corrupted? 364 if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) { 365 dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst()); 366 } 367 } 368 369 return location; 370 } 371 372 private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException { 373 ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER); 374 byte data[] = new byte[1024*4]; 375 ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data)); 376 377 int pos = 0; 378 while (true) { 379 pos = bs.indexOf(header, pos); 380 if (pos >= 0) { 381 return offset + pos; 382 } else { 383 // need to load the next data chunck in.. 384 if (bs.length != data.length) { 385 // If we had a short read then we were at EOF 386 return -1; 387 } 388 offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length; 389 bs = new ByteSequence(data, 0, reader.read(offset, data)); 390 pos = 0; 391 } 392 } 393 } 394 395 396 public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException { 397 byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE]; 398 DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord); 399 400 reader.readFully(offset, controlRecord); 401 402 // Assert that it's a batch record. 403 for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) { 404 if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) { 405 return -1; 406 } 407 } 408 409 int size = controlIs.readInt(); 410 if (size > MAX_BATCH_SIZE) { 411 return -1; 412 } 413 414 if (isChecksum()) { 415 416 long expectedChecksum = controlIs.readLong(); 417 if (expectedChecksum == 0) { 418 // Checksuming was not enabled when the record was stored. 419 // we can't validate the record :( 420 return size; 421 } 422 423 byte data[] = new byte[size]; 424 reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data); 425 426 Checksum checksum = new Adler32(); 427 checksum.update(data, 0, data.length); 428 429 if (expectedChecksum != checksum.getValue()) { 430 return -1; 431 } 432 433 } 434 return size; 435 } 436 437 438 void addToTotalLength(int size) { 439 totalLength.addAndGet(size); 440 } 441 442 public long length() { 443 return totalLength.get(); 444 } 445 446 synchronized DataFile getOrCreateCurrentWriteFile() throws IOException { 447 if (dataFiles.isEmpty()) { 448 rotateWriteFile(); 449 } 450 451 DataFile current = dataFiles.getTail(); 452 453 if (current != null) { 454 return current; 455 } else { 456 return rotateWriteFile(); 457 } 458 } 459 460 synchronized DataFile rotateWriteFile() { 461 int nextNum = nextDataFileId++; 462 File file = getFile(nextNum); 463 DataFile nextWriteFile = new DataFile(file, nextNum); 464 fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile); 465 fileByFileMap.put(file, nextWriteFile); 466 dataFiles.addLast(nextWriteFile); 467 return nextWriteFile; 468 } 469 470 public synchronized DataFile reserveDataFile() { 471 int nextNum = nextDataFileId++; 472 File file = getFile(nextNum); 473 DataFile reservedDataFile = new DataFile(file, nextNum); 474 fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile); 475 fileByFileMap.put(file, reservedDataFile); 476 if (dataFiles.isEmpty()) { 477 dataFiles.addLast(reservedDataFile); 478 } else { 479 dataFiles.getTail().linkBefore(reservedDataFile); 480 } 481 return reservedDataFile; 482 } 483 484 public File getFile(int nextNum) { 485 String fileName = filePrefix + nextNum + fileSuffix; 486 File file = new File(directory, fileName); 487 return file; 488 } 489 490 synchronized DataFile getDataFile(Location item) throws IOException { 491 Integer key = Integer.valueOf(item.getDataFileId()); 492 DataFile dataFile = fileMap.get(key); 493 if (dataFile == null) { 494 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); 495 throw new IOException("Could not locate data file " + getFile(item.getDataFileId())); 496 } 497 return dataFile; 498 } 499 500 synchronized File getFile(Location item) throws IOException { 501 Integer key = Integer.valueOf(item.getDataFileId()); 502 DataFile dataFile = fileMap.get(key); 503 if (dataFile == null) { 504 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); 505 throw new IOException("Could not locate data file " + getFile(item.getDataFileId())); 506 } 507 return dataFile.getFile(); 508 } 509 510 public void close() throws IOException { 511 synchronized (this) { 512 if (!started) { 513 return; 514 } 515 if (this.timer != null) { 516 this.timer.cancel(); 517 } 518 accessorPool.close(); 519 } 520 // the appender can be calling back to to the journal blocking a close AMQ-5620 521 appender.close(); 522 synchronized (this) { 523 fileMap.clear(); 524 fileByFileMap.clear(); 525 dataFiles.clear(); 526 lastAppendLocation.set(null); 527 started = false; 528 } 529 } 530 531 protected synchronized void cleanup() { 532 if (accessorPool != null) { 533 accessorPool.disposeUnused(); 534 } 535 } 536 537 public synchronized boolean delete() throws IOException { 538 539 // Close all open file handles... 540 appender.close(); 541 accessorPool.close(); 542 543 boolean result = true; 544 for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) { 545 DataFile dataFile = i.next(); 546 result &= dataFile.delete(); 547 } 548 549 totalLength.set(0); 550 fileMap.clear(); 551 fileByFileMap.clear(); 552 lastAppendLocation.set(null); 553 dataFiles = new LinkedNodeList<DataFile>(); 554 555 // reopen open file handles... 556 accessorPool = new DataFileAccessorPool(this); 557 appender = new DataFileAppender(this); 558 return result; 559 } 560 561 public synchronized void removeDataFiles(Set<Integer> files) throws IOException { 562 for (Integer key : files) { 563 // Can't remove the data file (or subsequent files) that is currently being written to. 564 if (key >= lastAppendLocation.get().getDataFileId()) { 565 continue; 566 } 567 DataFile dataFile = fileMap.get(key); 568 if (dataFile != null) { 569 forceRemoveDataFile(dataFile); 570 } 571 } 572 } 573 574 private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException { 575 accessorPool.disposeDataFileAccessors(dataFile); 576 fileByFileMap.remove(dataFile.getFile()); 577 fileMap.remove(dataFile.getDataFileId()); 578 totalLength.addAndGet(-dataFile.getLength()); 579 dataFile.unlink(); 580 if (archiveDataLogs) { 581 File directoryArchive = getDirectoryArchive(); 582 if (directoryArchive.exists()) { 583 LOG.debug("Archive directory exists: {}", directoryArchive); 584 } else { 585 if (directoryArchive.isAbsolute()) 586 if (LOG.isDebugEnabled()) { 587 LOG.debug("Archive directory [{}] does not exist - creating it now", 588 directoryArchive.getAbsolutePath()); 589 } 590 IOHelper.mkdirs(directoryArchive); 591 } 592 LOG.debug("Moving data file {} to {} ", dataFile, directoryArchive.getCanonicalPath()); 593 dataFile.move(directoryArchive); 594 LOG.debug("Successfully moved data file"); 595 } else { 596 LOG.debug("Deleting data file: {}", dataFile); 597 if (dataFile.delete()) { 598 LOG.debug("Discarded data file: {}", dataFile); 599 } else { 600 LOG.warn("Failed to discard data file : {}", dataFile.getFile()); 601 } 602 } 603 if (dataFileRemovedListener != null) { 604 dataFileRemovedListener.fileRemoved(dataFile); 605 } 606 } 607 608 /** 609 * @return the maxFileLength 610 */ 611 public int getMaxFileLength() { 612 return maxFileLength; 613 } 614 615 /** 616 * @param maxFileLength the maxFileLength to set 617 */ 618 public void setMaxFileLength(int maxFileLength) { 619 this.maxFileLength = maxFileLength; 620 } 621 622 @Override 623 public String toString() { 624 return directory.toString(); 625 } 626 627 public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException { 628 629 Location cur = null; 630 while (true) { 631 if (cur == null) { 632 if (location == null) { 633 DataFile head = dataFiles.getHead(); 634 if (head == null) { 635 return null; 636 } 637 cur = new Location(); 638 cur.setDataFileId(head.getDataFileId()); 639 cur.setOffset(0); 640 } else { 641 // Set to the next offset.. 642 if (location.getSize() == -1) { 643 cur = new Location(location); 644 } else { 645 cur = new Location(location); 646 cur.setOffset(location.getOffset() + location.getSize()); 647 } 648 } 649 } else { 650 cur.setOffset(cur.getOffset() + cur.getSize()); 651 } 652 653 DataFile dataFile = getDataFile(cur); 654 655 // Did it go into the next file?? 656 if (dataFile.getLength() <= cur.getOffset()) { 657 dataFile = dataFile.getNext(); 658 if (dataFile == null) { 659 return null; 660 } else { 661 cur.setDataFileId(dataFile.getDataFileId().intValue()); 662 cur.setOffset(0); 663 } 664 } 665 666 // Load in location size and type. 667 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 668 try { 669 reader.readLocationDetails(cur); 670 } finally { 671 accessorPool.closeDataFileAccessor(reader); 672 } 673 674 Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset()); 675 if (corruptedRange != null) { 676 // skip corruption 677 cur.setSize((int) corruptedRange.range()); 678 } else if (cur.getType() == 0) { 679 // eof - jump to next datafile 680 cur.setOffset(maxFileLength); 681 } else if (cur.getType() == USER_RECORD_TYPE) { 682 // Only return user records. 683 return cur; 684 } 685 } 686 } 687 688 public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException { 689 DataFile dataFile = getDataFile(location); 690 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 691 ByteSequence rc = null; 692 try { 693 rc = reader.readRecord(location); 694 } finally { 695 accessorPool.closeDataFileAccessor(reader); 696 } 697 return rc; 698 } 699 700 public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException { 701 Location loc = appender.storeItem(data, Location.USER_TYPE, sync); 702 return loc; 703 } 704 705 public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException { 706 Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete); 707 return loc; 708 } 709 710 public void update(Location location, ByteSequence data, boolean sync) throws IOException { 711 DataFile dataFile = getDataFile(location); 712 DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile); 713 try { 714 updater.updateRecord(location, data, sync); 715 } finally { 716 accessorPool.closeDataFileAccessor(updater); 717 } 718 } 719 720 public PreallocationStrategy getPreallocationStrategy() { 721 return preallocationStrategy; 722 } 723 724 public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) { 725 this.preallocationStrategy = preallocationStrategy; 726 } 727 728 public PreallocationScope getPreallocationScope() { 729 return preallocationScope; 730 } 731 732 public void setPreallocationScope(PreallocationScope preallocationScope) { 733 this.preallocationScope = preallocationScope; 734 } 735 736 public File getDirectory() { 737 return directory; 738 } 739 740 public void setDirectory(File directory) { 741 this.directory = directory; 742 } 743 744 public String getFilePrefix() { 745 return filePrefix; 746 } 747 748 public void setFilePrefix(String filePrefix) { 749 this.filePrefix = filePrefix; 750 } 751 752 public Map<WriteKey, WriteCommand> getInflightWrites() { 753 return inflightWrites; 754 } 755 756 public Location getLastAppendLocation() { 757 return lastAppendLocation.get(); 758 } 759 760 public void setLastAppendLocation(Location lastSyncedLocation) { 761 this.lastAppendLocation.set(lastSyncedLocation); 762 } 763 764 public File getDirectoryArchive() { 765 if (!directoryArchiveOverridden && (directoryArchive == null)) { 766 // create the directoryArchive relative to the journal location 767 directoryArchive = new File(directory.getAbsolutePath() + 768 File.separator + DEFAULT_ARCHIVE_DIRECTORY); 769 } 770 return directoryArchive; 771 } 772 773 public void setDirectoryArchive(File directoryArchive) { 774 directoryArchiveOverridden = true; 775 this.directoryArchive = directoryArchive; 776 } 777 778 public boolean isArchiveDataLogs() { 779 return archiveDataLogs; 780 } 781 782 public void setArchiveDataLogs(boolean archiveDataLogs) { 783 this.archiveDataLogs = archiveDataLogs; 784 } 785 786 public synchronized DataFile getDataFileById(int dataFileId) { 787 if (dataFiles.isEmpty()) { 788 return null; 789 } 790 791 return fileMap.get(Integer.valueOf(dataFileId)); 792 } 793 794 public synchronized DataFile getCurrentDataFile() { 795 if (dataFiles.isEmpty()) { 796 return null; 797 } 798 799 DataFile current = dataFiles.getTail(); 800 801 if (current != null) { 802 return current; 803 } else { 804 return null; 805 } 806 } 807 808 public synchronized Integer getCurrentDataFileId() { 809 DataFile current = getCurrentDataFile(); 810 if (current != null) { 811 return current.getDataFileId(); 812 } else { 813 return null; 814 } 815 } 816 817 /** 818 * Get a set of files - only valid after start() 819 * 820 * @return files currently being used 821 */ 822 public Set<File> getFiles() { 823 return fileByFileMap.keySet(); 824 } 825 826 public synchronized Map<Integer, DataFile> getFileMap() { 827 return new TreeMap<Integer, DataFile>(fileMap); 828 } 829 830 public long getDiskSize() { 831 return totalLength.get(); 832 } 833 834 public void setReplicationTarget(ReplicationTarget replicationTarget) { 835 this.replicationTarget = replicationTarget; 836 } 837 public ReplicationTarget getReplicationTarget() { 838 return replicationTarget; 839 } 840 841 public String getFileSuffix() { 842 return fileSuffix; 843 } 844 845 public void setFileSuffix(String fileSuffix) { 846 this.fileSuffix = fileSuffix; 847 } 848 849 public boolean isChecksum() { 850 return checksum; 851 } 852 853 public void setChecksum(boolean checksumWrites) { 854 this.checksum = checksumWrites; 855 } 856 857 public boolean isCheckForCorruptionOnStartup() { 858 return checkForCorruptionOnStartup; 859 } 860 861 public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) { 862 this.checkForCorruptionOnStartup = checkForCorruptionOnStartup; 863 } 864 865 public void setWriteBatchSize(int writeBatchSize) { 866 this.writeBatchSize = writeBatchSize; 867 } 868 869 public int getWriteBatchSize() { 870 return writeBatchSize; 871 } 872 873 public void setSizeAccumulator(AtomicLong storeSizeAccumulator) { 874 this.totalLength = storeSizeAccumulator; 875 } 876 877 public void setEnableAsyncDiskSync(boolean val) { 878 this.enableAsyncDiskSync = val; 879 } 880 881 public boolean isEnableAsyncDiskSync() { 882 return enableAsyncDiskSync; 883 } 884 885 public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) { 886 this.dataFileRemovedListener = dataFileRemovedListener; 887 } 888 889 public static class WriteCommand extends LinkedNode<WriteCommand> { 890 public final Location location; 891 public final ByteSequence data; 892 final boolean sync; 893 public final Runnable onComplete; 894 895 public WriteCommand(Location location, ByteSequence data, boolean sync) { 896 this.location = location; 897 this.data = data; 898 this.sync = sync; 899 this.onComplete = null; 900 } 901 902 public WriteCommand(Location location, ByteSequence data, Runnable onComplete) { 903 this.location = location; 904 this.data = data; 905 this.onComplete = onComplete; 906 this.sync = false; 907 } 908 } 909 910 public static class WriteKey { 911 private final int file; 912 private final long offset; 913 private final int hash; 914 915 public WriteKey(Location item) { 916 file = item.getDataFileId(); 917 offset = item.getOffset(); 918 // TODO: see if we can build a better hash 919 hash = (int)(file ^ offset); 920 } 921 922 @Override 923 public int hashCode() { 924 return hash; 925 } 926 927 @Override 928 public boolean equals(Object obj) { 929 if (obj instanceof WriteKey) { 930 WriteKey di = (WriteKey)obj; 931 return di.file == file && di.offset == offset; 932 } 933 return false; 934 } 935 } 936}