1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-12 10:23:16 +03:00

Delete Relay (#9182)

This was never used.

I'm eager to delete it now 'cause some of the tests
are failing in our Windows build on CI.
This commit is contained in:
Jesse Wilson
2025-11-04 16:49:45 -05:00
committed by GitHub
parent d1aaf7c6cc
commit 24cc7ecbed
4 changed files with 0 additions and 892 deletions

View File

@@ -1,77 +0,0 @@
/*
* Copyright (C) 2016 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.cache2
import java.io.IOException
import java.nio.channels.FileChannel
import okio.Buffer
/**
* Read and write a target file. Unlike Okio's built-in `Okio.source(java.io.File file)` and `Okio.sink(java.io.File file)`
* this class offers:
*
* * **Read/write:** read and write using the same operator.
* * **Random access:** access any position within the file.
* * **Shared channels:** read and write a file channel that's shared between
* multiple operators. Note that although the underlying [FileChannel] may be shared,
* each [FileOperator] should not be.
*/
internal class FileOperator(
private val fileChannel: FileChannel,
) {
/** Write [byteCount] bytes from [source] to the file at [pos]. */
@Throws(IOException::class)
fun write(
pos: Long,
source: Buffer,
byteCount: Long,
) {
if (byteCount < 0L || byteCount > source.size) {
throw IndexOutOfBoundsException()
}
var mutablePos = pos
var mutableByteCount = byteCount
while (mutableByteCount > 0L) {
val bytesWritten = fileChannel.transferFrom(source, mutablePos, mutableByteCount)
mutablePos += bytesWritten
mutableByteCount -= bytesWritten
}
}
/**
* Copy [byteCount] bytes from the file at [pos] into `sink`. It is the
* caller's responsibility to make sure there are sufficient bytes to read: if there aren't this
* method throws an `EOFException`.
*/
fun read(
pos: Long,
sink: Buffer,
byteCount: Long,
) {
if (byteCount < 0L) {
throw IndexOutOfBoundsException()
}
var mutablePos = pos
var mutableByteCount = byteCount
while (mutableByteCount > 0L) {
val bytesRead = fileChannel.transferTo(mutablePos, mutableByteCount, sink)
mutablePos += bytesRead
mutableByteCount -= bytesRead
}
}
}

View File

@@ -1,361 +0,0 @@
/*
* Copyright (C) 2016 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.cache2
import java.io.File
import java.io.IOException
import java.io.RandomAccessFile
import okhttp3.internal.closeQuietly
import okhttp3.internal.concurrent.Lockable
import okhttp3.internal.concurrent.notifyAll
import okio.Buffer
import okio.ByteString
import okio.ByteString.Companion.encodeUtf8
import okio.Source
import okio.Timeout
/**
* Replicates a single upstream source into multiple downstream sources. Each downstream source
* returns the same bytes as the upstream source. Downstream sources may read data either as it
* is returned by upstream, or after the upstream source has been exhausted.
*
* As bytes are returned from upstream they are written to a local file. Downstream sources read
* from this file as necessary.
*
* This class also keeps a small buffer of bytes recently read from upstream. This is intended to
* save a small amount of file I/O and data copying.
*/
class Relay private constructor(
/**
* Read/write persistence of the upstream source and its metadata. Its layout is as follows:
*
* * 16 bytes: either `OkHttp cache v1\n` if the persisted file is complete. This is another
* sequence of bytes if the file is incomplete and should not be used.
* * 8 bytes: *n*: upstream data size
* * 8 bytes: *m*: metadata size
* * *n* bytes: upstream data
* * *m* bytes: metadata
*
* This is closed and assigned to null when the last source is closed and no further sources
* are permitted.
*/
var file: RandomAccessFile?,
/**
* Null once the file has a complete copy of the upstream bytes. Only the [upstreamReader] thread
* may access this source.
*/
var upstream: Source?,
/** The number of bytes consumed from [upstream]. Guarded by this. */
var upstreamPos: Long,
/** User-supplied additional data persisted with the source data. */
private val metadata: ByteString,
/** The maximum size of [buffer]. */
val bufferMaxSize: Long,
) : Lockable {
/** The thread that currently has access to upstream. Possibly null. Guarded by this. */
var upstreamReader: Thread? = null
/**
* A buffer for [upstreamReader] to use when pulling bytes from upstream. Only the
* [upstreamReader] thread may access this buffer.
*/
val upstreamBuffer = Buffer()
/** True if there are no further bytes to read from [upstream]. Guarded by this. */
var complete = (upstream == null)
/** The most recently read bytes from [upstream]. This is a suffix of [file]. Guarded by this. */
val buffer = Buffer()
/**
* Reference count of the number of active sources reading this stream. When decremented to 0
* resources are released and all following calls to [.newSource] return null. Guarded by this.
*/
var sourceCount = 0
val isClosed: Boolean
get() = file == null
@Throws(IOException::class)
private fun writeHeader(
prefix: ByteString,
upstreamSize: Long,
metadataSize: Long,
) {
val header =
Buffer().apply {
write(prefix)
writeLong(upstreamSize)
writeLong(metadataSize)
require(size == FILE_HEADER_SIZE)
}
val fileOperator = FileOperator(file!!.channel)
fileOperator.write(0, header, FILE_HEADER_SIZE)
}
@Throws(IOException::class)
private fun writeMetadata(upstreamSize: Long) {
val metadataBuffer = Buffer()
metadataBuffer.write(metadata)
val fileOperator = FileOperator(file!!.channel)
fileOperator.write(FILE_HEADER_SIZE + upstreamSize, metadataBuffer, metadata.size.toLong())
}
@Throws(IOException::class)
fun commit(upstreamSize: Long) {
// Write metadata to the end of the file.
writeMetadata(upstreamSize)
file!!.channel.force(false)
// Once everything else is in place we can swap the dirty header for a clean one.
writeHeader(PREFIX_CLEAN, upstreamSize, metadata.size.toLong())
file!!.channel.force(false)
// This file is complete.
synchronized(this@Relay) {
complete = true
}
upstream?.closeQuietly()
upstream = null
}
fun metadata(): ByteString = metadata
/**
* Returns a new source that returns the same bytes as upstream. Returns null if this relay has
* been closed and no further sources are possible. In that case callers should retry after
* building a new relay with [.read].
*/
fun newSource(): Source? {
synchronized(this@Relay) {
if (file == null) return null
sourceCount++
}
return RelaySource()
}
internal inner class RelaySource : Source {
private val timeout = Timeout()
/** The operator to read and write the shared file. Null if this source is closed. */
private var fileOperator: FileOperator? = FileOperator(file!!.channel)
/** The next byte to read. This is always less than or equal to [upstreamPos]. */
private var sourcePos = 0L
/**
* Selects where to find the bytes for a read and read them. This is one of three sources.
*
* ## Upstream
*
* In this case the current thread is assigned as the upstream reader. We read bytes from
* upstream and copy them to both the file and to the buffer. Finally we release the upstream
* reader lock and return the new bytes.
*
* ## The file
*
* In this case we copy bytes from the file to the [sink].
*
* ## The buffer
*
* In this case the bytes are immediately copied into [sink] and the number of bytes copied is
* returned.
*
* If upstream would be selected but another thread is already reading upstream this will
* block until that read completes. It is possible to time out while waiting for that.
*/
@Throws(IOException::class)
override fun read(
sink: Buffer,
byteCount: Long,
): Long {
check(fileOperator != null)
val source: Int =
synchronized(this@Relay) {
// We need new data from upstream.
while (true) {
val upstreamPos = this@Relay.upstreamPos
if (sourcePos != upstreamPos) break
// No more data upstream. We're done.
if (complete) return -1L
// Another thread is already reading. Wait for that.
if (upstreamReader != null) {
timeout.waitUntilNotified(this@Relay)
continue
}
// We will do the read.
upstreamReader = Thread.currentThread()
return@synchronized SOURCE_UPSTREAM
}
val bufferPos = upstreamPos - buffer.size
// Bytes of the read precede the buffer. Read from the file.
if (sourcePos < bufferPos) {
return@synchronized SOURCE_FILE
}
// The buffer has the data we need. Read from there and return immediately.
val bytesToRead = minOf(byteCount, upstreamPos - sourcePos)
buffer.copyTo(sink, sourcePos - bufferPos, bytesToRead)
sourcePos += bytesToRead
return bytesToRead
}
// Read from the file.
if (source == SOURCE_FILE) {
val bytesToRead = minOf(byteCount, upstreamPos - sourcePos)
fileOperator!!.read(FILE_HEADER_SIZE + sourcePos, sink, bytesToRead)
sourcePos += bytesToRead
return bytesToRead
}
// Read from upstream. This always reads a full buffer: that might be more than what the
// current call to Source.read() has requested.
try {
val upstreamBytesRead = upstream!!.read(upstreamBuffer, bufferMaxSize)
// If we've exhausted upstream, we're done.
if (upstreamBytesRead == -1L) {
commit(upstreamPos)
return -1L
}
// Update this source and prepare this call's result.
val bytesRead = minOf(upstreamBytesRead, byteCount)
upstreamBuffer.copyTo(sink, 0, bytesRead)
sourcePos += bytesRead
// Append the upstream bytes to the file.
fileOperator!!.write(
FILE_HEADER_SIZE + upstreamPos,
upstreamBuffer.clone(),
upstreamBytesRead,
)
synchronized(this@Relay) {
// Append new upstream bytes into the buffer. Trim it to its max size.
buffer.write(upstreamBuffer, upstreamBytesRead)
if (buffer.size > bufferMaxSize) {
buffer.skip(buffer.size - bufferMaxSize)
}
// Now that the file and buffer have bytes, adjust upstreamPos.
this@Relay.upstreamPos += upstreamBytesRead
}
return bytesRead
} finally {
synchronized(this@Relay) {
upstreamReader = null
this@Relay.notifyAll()
}
}
}
override fun timeout(): Timeout = timeout
@Throws(IOException::class)
override fun close() {
if (fileOperator == null) return // Already closed.
fileOperator = null
var fileToClose: RandomAccessFile? = null
synchronized(this@Relay) {
sourceCount--
if (sourceCount == 0) {
fileToClose = file
file = null
}
}
fileToClose?.closeQuietly()
}
}
companion object {
// TODO(jwilson): what to do about timeouts? They could be different and unfortunately when any
// timeout is hit we like to tear down the whole stream.
private const val SOURCE_UPSTREAM = 1
private const val SOURCE_FILE = 2
@JvmField val PREFIX_CLEAN = "OkHttp cache v1\n".encodeUtf8()
@JvmField val PREFIX_DIRTY = "OkHttp DIRTY :(\n".encodeUtf8()
private const val FILE_HEADER_SIZE = 32L
/**
* Creates a new relay that reads a live stream from [upstream], using [file] to share that data
* with other sources.
*
* **Warning:** callers to this method must immediately call [newSource] to create a source and
* close that when they're done. Otherwise a handle to [file] will be leaked.
*/
@Throws(IOException::class)
fun edit(
file: File,
upstream: Source,
metadata: ByteString,
bufferMaxSize: Long,
): Relay {
val randomAccessFile = RandomAccessFile(file, "rw")
val result = Relay(randomAccessFile, upstream, 0L, metadata, bufferMaxSize)
// Write a dirty header. That way if we crash we won't attempt to recover this.
randomAccessFile.setLength(0L)
result.writeHeader(PREFIX_DIRTY, -1L, -1L)
return result
}
/**
* Creates a relay that reads a recorded stream from [file].
*
* **Warning:** callers to this method must immediately call [newSource] to create a source and
* close that when they're done. Otherwise a handle to [file] will be leaked.
*/
@Throws(IOException::class)
fun read(file: File): Relay {
val randomAccessFile = RandomAccessFile(file, "rw")
val fileOperator = FileOperator(randomAccessFile.channel)
// Read the header.
val header = Buffer()
fileOperator.read(0, header, FILE_HEADER_SIZE)
val prefix = header.readByteString(PREFIX_CLEAN.size.toLong())
if (prefix != PREFIX_CLEAN) throw IOException("unreadable cache file")
val upstreamSize = header.readLong()
val metadataSize = header.readLong()
// Read the metadata.
val metadataBuffer = Buffer()
fileOperator.read(FILE_HEADER_SIZE + upstreamSize, metadataBuffer, metadataSize)
val metadata = metadataBuffer.readByteString()
// Return the result.
return Relay(randomAccessFile, null, upstreamSize, metadata, 0L)
}
}
}

View File

@@ -1,214 +0,0 @@
/*
* Copyright (C) 2016 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.cache2
import assertk.assertThat
import assertk.assertions.isEqualTo
import java.io.File
import java.io.RandomAccessFile
import java.util.Random
import kotlin.test.assertFailsWith
import okio.Buffer
import okio.ByteString
import okio.ByteString.Companion.encodeUtf8
import okio.buffer
import okio.sink
import okio.source
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir
class FileOperatorTest {
@TempDir
var tempDir: File? = null
private var file: File? = null
private var randomAccessFile: RandomAccessFile? = null
@BeforeEach
fun setUp() {
file = File(tempDir, "test")
randomAccessFile = RandomAccessFile(file, "rw")
}
@AfterEach
fun tearDown() {
randomAccessFile!!.close()
}
@Test
fun read() {
write("Hello, World".encodeUtf8())
val operator =
FileOperator(
randomAccessFile!!.getChannel(),
)
val buffer = Buffer()
operator.read(0, buffer, 5)
assertThat(buffer.readUtf8()).isEqualTo("Hello")
operator.read(4, buffer, 5)
assertThat(buffer.readUtf8()).isEqualTo("o, Wo")
}
@Test
fun write() {
val operator =
FileOperator(
randomAccessFile!!.getChannel(),
)
val buffer1 = Buffer().writeUtf8("Hello, World")
operator.write(0, buffer1, 5)
assertThat(buffer1.readUtf8()).isEqualTo(", World")
val buffer2 = Buffer().writeUtf8("icopter!")
operator.write(3, buffer2, 7)
assertThat(buffer2.readUtf8()).isEqualTo("!")
assertThat<ByteString>(snapshot()).isEqualTo("Helicopter".encodeUtf8())
}
@Test
fun readAndWrite() {
val operator =
FileOperator(
randomAccessFile!!.getChannel(),
)
write("woman god creates dinosaurs destroys. ".encodeUtf8())
val buffer = Buffer()
operator.read(6, buffer, 21)
operator.read(36, buffer, 1)
operator.read(5, buffer, 5)
operator.read(28, buffer, 8)
operator.read(17, buffer, 10)
operator.read(36, buffer, 2)
operator.read(2, buffer, 4)
operator.write(0, buffer, buffer.size)
operator.read(0, buffer, 12)
operator.read(47, buffer, 3)
operator.read(45, buffer, 2)
operator.read(47, buffer, 3)
operator.read(26, buffer, 10)
operator.read(23, buffer, 3)
operator.write(47, buffer, buffer.size)
operator.read(62, buffer, 6)
operator.read(4, buffer, 19)
operator.write(80, buffer, buffer.size)
assertThat(snapshot()).isEqualTo(
(
"" +
"god creates dinosaurs. " +
"god destroys dinosaurs. " +
"god creates man. " +
"man destroys god. " +
"man creates dinosaurs. "
).encodeUtf8(),
)
}
@Test
fun multipleOperatorsShareOneFile() {
val operatorA =
FileOperator(
randomAccessFile!!.getChannel(),
)
val operatorB =
FileOperator(
randomAccessFile!!.getChannel(),
)
val bufferA = Buffer()
val bufferB = Buffer()
bufferA.writeUtf8("Dodgson!\n")
operatorA.write(0, bufferA, 9)
bufferB.writeUtf8("You shouldn't use my name.\n")
operatorB.write(9, bufferB, 27)
bufferA.writeUtf8("Dodgson, we've got Dodgson here!\n")
operatorA.write(36, bufferA, 33)
operatorB.read(0, bufferB, 9)
assertThat(bufferB.readUtf8()).isEqualTo("Dodgson!\n")
operatorA.read(9, bufferA, 27)
assertThat(bufferA.readUtf8()).isEqualTo("You shouldn't use my name.\n")
operatorB.read(36, bufferB, 33)
assertThat(bufferB.readUtf8()).isEqualTo("Dodgson, we've got Dodgson here!\n")
}
@Test
fun largeRead() {
val data = randomByteString(1000000)
write(data)
val operator =
FileOperator(
randomAccessFile!!.getChannel(),
)
val buffer = Buffer()
operator.read(0, buffer, data.size.toLong())
assertThat(buffer.readByteString()).isEqualTo(data)
}
@Test
fun largeWrite() {
val data = randomByteString(1000000)
val operator =
FileOperator(
randomAccessFile!!.getChannel(),
)
val buffer = Buffer().write(data)
operator.write(0, buffer, data.size.toLong())
assertThat(snapshot()).isEqualTo(data)
}
@Test
fun readBounds() {
val operator =
FileOperator(
randomAccessFile!!.getChannel(),
)
val buffer = Buffer()
assertFailsWith<IndexOutOfBoundsException> {
operator.read(0, buffer, -1L)
}
}
@Test
fun writeBounds() {
val operator =
FileOperator(
randomAccessFile!!.getChannel(),
)
val buffer = Buffer().writeUtf8("abc")
assertFailsWith<IndexOutOfBoundsException> {
operator.write(0, buffer, -1L)
}
assertFailsWith<IndexOutOfBoundsException> {
operator.write(0, buffer, 4L)
}
}
private fun randomByteString(byteCount: Int): ByteString {
val bytes = ByteArray(byteCount)
Random(0).nextBytes(bytes)
return ByteString.of(*bytes)
}
private fun snapshot(): ByteString {
randomAccessFile!!.getChannel().force(false)
val source = file!!.source().buffer()
return source.readByteString()
}
private fun write(data: ByteString) {
val sink = file!!.sink().buffer()
sink.write(data)
sink.close()
}
}

View File

@@ -1,240 +0,0 @@
/*
* Copyright (C) 2016 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.cache2
import assertk.assertThat
import assertk.assertions.isEqualTo
import assertk.assertions.isFalse
import assertk.assertions.isNull
import assertk.assertions.isTrue
import java.io.File
import java.io.IOException
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import kotlin.test.assertFailsWith
import okhttp3.TestUtil.threadFactory
import okhttp3.internal.cache2.Relay.Companion.edit
import okhttp3.internal.cache2.Relay.Companion.read
import okio.Buffer
import okio.ByteString
import okio.ByteString.Companion.encodeUtf8
import okio.Pipe
import okio.Source
import okio.buffer
import okio.source
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir
@Tag("Slowish")
class RelayTest {
@TempDir
var tempDir: File? = null
private val executor = Executors.newCachedThreadPool(threadFactory("RelayTest"))
private val metadata: ByteString = "great metadata!".encodeUtf8()
private lateinit var file: File
@BeforeEach
fun setUp() {
file = File(tempDir, "test")
}
@AfterEach
fun tearDown() {
executor.shutdown()
}
@Test
fun singleSource() {
val upstream = Buffer()
upstream.writeUtf8("abcdefghijklm")
val relay = edit(file, upstream, metadata, 1024)
val source = relay.newSource()
val sourceBuffer = Buffer()
assertThat(source!!.read(sourceBuffer, 5)).isEqualTo(5)
assertThat(sourceBuffer.readUtf8()).isEqualTo("abcde")
assertThat(source.read(sourceBuffer, 1024)).isEqualTo(8)
assertThat(sourceBuffer.readUtf8()).isEqualTo("fghijklm")
assertThat(source.read(sourceBuffer, 1024)).isEqualTo(-1)
assertThat(sourceBuffer.size).isEqualTo(0)
source.close()
assertThat(relay.isClosed).isTrue()
assertFile(Relay.PREFIX_CLEAN, 13L, metadata.size, "abcdefghijklm", metadata)
}
@Test
fun multipleSources() {
val upstream = Buffer()
upstream.writeUtf8("abcdefghijklm")
val relay = edit(file, upstream, metadata, 1024)
val source1 = relay.newSource()!!.buffer()
val source2 = relay.newSource()!!.buffer()
assertThat(source1.readUtf8()).isEqualTo("abcdefghijklm")
assertThat(source2.readUtf8()).isEqualTo("abcdefghijklm")
source1.close()
source2.close()
assertThat(relay.isClosed).isTrue()
assertFile(Relay.PREFIX_CLEAN, 13L, metadata.size, "abcdefghijklm", metadata)
}
@Test
fun readFromBuffer() {
val upstream = Buffer()
upstream.writeUtf8("abcdefghij")
val relay = edit(file, upstream, metadata, 5)
val source1 = relay.newSource()!!.buffer()
val source2 = relay.newSource()!!.buffer()
assertThat(source1.readUtf8(5)).isEqualTo("abcde")
assertThat(source2.readUtf8(5)).isEqualTo("abcde")
assertThat(source2.readUtf8(5)).isEqualTo("fghij")
assertThat(source1.readUtf8(5)).isEqualTo("fghij")
assertThat(source1.exhausted()).isTrue()
assertThat(source2.exhausted()).isTrue()
source1.close()
source2.close()
assertThat(relay.isClosed).isTrue()
assertFile(Relay.PREFIX_CLEAN, 10L, metadata.size, "abcdefghij", metadata)
}
@Test
fun readFromFile() {
val upstream = Buffer()
upstream.writeUtf8("abcdefghijklmnopqrst")
val relay = edit(file, upstream, metadata, 5)
val source1 = relay.newSource()!!.buffer()
val source2 = relay.newSource()!!.buffer()
assertThat(source1.readUtf8(10)).isEqualTo("abcdefghij")
assertThat(source2.readUtf8(10)).isEqualTo("abcdefghij")
assertThat(source2.readUtf8(10)).isEqualTo("klmnopqrst")
assertThat(source1.readUtf8(10)).isEqualTo("klmnopqrst")
assertThat(source1.exhausted()).isTrue()
assertThat(source2.exhausted()).isTrue()
source1.close()
source2.close()
assertThat(relay.isClosed).isTrue()
assertFile(Relay.PREFIX_CLEAN, 20L, metadata.size, "abcdefghijklmnopqrst", metadata)
}
@Test
fun readAfterEdit() {
val upstream = Buffer()
upstream.writeUtf8("abcdefghij")
val relay1 = edit(file, upstream, metadata, 5)
val source1 = relay1.newSource()!!.buffer()
assertThat(source1.readUtf8(10)).isEqualTo("abcdefghij")
assertThat(source1.exhausted()).isTrue()
source1.close()
assertThat(relay1.isClosed).isTrue()
// Since relay1 is closed, new sources cannot be created.
assertThat(relay1.newSource()).isNull()
val relay2 = read(file)
assertThat(relay2.metadata()).isEqualTo(metadata)
val source2 = relay2.newSource()!!.buffer()
assertThat(source2.readUtf8(10)).isEqualTo("abcdefghij")
assertThat(source2.exhausted()).isTrue()
source2.close()
assertThat(relay2.isClosed).isTrue()
// Since relay2 is closed, new sources cannot be created.
assertThat(relay2.newSource()).isNull()
assertFile(Relay.PREFIX_CLEAN, 10L, metadata.size, "abcdefghij", metadata)
}
@Test
fun closeBeforeExhaustLeavesDirtyFile() {
val upstream = Buffer()
upstream.writeUtf8("abcdefghij")
val relay1 = edit(file, upstream, metadata, 5)
val source1 = relay1.newSource()!!.buffer()
assertThat(source1.readUtf8(10)).isEqualTo("abcdefghij")
source1.close() // Not exhausted!
assertThat(relay1.isClosed).isTrue()
assertFailsWith<IOException> {
read(file)
}.also { expected ->
assertThat(expected.message).isEqualTo("unreadable cache file")
}
assertFile(Relay.PREFIX_DIRTY, -1L, -1, null, null)
}
@Test
fun redundantCallsToCloseAreIgnored() {
val upstream = Buffer()
upstream.writeUtf8("abcde")
val relay = edit(file, upstream, metadata, 1024)
val source1 = relay.newSource()
val source2 = relay.newSource()
source1!!.close()
source1.close() // Unnecessary. Shouldn't decrement the reference count.
assertThat(relay.isClosed).isFalse()
source2!!.close()
assertThat(relay.isClosed).isTrue()
assertFile(Relay.PREFIX_DIRTY, -1L, -1, null, null)
}
@Test
fun racingReaders() {
val pipe = Pipe(1024)
val sink = pipe.sink.buffer()
val relay = edit(file, pipe.source, metadata, 5)
val future1 = executor.submit(sourceReader(relay.newSource()))
val future2 = executor.submit(sourceReader(relay.newSource()))
Thread.sleep(500)
sink.writeUtf8("abcdefghij")
Thread.sleep(500)
sink.writeUtf8("klmnopqrst")
sink.close()
assertThat<ByteString>(future1.get())
.isEqualTo("abcdefghijklmnopqrst".encodeUtf8())
assertThat<ByteString>(future2.get())
.isEqualTo("abcdefghijklmnopqrst".encodeUtf8())
assertThat(relay.isClosed).isTrue()
assertFile(Relay.PREFIX_CLEAN, 20L, metadata.size, "abcdefghijklmnopqrst", metadata)
}
/** Returns a callable that reads all of source, closes it, and returns the bytes. */
private fun sourceReader(source: Source?): Callable<ByteString> =
Callable {
val buffer = Buffer()
while (source!!.read(buffer, 16384) != -1L) {
}
source.close()
buffer.readByteString()
}
private fun assertFile(
prefix: ByteString,
upstreamSize: Long,
metadataSize: Int,
upstream: String?,
metadata: ByteString?,
) {
val source = file.source().buffer()
assertThat(source.readByteString(prefix.size.toLong())).isEqualTo(prefix)
assertThat(source.readLong()).isEqualTo(upstreamSize)
assertThat(source.readLong()).isEqualTo(metadataSize.toLong())
if (upstream != null) {
assertThat(source.readUtf8(upstreamSize)).isEqualTo(upstream)
}
if (metadata != null) {
assertThat(source.readByteString(metadataSize.toLong())).isEqualTo(metadata)
}
source.close()
}
}