diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/AbstractHttpOutputStream.java b/okhttp/src/main/java/com/squareup/okhttp/internal/AbstractOutputStream.java similarity index 81% rename from okhttp/src/main/java/com/squareup/okhttp/internal/http/AbstractHttpOutputStream.java rename to okhttp/src/main/java/com/squareup/okhttp/internal/AbstractOutputStream.java index 90675b06b..78c9691e6 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/AbstractHttpOutputStream.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/AbstractOutputStream.java @@ -14,18 +14,18 @@ * limitations under the License. */ -package com.squareup.okhttp.internal.http; +package com.squareup.okhttp.internal; import java.io.IOException; import java.io.OutputStream; /** - * An output stream for the body of an HTTP request. + * An output stream for an HTTP request body. * *

Since a single socket's output stream may be used to write multiple HTTP * requests to the same server, subclasses should not close the socket stream. */ -abstract class AbstractHttpOutputStream extends OutputStream { +public abstract class AbstractOutputStream extends OutputStream { protected boolean closed; @Override public final void write(int data) throws IOException { @@ -37,4 +37,9 @@ abstract class AbstractHttpOutputStream extends OutputStream { throw new IOException("stream closed"); } } + + /** Returns true if this stream was closed locally. */ + public boolean isClosed() { + return closed; + } } diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/FaultRecoveringOutputStream.java b/okhttp/src/main/java/com/squareup/okhttp/internal/FaultRecoveringOutputStream.java index 0fbe7d618..c32b27aeb 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/FaultRecoveringOutputStream.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/FaultRecoveringOutputStream.java @@ -32,13 +32,12 @@ import static com.squareup.okhttp.internal.Util.checkOffsetAndCount; * replacement stream each time an {@link IOException} is encountered on the * current stream. */ -public abstract class FaultRecoveringOutputStream extends OutputStream { +public abstract class FaultRecoveringOutputStream extends AbstractOutputStream { private final int maxReplayBufferLength; /** Bytes to transmit on the replacement stream, or null if no recovery is possible. */ private ByteArrayOutputStream replayBuffer; private OutputStream out; - private boolean closed; /** * @param maxReplayBufferLength the maximum number of successfully written @@ -52,10 +51,6 @@ public abstract class FaultRecoveringOutputStream extends OutputStream { this.out = out; } - @Override public final void write(int data) throws IOException { - write(new byte[] { (byte) data }); - } - @Override public final void write(byte[] buffer, int offset, int count) throws IOException { if (closed) throw new IOException("stream closed"); checkOffsetAndCount(buffer.length, offset, count); @@ -119,15 +114,13 @@ public abstract class FaultRecoveringOutputStream extends OutputStream { } while (true) { - OutputStream replacementStream = replacementStream(e); - if (replacementStream == null) { - return false; - } + OutputStream replacementStream = null; try { - replayBuffer.writeTo(replacementStream); - // We've found a replacement that works! - Util.closeQuietly(out); - out = replacementStream; + replacementStream = replacementStream(e); + if (replacementStream == null) { + return false; + } + replaceStream(replacementStream); return true; } catch (IOException replacementStreamFailure) { // The replacement was also broken. Loop to ask for another replacement. @@ -137,11 +130,34 @@ public abstract class FaultRecoveringOutputStream extends OutputStream { } } + /** + * Returns true if errors in the underlying stream can currently be recovered. + */ + public boolean isRecoverable() { + return replayBuffer != null; + } + + /** + * Replaces the current output stream with {@code replacementStream}, writing + * any replay bytes to it if they exist. The current output stream is closed. + */ + public final void replaceStream(OutputStream replacementStream) throws IOException { + if (!isRecoverable()) { + throw new IllegalStateException(); + } + if (this.out == replacementStream) { + return; // Don't replace a stream with itself. + } + replayBuffer.writeTo(replacementStream); + Util.closeQuietly(out); + out = replacementStream; + } + /** * Returns a replacement output stream to recover from {@code e} thrown by the * previous stream. Returns a new OutputStream if recovery was successful, in * which case all previously-written data will be replayed. Returns null if * the failure cannot be recovered. */ - protected abstract OutputStream replacementStream(IOException e); + protected abstract OutputStream replacementStream(IOException e) throws IOException; } diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpTransport.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpTransport.java index e3efac9e2..f6d77b250 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpTransport.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpTransport.java @@ -17,6 +17,7 @@ package com.squareup.okhttp.internal.http; import com.squareup.okhttp.Connection; +import com.squareup.okhttp.internal.AbstractOutputStream; import com.squareup.okhttp.internal.Util; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -139,7 +140,7 @@ public final class HttpTransport implements Transport { } // We cannot reuse sockets that have incomplete output. - if (requestBodyOut != null && !((AbstractHttpOutputStream) requestBodyOut).closed) { + if (requestBodyOut != null && !((AbstractOutputStream) requestBodyOut).isClosed()) { return false; } @@ -209,7 +210,7 @@ public final class HttpTransport implements Transport { } /** An HTTP body with a fixed length known in advance. */ - private static final class FixedLengthOutputStream extends AbstractHttpOutputStream { + private static final class FixedLengthOutputStream extends AbstractOutputStream { private final OutputStream socketOut; private int bytesRemaining; @@ -251,7 +252,7 @@ public final class HttpTransport implements Transport { * buffered until {@code maxChunkLength} bytes are ready, at which point the * chunk is written and the buffer is cleared. */ - private static final class ChunkedOutputStream extends AbstractHttpOutputStream { + private static final class ChunkedOutputStream extends AbstractOutputStream { private static final byte[] CRLF = { '\r', '\n' }; private static final byte[] HEX_DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java index 1da7cf513..eabe649db 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/HttpURLConnectionImpl.java @@ -21,6 +21,8 @@ import com.squareup.okhttp.Connection; import com.squareup.okhttp.ConnectionPool; import com.squareup.okhttp.OkHttpClient; import com.squareup.okhttp.Route; +import com.squareup.okhttp.internal.AbstractOutputStream; +import com.squareup.okhttp.internal.FaultRecoveringOutputStream; import com.squareup.okhttp.internal.Util; import java.io.FileNotFoundException; import java.io.IOException; @@ -71,6 +73,13 @@ public class HttpURLConnectionImpl extends HttpURLConnection { */ private static final int MAX_REDIRECTS = 20; + /** + * The minimum number of request body bytes to transmit before we're willing + * to let a routine {@link IOException} bubble up to the user. This is used to + * size a buffer for data that will be replayed upon error. + */ + private static final int MAX_REPLAY_BUFFER_LENGTH = 8192; + private final boolean followProtocolRedirects; /** The proxy requested by the client, or null for a proxy to be selected automatically. */ @@ -85,10 +94,10 @@ public class HttpURLConnectionImpl extends HttpURLConnection { HostnameVerifier hostnameVerifier; final Set failedRoutes; - private final RawHeaders rawRequestHeaders = new RawHeaders(); private int redirectionCount; + private FaultRecoveringOutputStream faultRecoveringRequestBody; protected IOException httpEngineFailure; protected HttpEngine httpEngine; @@ -225,14 +234,29 @@ public class HttpURLConnectionImpl extends HttpURLConnection { @Override public final OutputStream getOutputStream() throws IOException { connect(); - OutputStream result = httpEngine.getRequestBody(); - if (result == null) { + OutputStream out = httpEngine.getRequestBody(); + if (out == null) { throw new ProtocolException("method does not support a request body: " + method); } else if (httpEngine.hasResponse()) { throw new ProtocolException("cannot write request body after response has been read"); } - return result; + if (faultRecoveringRequestBody == null) { + faultRecoveringRequestBody = new FaultRecoveringOutputStream(MAX_REPLAY_BUFFER_LENGTH, out) { + @Override protected OutputStream replacementStream(IOException e) throws IOException { + if (httpEngine.getRequestBody() instanceof AbstractOutputStream + && ((AbstractOutputStream) httpEngine.getRequestBody()).isClosed()) { + return null; // Don't recover once the underlying stream has been closed. + } + if (handleFailure(e)) { + return httpEngine.getRequestBody(); + } + return null; // This is a permanent failure. + } + }; + } + + return faultRecoveringRequestBody; } @Override public final Permission getPermission() throws IOException { @@ -362,29 +386,50 @@ public class HttpURLConnectionImpl extends HttpURLConnection { } return true; } catch (IOException e) { - RouteSelector routeSelector = httpEngine.routeSelector; - if (routeSelector != null && httpEngine.connection != null) { - routeSelector.connectFailed(httpEngine.connection, e); - } - if (routeSelector == null && httpEngine.connection == null) { - throw e; // If we failed before finding a route or a connection, give up. - } - - // The connection failure isn't fatal if there's another route to attempt. - OutputStream requestBody = httpEngine.getRequestBody(); - if ((routeSelector == null || routeSelector.hasNext()) && isRecoverable(e) && (requestBody - == null || requestBody instanceof RetryableOutputStream)) { - httpEngine.release(true); - httpEngine = - newHttpEngine(method, rawRequestHeaders, null, (RetryableOutputStream) requestBody); - httpEngine.routeSelector = routeSelector; // Keep the same routeSelector. + if (handleFailure(e)) { return false; + } else { + throw e; } - httpEngineFailure = e; - throw e; } } + /** + * Report and attempt to recover from {@code e}. Returns true if the HTTP + * engine was replaced and the request should be retried. Otherwise the + * failure is permanent. + */ + private boolean handleFailure(IOException e) throws IOException { + RouteSelector routeSelector = httpEngine.routeSelector; + if (routeSelector != null && httpEngine.connection != null) { + routeSelector.connectFailed(httpEngine.connection, e); + } + + OutputStream requestBody = httpEngine.getRequestBody(); + boolean canRetryRequestBody = requestBody == null + || requestBody instanceof RetryableOutputStream + || (faultRecoveringRequestBody != null && faultRecoveringRequestBody.isRecoverable()); + if (routeSelector == null && httpEngine.connection == null // No connection. + || routeSelector != null && !routeSelector.hasNext() // No more routes to attempt. + || !isRecoverable(e) + || !canRetryRequestBody) { + httpEngineFailure = e; + return false; + } + + httpEngine.release(true); + RetryableOutputStream retryableOutputStream = requestBody instanceof RetryableOutputStream + ? (RetryableOutputStream) requestBody + : null; + httpEngine = newHttpEngine(method, rawRequestHeaders, null, retryableOutputStream); + httpEngine.routeSelector = routeSelector; // Keep the same routeSelector. + if (faultRecoveringRequestBody != null && faultRecoveringRequestBody.isRecoverable()) { + httpEngine.sendRequest(); + faultRecoveringRequestBody.replaceStream(httpEngine.getRequestBody()); + } + return true; + } + private boolean isRecoverable(IOException e) { // If the problem was a CertificateException from the X509TrustManager, // do not retry, we didn't have an abrupt server initiated exception. diff --git a/okhttp/src/main/java/com/squareup/okhttp/internal/http/RetryableOutputStream.java b/okhttp/src/main/java/com/squareup/okhttp/internal/http/RetryableOutputStream.java index 325327db0..5eb6b7645 100644 --- a/okhttp/src/main/java/com/squareup/okhttp/internal/http/RetryableOutputStream.java +++ b/okhttp/src/main/java/com/squareup/okhttp/internal/http/RetryableOutputStream.java @@ -16,6 +16,7 @@ package com.squareup.okhttp.internal.http; +import com.squareup.okhttp.internal.AbstractOutputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -28,7 +29,7 @@ import static com.squareup.okhttp.internal.Util.checkOffsetAndCount; * the post body to be transparently re-sent if the HTTP request must be * sent multiple times. */ -final class RetryableOutputStream extends AbstractHttpOutputStream { +final class RetryableOutputStream extends AbstractOutputStream { private final int limit; private final ByteArrayOutputStream content; diff --git a/okhttp/src/test/java/com/squareup/okhttp/internal/http/URLConnectionTest.java b/okhttp/src/test/java/com/squareup/okhttp/internal/http/URLConnectionTest.java index 332de5eba..71d3668d3 100644 --- a/okhttp/src/test/java/com/squareup/okhttp/internal/http/URLConnectionTest.java +++ b/okhttp/src/test/java/com/squareup/okhttp/internal/http/URLConnectionTest.java @@ -57,6 +57,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -2286,6 +2287,97 @@ public final class URLConnectionTest { assertEquals(-1, in.read()); } + @Test public void postFailsWithBufferedRequestForSmallRequest() throws Exception { + reusedConnectionFailsWithPost(TransferKind.END_OF_STREAM, 1024); + } + + // This test is ignored because we don't (yet) reliably recover for large request bodies. + @Test @Ignore public void postFailsWithBufferedRequestForLargeRequest() throws Exception { + reusedConnectionFailsWithPost(TransferKind.END_OF_STREAM, 16384); + } + + @Test public void postFailsWithChunkedRequestForSmallRequest() throws Exception { + reusedConnectionFailsWithPost(TransferKind.CHUNKED, 1024); + } + + // This test is ignored because we don't (yet) reliably recover for large request bodies. + @Test @Ignore public void postFailsWithChunkedRequestForLargeRequest() throws Exception { + reusedConnectionFailsWithPost(TransferKind.CHUNKED, 16384); + } + + @Test public void postFailsWithFixedLengthRequestForSmallRequest() throws Exception { + reusedConnectionFailsWithPost(TransferKind.FIXED_LENGTH, 1024); + } + + // This test is ignored because we don't (yet) reliably recover for large request bodies. + @Test @Ignore public void postFailsWithFixedLengthRequestForLargeRequest() throws Exception { + reusedConnectionFailsWithPost(TransferKind.FIXED_LENGTH, 16384); + } + + private void reusedConnectionFailsWithPost(TransferKind transferKind, int requestSize) + throws Exception { + server.enqueue(new MockResponse().setBody("A").setSocketPolicy(SHUTDOWN_INPUT_AT_END)); + server.enqueue(new MockResponse().setBody("B")); + server.play(); + + assertContent("A", client.open(server.getUrl("/a"))); + + // If the request body is larger than OkHttp's replay buffer, the failure may still occur. + byte[] requestBody = new byte[requestSize]; + new Random(0).nextBytes(requestBody); + + HttpURLConnection connection = client.open(server.getUrl("/b")); + connection.setRequestMethod("POST"); + transferKind.setForRequest(connection, requestBody.length); + for (int i = 0; i < requestBody.length; i += 1024) { + connection.getOutputStream().write(requestBody, i, 1024); + } + connection.getOutputStream().close(); + assertContent("B", connection); + + RecordedRequest requestA = server.takeRequest(); + assertEquals("/a", requestA.getPath()); + RecordedRequest requestB = server.takeRequest(); + assertEquals("/b", requestB.getPath()); + assertEquals(Arrays.toString(requestBody), Arrays.toString(requestB.getBody())); + } + + @Test public void fullyBufferedPostIsTooShort() throws Exception { + server.enqueue(new MockResponse().setBody("A")); + server.play(); + + HttpURLConnection connection = client.open(server.getUrl("/b")); + connection.setRequestProperty("Content-Length", "4"); + connection.setRequestMethod("POST"); + OutputStream out = connection.getOutputStream(); + out.write('a'); + out.write('b'); + out.write('c'); + try { + out.close(); + fail(); + } catch (IOException expected) { + } + } + + @Test public void fullyBufferedPostIsTooLong() throws Exception { + server.enqueue(new MockResponse().setBody("A")); + server.play(); + + HttpURLConnection connection = client.open(server.getUrl("/b")); + connection.setRequestProperty("Content-Length", "3"); + connection.setRequestMethod("POST"); + OutputStream out = connection.getOutputStream(); + out.write('a'); + out.write('b'); + out.write('c'); + try { + out.write('d'); + fail(); + } catch (IOException expected) { + } + } + @Test @Ignore public void testPooledConnectionsDetectHttp10() { // TODO: write a test that shows pooled connections detect HTTP/1.0 (vs. HTTP/1.1) fail("TODO"); @@ -2413,7 +2505,7 @@ public final class URLConnectionTest { response.setBody(content); } @Override void setForRequest(HttpURLConnection connection, int contentLength) { - connection.setChunkedStreamingMode(contentLength); + connection.setFixedLengthStreamingMode(contentLength); } }, END_OF_STREAM() {