mirror of
https://github.com/square/okhttp.git
synced 2026-01-27 04:22:07 +03:00
Recover from failures while writing a POST body.
This introduces failure recovery in the output stream. It means we should be able to recover from any kind of failure that could be triggered by aggressive connection pooling. The recovery buffer is 8 KiB. I anticipate this should be more than enough to detect that an HTTP post is going to a black hole.
This commit is contained in:
@@ -14,18 +14,18 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.squareup.okhttp.internal.http;
|
||||
package com.squareup.okhttp.internal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
/**
|
||||
* An output stream for the body of an HTTP request.
|
||||
* An output stream for an HTTP request body.
|
||||
*
|
||||
* <p>Since a single socket's output stream may be used to write multiple HTTP
|
||||
* requests to the same server, subclasses should not close the socket stream.
|
||||
*/
|
||||
abstract class AbstractHttpOutputStream extends OutputStream {
|
||||
public abstract class AbstractOutputStream extends OutputStream {
|
||||
protected boolean closed;
|
||||
|
||||
@Override public final void write(int data) throws IOException {
|
||||
@@ -37,4 +37,9 @@ abstract class AbstractHttpOutputStream extends OutputStream {
|
||||
throw new IOException("stream closed");
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns true if this stream was closed locally. */
|
||||
public boolean isClosed() {
|
||||
return closed;
|
||||
}
|
||||
}
|
||||
@@ -32,13 +32,12 @@ import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;
|
||||
* replacement stream each time an {@link IOException} is encountered on the
|
||||
* current stream.
|
||||
*/
|
||||
public abstract class FaultRecoveringOutputStream extends OutputStream {
|
||||
public abstract class FaultRecoveringOutputStream extends AbstractOutputStream {
|
||||
private final int maxReplayBufferLength;
|
||||
|
||||
/** Bytes to transmit on the replacement stream, or null if no recovery is possible. */
|
||||
private ByteArrayOutputStream replayBuffer;
|
||||
private OutputStream out;
|
||||
private boolean closed;
|
||||
|
||||
/**
|
||||
* @param maxReplayBufferLength the maximum number of successfully written
|
||||
@@ -52,10 +51,6 @@ public abstract class FaultRecoveringOutputStream extends OutputStream {
|
||||
this.out = out;
|
||||
}
|
||||
|
||||
@Override public final void write(int data) throws IOException {
|
||||
write(new byte[] { (byte) data });
|
||||
}
|
||||
|
||||
@Override public final void write(byte[] buffer, int offset, int count) throws IOException {
|
||||
if (closed) throw new IOException("stream closed");
|
||||
checkOffsetAndCount(buffer.length, offset, count);
|
||||
@@ -119,15 +114,13 @@ public abstract class FaultRecoveringOutputStream extends OutputStream {
|
||||
}
|
||||
|
||||
while (true) {
|
||||
OutputStream replacementStream = replacementStream(e);
|
||||
if (replacementStream == null) {
|
||||
return false;
|
||||
}
|
||||
OutputStream replacementStream = null;
|
||||
try {
|
||||
replayBuffer.writeTo(replacementStream);
|
||||
// We've found a replacement that works!
|
||||
Util.closeQuietly(out);
|
||||
out = replacementStream;
|
||||
replacementStream = replacementStream(e);
|
||||
if (replacementStream == null) {
|
||||
return false;
|
||||
}
|
||||
replaceStream(replacementStream);
|
||||
return true;
|
||||
} catch (IOException replacementStreamFailure) {
|
||||
// The replacement was also broken. Loop to ask for another replacement.
|
||||
@@ -137,11 +130,34 @@ public abstract class FaultRecoveringOutputStream extends OutputStream {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if errors in the underlying stream can currently be recovered.
|
||||
*/
|
||||
public boolean isRecoverable() {
|
||||
return replayBuffer != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces the current output stream with {@code replacementStream}, writing
|
||||
* any replay bytes to it if they exist. The current output stream is closed.
|
||||
*/
|
||||
public final void replaceStream(OutputStream replacementStream) throws IOException {
|
||||
if (!isRecoverable()) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
if (this.out == replacementStream) {
|
||||
return; // Don't replace a stream with itself.
|
||||
}
|
||||
replayBuffer.writeTo(replacementStream);
|
||||
Util.closeQuietly(out);
|
||||
out = replacementStream;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a replacement output stream to recover from {@code e} thrown by the
|
||||
* previous stream. Returns a new OutputStream if recovery was successful, in
|
||||
* which case all previously-written data will be replayed. Returns null if
|
||||
* the failure cannot be recovered.
|
||||
*/
|
||||
protected abstract OutputStream replacementStream(IOException e);
|
||||
protected abstract OutputStream replacementStream(IOException e) throws IOException;
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
package com.squareup.okhttp.internal.http;
|
||||
|
||||
import com.squareup.okhttp.Connection;
|
||||
import com.squareup.okhttp.internal.AbstractOutputStream;
|
||||
import com.squareup.okhttp.internal.Util;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
@@ -139,7 +140,7 @@ public final class HttpTransport implements Transport {
|
||||
}
|
||||
|
||||
// We cannot reuse sockets that have incomplete output.
|
||||
if (requestBodyOut != null && !((AbstractHttpOutputStream) requestBodyOut).closed) {
|
||||
if (requestBodyOut != null && !((AbstractOutputStream) requestBodyOut).isClosed()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -209,7 +210,7 @@ public final class HttpTransport implements Transport {
|
||||
}
|
||||
|
||||
/** An HTTP body with a fixed length known in advance. */
|
||||
private static final class FixedLengthOutputStream extends AbstractHttpOutputStream {
|
||||
private static final class FixedLengthOutputStream extends AbstractOutputStream {
|
||||
private final OutputStream socketOut;
|
||||
private int bytesRemaining;
|
||||
|
||||
@@ -251,7 +252,7 @@ public final class HttpTransport implements Transport {
|
||||
* buffered until {@code maxChunkLength} bytes are ready, at which point the
|
||||
* chunk is written and the buffer is cleared.
|
||||
*/
|
||||
private static final class ChunkedOutputStream extends AbstractHttpOutputStream {
|
||||
private static final class ChunkedOutputStream extends AbstractOutputStream {
|
||||
private static final byte[] CRLF = { '\r', '\n' };
|
||||
private static final byte[] HEX_DIGITS = {
|
||||
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
|
||||
|
||||
@@ -21,6 +21,8 @@ import com.squareup.okhttp.Connection;
|
||||
import com.squareup.okhttp.ConnectionPool;
|
||||
import com.squareup.okhttp.OkHttpClient;
|
||||
import com.squareup.okhttp.Route;
|
||||
import com.squareup.okhttp.internal.AbstractOutputStream;
|
||||
import com.squareup.okhttp.internal.FaultRecoveringOutputStream;
|
||||
import com.squareup.okhttp.internal.Util;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
@@ -71,6 +73,13 @@ public class HttpURLConnectionImpl extends HttpURLConnection {
|
||||
*/
|
||||
private static final int MAX_REDIRECTS = 20;
|
||||
|
||||
/**
|
||||
* The minimum number of request body bytes to transmit before we're willing
|
||||
* to let a routine {@link IOException} bubble up to the user. This is used to
|
||||
* size a buffer for data that will be replayed upon error.
|
||||
*/
|
||||
private static final int MAX_REPLAY_BUFFER_LENGTH = 8192;
|
||||
|
||||
private final boolean followProtocolRedirects;
|
||||
|
||||
/** The proxy requested by the client, or null for a proxy to be selected automatically. */
|
||||
@@ -85,10 +94,10 @@ public class HttpURLConnectionImpl extends HttpURLConnection {
|
||||
HostnameVerifier hostnameVerifier;
|
||||
final Set<Route> failedRoutes;
|
||||
|
||||
|
||||
private final RawHeaders rawRequestHeaders = new RawHeaders();
|
||||
|
||||
private int redirectionCount;
|
||||
private FaultRecoveringOutputStream faultRecoveringRequestBody;
|
||||
|
||||
protected IOException httpEngineFailure;
|
||||
protected HttpEngine httpEngine;
|
||||
@@ -225,14 +234,29 @@ public class HttpURLConnectionImpl extends HttpURLConnection {
|
||||
@Override public final OutputStream getOutputStream() throws IOException {
|
||||
connect();
|
||||
|
||||
OutputStream result = httpEngine.getRequestBody();
|
||||
if (result == null) {
|
||||
OutputStream out = httpEngine.getRequestBody();
|
||||
if (out == null) {
|
||||
throw new ProtocolException("method does not support a request body: " + method);
|
||||
} else if (httpEngine.hasResponse()) {
|
||||
throw new ProtocolException("cannot write request body after response has been read");
|
||||
}
|
||||
|
||||
return result;
|
||||
if (faultRecoveringRequestBody == null) {
|
||||
faultRecoveringRequestBody = new FaultRecoveringOutputStream(MAX_REPLAY_BUFFER_LENGTH, out) {
|
||||
@Override protected OutputStream replacementStream(IOException e) throws IOException {
|
||||
if (httpEngine.getRequestBody() instanceof AbstractOutputStream
|
||||
&& ((AbstractOutputStream) httpEngine.getRequestBody()).isClosed()) {
|
||||
return null; // Don't recover once the underlying stream has been closed.
|
||||
}
|
||||
if (handleFailure(e)) {
|
||||
return httpEngine.getRequestBody();
|
||||
}
|
||||
return null; // This is a permanent failure.
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return faultRecoveringRequestBody;
|
||||
}
|
||||
|
||||
@Override public final Permission getPermission() throws IOException {
|
||||
@@ -362,29 +386,50 @@ public class HttpURLConnectionImpl extends HttpURLConnection {
|
||||
}
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
RouteSelector routeSelector = httpEngine.routeSelector;
|
||||
if (routeSelector != null && httpEngine.connection != null) {
|
||||
routeSelector.connectFailed(httpEngine.connection, e);
|
||||
}
|
||||
if (routeSelector == null && httpEngine.connection == null) {
|
||||
throw e; // If we failed before finding a route or a connection, give up.
|
||||
}
|
||||
|
||||
// The connection failure isn't fatal if there's another route to attempt.
|
||||
OutputStream requestBody = httpEngine.getRequestBody();
|
||||
if ((routeSelector == null || routeSelector.hasNext()) && isRecoverable(e) && (requestBody
|
||||
== null || requestBody instanceof RetryableOutputStream)) {
|
||||
httpEngine.release(true);
|
||||
httpEngine =
|
||||
newHttpEngine(method, rawRequestHeaders, null, (RetryableOutputStream) requestBody);
|
||||
httpEngine.routeSelector = routeSelector; // Keep the same routeSelector.
|
||||
if (handleFailure(e)) {
|
||||
return false;
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
httpEngineFailure = e;
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Report and attempt to recover from {@code e}. Returns true if the HTTP
|
||||
* engine was replaced and the request should be retried. Otherwise the
|
||||
* failure is permanent.
|
||||
*/
|
||||
private boolean handleFailure(IOException e) throws IOException {
|
||||
RouteSelector routeSelector = httpEngine.routeSelector;
|
||||
if (routeSelector != null && httpEngine.connection != null) {
|
||||
routeSelector.connectFailed(httpEngine.connection, e);
|
||||
}
|
||||
|
||||
OutputStream requestBody = httpEngine.getRequestBody();
|
||||
boolean canRetryRequestBody = requestBody == null
|
||||
|| requestBody instanceof RetryableOutputStream
|
||||
|| (faultRecoveringRequestBody != null && faultRecoveringRequestBody.isRecoverable());
|
||||
if (routeSelector == null && httpEngine.connection == null // No connection.
|
||||
|| routeSelector != null && !routeSelector.hasNext() // No more routes to attempt.
|
||||
|| !isRecoverable(e)
|
||||
|| !canRetryRequestBody) {
|
||||
httpEngineFailure = e;
|
||||
return false;
|
||||
}
|
||||
|
||||
httpEngine.release(true);
|
||||
RetryableOutputStream retryableOutputStream = requestBody instanceof RetryableOutputStream
|
||||
? (RetryableOutputStream) requestBody
|
||||
: null;
|
||||
httpEngine = newHttpEngine(method, rawRequestHeaders, null, retryableOutputStream);
|
||||
httpEngine.routeSelector = routeSelector; // Keep the same routeSelector.
|
||||
if (faultRecoveringRequestBody != null && faultRecoveringRequestBody.isRecoverable()) {
|
||||
httpEngine.sendRequest();
|
||||
faultRecoveringRequestBody.replaceStream(httpEngine.getRequestBody());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean isRecoverable(IOException e) {
|
||||
// If the problem was a CertificateException from the X509TrustManager,
|
||||
// do not retry, we didn't have an abrupt server initiated exception.
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package com.squareup.okhttp.internal.http;
|
||||
|
||||
import com.squareup.okhttp.internal.AbstractOutputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
@@ -28,7 +29,7 @@ import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;
|
||||
* the post body to be transparently re-sent if the HTTP request must be
|
||||
* sent multiple times.
|
||||
*/
|
||||
final class RetryableOutputStream extends AbstractHttpOutputStream {
|
||||
final class RetryableOutputStream extends AbstractOutputStream {
|
||||
private final int limit;
|
||||
private final ByteArrayOutputStream content;
|
||||
|
||||
|
||||
@@ -57,6 +57,7 @@ import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@@ -2286,6 +2287,97 @@ public final class URLConnectionTest {
|
||||
assertEquals(-1, in.read());
|
||||
}
|
||||
|
||||
@Test public void postFailsWithBufferedRequestForSmallRequest() throws Exception {
|
||||
reusedConnectionFailsWithPost(TransferKind.END_OF_STREAM, 1024);
|
||||
}
|
||||
|
||||
// This test is ignored because we don't (yet) reliably recover for large request bodies.
|
||||
@Test @Ignore public void postFailsWithBufferedRequestForLargeRequest() throws Exception {
|
||||
reusedConnectionFailsWithPost(TransferKind.END_OF_STREAM, 16384);
|
||||
}
|
||||
|
||||
@Test public void postFailsWithChunkedRequestForSmallRequest() throws Exception {
|
||||
reusedConnectionFailsWithPost(TransferKind.CHUNKED, 1024);
|
||||
}
|
||||
|
||||
// This test is ignored because we don't (yet) reliably recover for large request bodies.
|
||||
@Test @Ignore public void postFailsWithChunkedRequestForLargeRequest() throws Exception {
|
||||
reusedConnectionFailsWithPost(TransferKind.CHUNKED, 16384);
|
||||
}
|
||||
|
||||
@Test public void postFailsWithFixedLengthRequestForSmallRequest() throws Exception {
|
||||
reusedConnectionFailsWithPost(TransferKind.FIXED_LENGTH, 1024);
|
||||
}
|
||||
|
||||
// This test is ignored because we don't (yet) reliably recover for large request bodies.
|
||||
@Test @Ignore public void postFailsWithFixedLengthRequestForLargeRequest() throws Exception {
|
||||
reusedConnectionFailsWithPost(TransferKind.FIXED_LENGTH, 16384);
|
||||
}
|
||||
|
||||
private void reusedConnectionFailsWithPost(TransferKind transferKind, int requestSize)
|
||||
throws Exception {
|
||||
server.enqueue(new MockResponse().setBody("A").setSocketPolicy(SHUTDOWN_INPUT_AT_END));
|
||||
server.enqueue(new MockResponse().setBody("B"));
|
||||
server.play();
|
||||
|
||||
assertContent("A", client.open(server.getUrl("/a")));
|
||||
|
||||
// If the request body is larger than OkHttp's replay buffer, the failure may still occur.
|
||||
byte[] requestBody = new byte[requestSize];
|
||||
new Random(0).nextBytes(requestBody);
|
||||
|
||||
HttpURLConnection connection = client.open(server.getUrl("/b"));
|
||||
connection.setRequestMethod("POST");
|
||||
transferKind.setForRequest(connection, requestBody.length);
|
||||
for (int i = 0; i < requestBody.length; i += 1024) {
|
||||
connection.getOutputStream().write(requestBody, i, 1024);
|
||||
}
|
||||
connection.getOutputStream().close();
|
||||
assertContent("B", connection);
|
||||
|
||||
RecordedRequest requestA = server.takeRequest();
|
||||
assertEquals("/a", requestA.getPath());
|
||||
RecordedRequest requestB = server.takeRequest();
|
||||
assertEquals("/b", requestB.getPath());
|
||||
assertEquals(Arrays.toString(requestBody), Arrays.toString(requestB.getBody()));
|
||||
}
|
||||
|
||||
@Test public void fullyBufferedPostIsTooShort() throws Exception {
|
||||
server.enqueue(new MockResponse().setBody("A"));
|
||||
server.play();
|
||||
|
||||
HttpURLConnection connection = client.open(server.getUrl("/b"));
|
||||
connection.setRequestProperty("Content-Length", "4");
|
||||
connection.setRequestMethod("POST");
|
||||
OutputStream out = connection.getOutputStream();
|
||||
out.write('a');
|
||||
out.write('b');
|
||||
out.write('c');
|
||||
try {
|
||||
out.close();
|
||||
fail();
|
||||
} catch (IOException expected) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test public void fullyBufferedPostIsTooLong() throws Exception {
|
||||
server.enqueue(new MockResponse().setBody("A"));
|
||||
server.play();
|
||||
|
||||
HttpURLConnection connection = client.open(server.getUrl("/b"));
|
||||
connection.setRequestProperty("Content-Length", "3");
|
||||
connection.setRequestMethod("POST");
|
||||
OutputStream out = connection.getOutputStream();
|
||||
out.write('a');
|
||||
out.write('b');
|
||||
out.write('c');
|
||||
try {
|
||||
out.write('d');
|
||||
fail();
|
||||
} catch (IOException expected) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test @Ignore public void testPooledConnectionsDetectHttp10() {
|
||||
// TODO: write a test that shows pooled connections detect HTTP/1.0 (vs. HTTP/1.1)
|
||||
fail("TODO");
|
||||
@@ -2413,7 +2505,7 @@ public final class URLConnectionTest {
|
||||
response.setBody(content);
|
||||
}
|
||||
@Override void setForRequest(HttpURLConnection connection, int contentLength) {
|
||||
connection.setChunkedStreamingMode(contentLength);
|
||||
connection.setFixedLengthStreamingMode(contentLength);
|
||||
}
|
||||
},
|
||||
END_OF_STREAM() {
|
||||
|
||||
Reference in New Issue
Block a user