diff --git a/okhttp/src/commonJvmAndroid/kotlin/okhttp3/internal/cache2/FileOperator.kt b/okhttp/src/commonJvmAndroid/kotlin/okhttp3/internal/cache2/FileOperator.kt deleted file mode 100644 index 1d1022104..000000000 --- a/okhttp/src/commonJvmAndroid/kotlin/okhttp3/internal/cache2/FileOperator.kt +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (C) 2016 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package okhttp3.internal.cache2 - -import java.io.IOException -import java.nio.channels.FileChannel -import okio.Buffer - -/** - * Read and write a target file. Unlike Okio's built-in `Okio.source(java.io.File file)` and `Okio.sink(java.io.File file)` - * this class offers: - * - * * **Read/write:** read and write using the same operator. - * * **Random access:** access any position within the file. - * * **Shared channels:** read and write a file channel that's shared between - * multiple operators. Note that although the underlying [FileChannel] may be shared, - * each [FileOperator] should not be. - */ -internal class FileOperator( - private val fileChannel: FileChannel, -) { - /** Write [byteCount] bytes from [source] to the file at [pos]. */ - @Throws(IOException::class) - fun write( - pos: Long, - source: Buffer, - byteCount: Long, - ) { - if (byteCount < 0L || byteCount > source.size) { - throw IndexOutOfBoundsException() - } - var mutablePos = pos - var mutableByteCount = byteCount - - while (mutableByteCount > 0L) { - val bytesWritten = fileChannel.transferFrom(source, mutablePos, mutableByteCount) - mutablePos += bytesWritten - mutableByteCount -= bytesWritten - } - } - - /** - * Copy [byteCount] bytes from the file at [pos] into `sink`. It is the - * caller's responsibility to make sure there are sufficient bytes to read: if there aren't this - * method throws an `EOFException`. - */ - fun read( - pos: Long, - sink: Buffer, - byteCount: Long, - ) { - if (byteCount < 0L) { - throw IndexOutOfBoundsException() - } - var mutablePos = pos - var mutableByteCount = byteCount - - while (mutableByteCount > 0L) { - val bytesRead = fileChannel.transferTo(mutablePos, mutableByteCount, sink) - mutablePos += bytesRead - mutableByteCount -= bytesRead - } - } -} diff --git a/okhttp/src/commonJvmAndroid/kotlin/okhttp3/internal/cache2/Relay.kt b/okhttp/src/commonJvmAndroid/kotlin/okhttp3/internal/cache2/Relay.kt deleted file mode 100644 index fbd4e243e..000000000 --- a/okhttp/src/commonJvmAndroid/kotlin/okhttp3/internal/cache2/Relay.kt +++ /dev/null @@ -1,361 +0,0 @@ -/* - * Copyright (C) 2016 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package okhttp3.internal.cache2 - -import java.io.File -import java.io.IOException -import java.io.RandomAccessFile -import okhttp3.internal.closeQuietly -import okhttp3.internal.concurrent.Lockable -import okhttp3.internal.concurrent.notifyAll -import okio.Buffer -import okio.ByteString -import okio.ByteString.Companion.encodeUtf8 -import okio.Source -import okio.Timeout - -/** - * Replicates a single upstream source into multiple downstream sources. Each downstream source - * returns the same bytes as the upstream source. Downstream sources may read data either as it - * is returned by upstream, or after the upstream source has been exhausted. - * - * As bytes are returned from upstream they are written to a local file. Downstream sources read - * from this file as necessary. - * - * This class also keeps a small buffer of bytes recently read from upstream. This is intended to - * save a small amount of file I/O and data copying. - */ -class Relay private constructor( - /** - * Read/write persistence of the upstream source and its metadata. Its layout is as follows: - * - * * 16 bytes: either `OkHttp cache v1\n` if the persisted file is complete. This is another - * sequence of bytes if the file is incomplete and should not be used. - * * 8 bytes: *n*: upstream data size - * * 8 bytes: *m*: metadata size - * * *n* bytes: upstream data - * * *m* bytes: metadata - * - * This is closed and assigned to null when the last source is closed and no further sources - * are permitted. - */ - var file: RandomAccessFile?, - /** - * Null once the file has a complete copy of the upstream bytes. Only the [upstreamReader] thread - * may access this source. - */ - var upstream: Source?, - /** The number of bytes consumed from [upstream]. Guarded by this. */ - var upstreamPos: Long, - /** User-supplied additional data persisted with the source data. */ - private val metadata: ByteString, - /** The maximum size of [buffer]. */ - val bufferMaxSize: Long, -) : Lockable { - /** The thread that currently has access to upstream. Possibly null. Guarded by this. */ - var upstreamReader: Thread? = null - - /** - * A buffer for [upstreamReader] to use when pulling bytes from upstream. Only the - * [upstreamReader] thread may access this buffer. - */ - val upstreamBuffer = Buffer() - - /** True if there are no further bytes to read from [upstream]. Guarded by this. */ - var complete = (upstream == null) - - /** The most recently read bytes from [upstream]. This is a suffix of [file]. Guarded by this. */ - val buffer = Buffer() - - /** - * Reference count of the number of active sources reading this stream. When decremented to 0 - * resources are released and all following calls to [.newSource] return null. Guarded by this. - */ - var sourceCount = 0 - - val isClosed: Boolean - get() = file == null - - @Throws(IOException::class) - private fun writeHeader( - prefix: ByteString, - upstreamSize: Long, - metadataSize: Long, - ) { - val header = - Buffer().apply { - write(prefix) - writeLong(upstreamSize) - writeLong(metadataSize) - require(size == FILE_HEADER_SIZE) - } - - val fileOperator = FileOperator(file!!.channel) - fileOperator.write(0, header, FILE_HEADER_SIZE) - } - - @Throws(IOException::class) - private fun writeMetadata(upstreamSize: Long) { - val metadataBuffer = Buffer() - metadataBuffer.write(metadata) - - val fileOperator = FileOperator(file!!.channel) - fileOperator.write(FILE_HEADER_SIZE + upstreamSize, metadataBuffer, metadata.size.toLong()) - } - - @Throws(IOException::class) - fun commit(upstreamSize: Long) { - // Write metadata to the end of the file. - writeMetadata(upstreamSize) - file!!.channel.force(false) - - // Once everything else is in place we can swap the dirty header for a clean one. - writeHeader(PREFIX_CLEAN, upstreamSize, metadata.size.toLong()) - file!!.channel.force(false) - - // This file is complete. - synchronized(this@Relay) { - complete = true - } - - upstream?.closeQuietly() - upstream = null - } - - fun metadata(): ByteString = metadata - - /** - * Returns a new source that returns the same bytes as upstream. Returns null if this relay has - * been closed and no further sources are possible. In that case callers should retry after - * building a new relay with [.read]. - */ - fun newSource(): Source? { - synchronized(this@Relay) { - if (file == null) return null - sourceCount++ - } - - return RelaySource() - } - - internal inner class RelaySource : Source { - private val timeout = Timeout() - - /** The operator to read and write the shared file. Null if this source is closed. */ - private var fileOperator: FileOperator? = FileOperator(file!!.channel) - - /** The next byte to read. This is always less than or equal to [upstreamPos]. */ - private var sourcePos = 0L - - /** - * Selects where to find the bytes for a read and read them. This is one of three sources. - * - * ## Upstream - * - * In this case the current thread is assigned as the upstream reader. We read bytes from - * upstream and copy them to both the file and to the buffer. Finally we release the upstream - * reader lock and return the new bytes. - * - * ## The file - * - * In this case we copy bytes from the file to the [sink]. - * - * ## The buffer - * - * In this case the bytes are immediately copied into [sink] and the number of bytes copied is - * returned. - * - * If upstream would be selected but another thread is already reading upstream this will - * block until that read completes. It is possible to time out while waiting for that. - */ - @Throws(IOException::class) - override fun read( - sink: Buffer, - byteCount: Long, - ): Long { - check(fileOperator != null) - - val source: Int = - synchronized(this@Relay) { - // We need new data from upstream. - while (true) { - val upstreamPos = this@Relay.upstreamPos - if (sourcePos != upstreamPos) break - - // No more data upstream. We're done. - if (complete) return -1L - - // Another thread is already reading. Wait for that. - if (upstreamReader != null) { - timeout.waitUntilNotified(this@Relay) - continue - } - - // We will do the read. - upstreamReader = Thread.currentThread() - return@synchronized SOURCE_UPSTREAM - } - - val bufferPos = upstreamPos - buffer.size - - // Bytes of the read precede the buffer. Read from the file. - if (sourcePos < bufferPos) { - return@synchronized SOURCE_FILE - } - - // The buffer has the data we need. Read from there and return immediately. - val bytesToRead = minOf(byteCount, upstreamPos - sourcePos) - buffer.copyTo(sink, sourcePos - bufferPos, bytesToRead) - sourcePos += bytesToRead - return bytesToRead - } - - // Read from the file. - if (source == SOURCE_FILE) { - val bytesToRead = minOf(byteCount, upstreamPos - sourcePos) - fileOperator!!.read(FILE_HEADER_SIZE + sourcePos, sink, bytesToRead) - sourcePos += bytesToRead - return bytesToRead - } - - // Read from upstream. This always reads a full buffer: that might be more than what the - // current call to Source.read() has requested. - try { - val upstreamBytesRead = upstream!!.read(upstreamBuffer, bufferMaxSize) - - // If we've exhausted upstream, we're done. - if (upstreamBytesRead == -1L) { - commit(upstreamPos) - return -1L - } - - // Update this source and prepare this call's result. - val bytesRead = minOf(upstreamBytesRead, byteCount) - upstreamBuffer.copyTo(sink, 0, bytesRead) - sourcePos += bytesRead - - // Append the upstream bytes to the file. - fileOperator!!.write( - FILE_HEADER_SIZE + upstreamPos, - upstreamBuffer.clone(), - upstreamBytesRead, - ) - - synchronized(this@Relay) { - // Append new upstream bytes into the buffer. Trim it to its max size. - buffer.write(upstreamBuffer, upstreamBytesRead) - if (buffer.size > bufferMaxSize) { - buffer.skip(buffer.size - bufferMaxSize) - } - - // Now that the file and buffer have bytes, adjust upstreamPos. - this@Relay.upstreamPos += upstreamBytesRead - } - - return bytesRead - } finally { - synchronized(this@Relay) { - upstreamReader = null - this@Relay.notifyAll() - } - } - } - - override fun timeout(): Timeout = timeout - - @Throws(IOException::class) - override fun close() { - if (fileOperator == null) return // Already closed. - fileOperator = null - - var fileToClose: RandomAccessFile? = null - synchronized(this@Relay) { - sourceCount-- - if (sourceCount == 0) { - fileToClose = file - file = null - } - } - - fileToClose?.closeQuietly() - } - } - - companion object { - // TODO(jwilson): what to do about timeouts? They could be different and unfortunately when any - // timeout is hit we like to tear down the whole stream. - - private const val SOURCE_UPSTREAM = 1 - private const val SOURCE_FILE = 2 - - @JvmField val PREFIX_CLEAN = "OkHttp cache v1\n".encodeUtf8() - - @JvmField val PREFIX_DIRTY = "OkHttp DIRTY :(\n".encodeUtf8() - private const val FILE_HEADER_SIZE = 32L - - /** - * Creates a new relay that reads a live stream from [upstream], using [file] to share that data - * with other sources. - * - * **Warning:** callers to this method must immediately call [newSource] to create a source and - * close that when they're done. Otherwise a handle to [file] will be leaked. - */ - @Throws(IOException::class) - fun edit( - file: File, - upstream: Source, - metadata: ByteString, - bufferMaxSize: Long, - ): Relay { - val randomAccessFile = RandomAccessFile(file, "rw") - val result = Relay(randomAccessFile, upstream, 0L, metadata, bufferMaxSize) - - // Write a dirty header. That way if we crash we won't attempt to recover this. - randomAccessFile.setLength(0L) - result.writeHeader(PREFIX_DIRTY, -1L, -1L) - - return result - } - - /** - * Creates a relay that reads a recorded stream from [file]. - * - * **Warning:** callers to this method must immediately call [newSource] to create a source and - * close that when they're done. Otherwise a handle to [file] will be leaked. - */ - @Throws(IOException::class) - fun read(file: File): Relay { - val randomAccessFile = RandomAccessFile(file, "rw") - val fileOperator = FileOperator(randomAccessFile.channel) - - // Read the header. - val header = Buffer() - fileOperator.read(0, header, FILE_HEADER_SIZE) - val prefix = header.readByteString(PREFIX_CLEAN.size.toLong()) - if (prefix != PREFIX_CLEAN) throw IOException("unreadable cache file") - val upstreamSize = header.readLong() - val metadataSize = header.readLong() - - // Read the metadata. - val metadataBuffer = Buffer() - fileOperator.read(FILE_HEADER_SIZE + upstreamSize, metadataBuffer, metadataSize) - val metadata = metadataBuffer.readByteString() - - // Return the result. - return Relay(randomAccessFile, null, upstreamSize, metadata, 0L) - } - } -} diff --git a/okhttp/src/jvmTest/kotlin/okhttp3/internal/cache2/FileOperatorTest.kt b/okhttp/src/jvmTest/kotlin/okhttp3/internal/cache2/FileOperatorTest.kt deleted file mode 100644 index ef18c3b15..000000000 --- a/okhttp/src/jvmTest/kotlin/okhttp3/internal/cache2/FileOperatorTest.kt +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Copyright (C) 2016 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package okhttp3.internal.cache2 - -import assertk.assertThat -import assertk.assertions.isEqualTo -import java.io.File -import java.io.RandomAccessFile -import java.util.Random -import kotlin.test.assertFailsWith -import okio.Buffer -import okio.ByteString -import okio.ByteString.Companion.encodeUtf8 -import okio.buffer -import okio.sink -import okio.source -import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.io.TempDir - -class FileOperatorTest { - @TempDir - var tempDir: File? = null - private var file: File? = null - private var randomAccessFile: RandomAccessFile? = null - - @BeforeEach - fun setUp() { - file = File(tempDir, "test") - randomAccessFile = RandomAccessFile(file, "rw") - } - - @AfterEach - fun tearDown() { - randomAccessFile!!.close() - } - - @Test - fun read() { - write("Hello, World".encodeUtf8()) - val operator = - FileOperator( - randomAccessFile!!.getChannel(), - ) - val buffer = Buffer() - operator.read(0, buffer, 5) - assertThat(buffer.readUtf8()).isEqualTo("Hello") - operator.read(4, buffer, 5) - assertThat(buffer.readUtf8()).isEqualTo("o, Wo") - } - - @Test - fun write() { - val operator = - FileOperator( - randomAccessFile!!.getChannel(), - ) - val buffer1 = Buffer().writeUtf8("Hello, World") - operator.write(0, buffer1, 5) - assertThat(buffer1.readUtf8()).isEqualTo(", World") - val buffer2 = Buffer().writeUtf8("icopter!") - operator.write(3, buffer2, 7) - assertThat(buffer2.readUtf8()).isEqualTo("!") - assertThat(snapshot()).isEqualTo("Helicopter".encodeUtf8()) - } - - @Test - fun readAndWrite() { - val operator = - FileOperator( - randomAccessFile!!.getChannel(), - ) - write("woman god creates dinosaurs destroys. ".encodeUtf8()) - val buffer = Buffer() - operator.read(6, buffer, 21) - operator.read(36, buffer, 1) - operator.read(5, buffer, 5) - operator.read(28, buffer, 8) - operator.read(17, buffer, 10) - operator.read(36, buffer, 2) - operator.read(2, buffer, 4) - operator.write(0, buffer, buffer.size) - operator.read(0, buffer, 12) - operator.read(47, buffer, 3) - operator.read(45, buffer, 2) - operator.read(47, buffer, 3) - operator.read(26, buffer, 10) - operator.read(23, buffer, 3) - operator.write(47, buffer, buffer.size) - operator.read(62, buffer, 6) - operator.read(4, buffer, 19) - operator.write(80, buffer, buffer.size) - assertThat(snapshot()).isEqualTo( - ( - "" + - "god creates dinosaurs. " + - "god destroys dinosaurs. " + - "god creates man. " + - "man destroys god. " + - "man creates dinosaurs. " - ).encodeUtf8(), - ) - } - - @Test - fun multipleOperatorsShareOneFile() { - val operatorA = - FileOperator( - randomAccessFile!!.getChannel(), - ) - val operatorB = - FileOperator( - randomAccessFile!!.getChannel(), - ) - val bufferA = Buffer() - val bufferB = Buffer() - bufferA.writeUtf8("Dodgson!\n") - operatorA.write(0, bufferA, 9) - bufferB.writeUtf8("You shouldn't use my name.\n") - operatorB.write(9, bufferB, 27) - bufferA.writeUtf8("Dodgson, we've got Dodgson here!\n") - operatorA.write(36, bufferA, 33) - operatorB.read(0, bufferB, 9) - assertThat(bufferB.readUtf8()).isEqualTo("Dodgson!\n") - operatorA.read(9, bufferA, 27) - assertThat(bufferA.readUtf8()).isEqualTo("You shouldn't use my name.\n") - operatorB.read(36, bufferB, 33) - assertThat(bufferB.readUtf8()).isEqualTo("Dodgson, we've got Dodgson here!\n") - } - - @Test - fun largeRead() { - val data = randomByteString(1000000) - write(data) - val operator = - FileOperator( - randomAccessFile!!.getChannel(), - ) - val buffer = Buffer() - operator.read(0, buffer, data.size.toLong()) - assertThat(buffer.readByteString()).isEqualTo(data) - } - - @Test - fun largeWrite() { - val data = randomByteString(1000000) - val operator = - FileOperator( - randomAccessFile!!.getChannel(), - ) - val buffer = Buffer().write(data) - operator.write(0, buffer, data.size.toLong()) - assertThat(snapshot()).isEqualTo(data) - } - - @Test - fun readBounds() { - val operator = - FileOperator( - randomAccessFile!!.getChannel(), - ) - val buffer = Buffer() - assertFailsWith { - operator.read(0, buffer, -1L) - } - } - - @Test - fun writeBounds() { - val operator = - FileOperator( - randomAccessFile!!.getChannel(), - ) - val buffer = Buffer().writeUtf8("abc") - assertFailsWith { - operator.write(0, buffer, -1L) - } - assertFailsWith { - operator.write(0, buffer, 4L) - } - } - - private fun randomByteString(byteCount: Int): ByteString { - val bytes = ByteArray(byteCount) - Random(0).nextBytes(bytes) - return ByteString.of(*bytes) - } - - private fun snapshot(): ByteString { - randomAccessFile!!.getChannel().force(false) - val source = file!!.source().buffer() - return source.readByteString() - } - - private fun write(data: ByteString) { - val sink = file!!.sink().buffer() - sink.write(data) - sink.close() - } -} diff --git a/okhttp/src/jvmTest/kotlin/okhttp3/internal/cache2/RelayTest.kt b/okhttp/src/jvmTest/kotlin/okhttp3/internal/cache2/RelayTest.kt deleted file mode 100644 index 2292c29af..000000000 --- a/okhttp/src/jvmTest/kotlin/okhttp3/internal/cache2/RelayTest.kt +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Copyright (C) 2016 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package okhttp3.internal.cache2 - -import assertk.assertThat -import assertk.assertions.isEqualTo -import assertk.assertions.isFalse -import assertk.assertions.isNull -import assertk.assertions.isTrue -import java.io.File -import java.io.IOException -import java.util.concurrent.Callable -import java.util.concurrent.Executors -import kotlin.test.assertFailsWith -import okhttp3.TestUtil.threadFactory -import okhttp3.internal.cache2.Relay.Companion.edit -import okhttp3.internal.cache2.Relay.Companion.read -import okio.Buffer -import okio.ByteString -import okio.ByteString.Companion.encodeUtf8 -import okio.Pipe -import okio.Source -import okio.buffer -import okio.source -import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Tag -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.io.TempDir - -@Tag("Slowish") -class RelayTest { - @TempDir - var tempDir: File? = null - private val executor = Executors.newCachedThreadPool(threadFactory("RelayTest")) - private val metadata: ByteString = "great metadata!".encodeUtf8() - private lateinit var file: File - - @BeforeEach - fun setUp() { - file = File(tempDir, "test") - } - - @AfterEach - fun tearDown() { - executor.shutdown() - } - - @Test - fun singleSource() { - val upstream = Buffer() - upstream.writeUtf8("abcdefghijklm") - val relay = edit(file, upstream, metadata, 1024) - val source = relay.newSource() - val sourceBuffer = Buffer() - assertThat(source!!.read(sourceBuffer, 5)).isEqualTo(5) - assertThat(sourceBuffer.readUtf8()).isEqualTo("abcde") - assertThat(source.read(sourceBuffer, 1024)).isEqualTo(8) - assertThat(sourceBuffer.readUtf8()).isEqualTo("fghijklm") - assertThat(source.read(sourceBuffer, 1024)).isEqualTo(-1) - assertThat(sourceBuffer.size).isEqualTo(0) - source.close() - assertThat(relay.isClosed).isTrue() - assertFile(Relay.PREFIX_CLEAN, 13L, metadata.size, "abcdefghijklm", metadata) - } - - @Test - fun multipleSources() { - val upstream = Buffer() - upstream.writeUtf8("abcdefghijklm") - val relay = edit(file, upstream, metadata, 1024) - val source1 = relay.newSource()!!.buffer() - val source2 = relay.newSource()!!.buffer() - assertThat(source1.readUtf8()).isEqualTo("abcdefghijklm") - assertThat(source2.readUtf8()).isEqualTo("abcdefghijklm") - source1.close() - source2.close() - assertThat(relay.isClosed).isTrue() - assertFile(Relay.PREFIX_CLEAN, 13L, metadata.size, "abcdefghijklm", metadata) - } - - @Test - fun readFromBuffer() { - val upstream = Buffer() - upstream.writeUtf8("abcdefghij") - val relay = edit(file, upstream, metadata, 5) - val source1 = relay.newSource()!!.buffer() - val source2 = relay.newSource()!!.buffer() - assertThat(source1.readUtf8(5)).isEqualTo("abcde") - assertThat(source2.readUtf8(5)).isEqualTo("abcde") - assertThat(source2.readUtf8(5)).isEqualTo("fghij") - assertThat(source1.readUtf8(5)).isEqualTo("fghij") - assertThat(source1.exhausted()).isTrue() - assertThat(source2.exhausted()).isTrue() - source1.close() - source2.close() - assertThat(relay.isClosed).isTrue() - assertFile(Relay.PREFIX_CLEAN, 10L, metadata.size, "abcdefghij", metadata) - } - - @Test - fun readFromFile() { - val upstream = Buffer() - upstream.writeUtf8("abcdefghijklmnopqrst") - val relay = edit(file, upstream, metadata, 5) - val source1 = relay.newSource()!!.buffer() - val source2 = relay.newSource()!!.buffer() - assertThat(source1.readUtf8(10)).isEqualTo("abcdefghij") - assertThat(source2.readUtf8(10)).isEqualTo("abcdefghij") - assertThat(source2.readUtf8(10)).isEqualTo("klmnopqrst") - assertThat(source1.readUtf8(10)).isEqualTo("klmnopqrst") - assertThat(source1.exhausted()).isTrue() - assertThat(source2.exhausted()).isTrue() - source1.close() - source2.close() - assertThat(relay.isClosed).isTrue() - assertFile(Relay.PREFIX_CLEAN, 20L, metadata.size, "abcdefghijklmnopqrst", metadata) - } - - @Test - fun readAfterEdit() { - val upstream = Buffer() - upstream.writeUtf8("abcdefghij") - val relay1 = edit(file, upstream, metadata, 5) - val source1 = relay1.newSource()!!.buffer() - assertThat(source1.readUtf8(10)).isEqualTo("abcdefghij") - assertThat(source1.exhausted()).isTrue() - source1.close() - assertThat(relay1.isClosed).isTrue() - - // Since relay1 is closed, new sources cannot be created. - assertThat(relay1.newSource()).isNull() - val relay2 = read(file) - assertThat(relay2.metadata()).isEqualTo(metadata) - val source2 = relay2.newSource()!!.buffer() - assertThat(source2.readUtf8(10)).isEqualTo("abcdefghij") - assertThat(source2.exhausted()).isTrue() - source2.close() - assertThat(relay2.isClosed).isTrue() - - // Since relay2 is closed, new sources cannot be created. - assertThat(relay2.newSource()).isNull() - assertFile(Relay.PREFIX_CLEAN, 10L, metadata.size, "abcdefghij", metadata) - } - - @Test - fun closeBeforeExhaustLeavesDirtyFile() { - val upstream = Buffer() - upstream.writeUtf8("abcdefghij") - val relay1 = edit(file, upstream, metadata, 5) - val source1 = relay1.newSource()!!.buffer() - assertThat(source1.readUtf8(10)).isEqualTo("abcdefghij") - source1.close() // Not exhausted! - assertThat(relay1.isClosed).isTrue() - assertFailsWith { - read(file) - }.also { expected -> - assertThat(expected.message).isEqualTo("unreadable cache file") - } - assertFile(Relay.PREFIX_DIRTY, -1L, -1, null, null) - } - - @Test - fun redundantCallsToCloseAreIgnored() { - val upstream = Buffer() - upstream.writeUtf8("abcde") - val relay = edit(file, upstream, metadata, 1024) - val source1 = relay.newSource() - val source2 = relay.newSource() - source1!!.close() - source1.close() // Unnecessary. Shouldn't decrement the reference count. - assertThat(relay.isClosed).isFalse() - source2!!.close() - assertThat(relay.isClosed).isTrue() - assertFile(Relay.PREFIX_DIRTY, -1L, -1, null, null) - } - - @Test - fun racingReaders() { - val pipe = Pipe(1024) - val sink = pipe.sink.buffer() - val relay = edit(file, pipe.source, metadata, 5) - val future1 = executor.submit(sourceReader(relay.newSource())) - val future2 = executor.submit(sourceReader(relay.newSource())) - Thread.sleep(500) - sink.writeUtf8("abcdefghij") - Thread.sleep(500) - sink.writeUtf8("klmnopqrst") - sink.close() - assertThat(future1.get()) - .isEqualTo("abcdefghijklmnopqrst".encodeUtf8()) - assertThat(future2.get()) - .isEqualTo("abcdefghijklmnopqrst".encodeUtf8()) - assertThat(relay.isClosed).isTrue() - assertFile(Relay.PREFIX_CLEAN, 20L, metadata.size, "abcdefghijklmnopqrst", metadata) - } - - /** Returns a callable that reads all of source, closes it, and returns the bytes. */ - private fun sourceReader(source: Source?): Callable = - Callable { - val buffer = Buffer() - while (source!!.read(buffer, 16384) != -1L) { - } - source.close() - buffer.readByteString() - } - - private fun assertFile( - prefix: ByteString, - upstreamSize: Long, - metadataSize: Int, - upstream: String?, - metadata: ByteString?, - ) { - val source = file.source().buffer() - assertThat(source.readByteString(prefix.size.toLong())).isEqualTo(prefix) - assertThat(source.readLong()).isEqualTo(upstreamSize) - assertThat(source.readLong()).isEqualTo(metadataSize.toLong()) - if (upstream != null) { - assertThat(source.readUtf8(upstreamSize)).isEqualTo(upstream) - } - if (metadata != null) { - assertThat(source.readByteString(metadataSize.toLong())).isEqualTo(metadata) - } - source.close() - } -}