mirror of
https://github.com/square/okhttp.git
synced 2026-01-27 04:22:07 +03:00
Merge pull request #153 from square/jwilson/recovery
Recover from failures while writing a POST body.
This commit is contained in:
@@ -14,18 +14,18 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.squareup.okhttp.internal.http;
|
||||
package com.squareup.okhttp.internal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
/**
|
||||
* An output stream for the body of an HTTP request.
|
||||
* An output stream for an HTTP request body.
|
||||
*
|
||||
* <p>Since a single socket's output stream may be used to write multiple HTTP
|
||||
* requests to the same server, subclasses should not close the socket stream.
|
||||
*/
|
||||
abstract class AbstractHttpOutputStream extends OutputStream {
|
||||
public abstract class AbstractOutputStream extends OutputStream {
|
||||
protected boolean closed;
|
||||
|
||||
@Override public final void write(int data) throws IOException {
|
||||
@@ -37,4 +37,9 @@ abstract class AbstractHttpOutputStream extends OutputStream {
|
||||
throw new IOException("stream closed");
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns true if this stream was closed locally. */
|
||||
public boolean isClosed() {
|
||||
return closed;
|
||||
}
|
||||
}
|
||||
@@ -32,13 +32,12 @@ import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;
|
||||
* replacement stream each time an {@link IOException} is encountered on the
|
||||
* current stream.
|
||||
*/
|
||||
public abstract class FaultRecoveringOutputStream extends OutputStream {
|
||||
public abstract class FaultRecoveringOutputStream extends AbstractOutputStream {
|
||||
private final int maxReplayBufferLength;
|
||||
|
||||
/** Bytes to transmit on the replacement stream, or null if no recovery is possible. */
|
||||
private ByteArrayOutputStream replayBuffer;
|
||||
private OutputStream out;
|
||||
private boolean closed;
|
||||
|
||||
/**
|
||||
* @param maxReplayBufferLength the maximum number of successfully written
|
||||
@@ -52,10 +51,6 @@ public abstract class FaultRecoveringOutputStream extends OutputStream {
|
||||
this.out = out;
|
||||
}
|
||||
|
||||
@Override public final void write(int data) throws IOException {
|
||||
write(new byte[] { (byte) data });
|
||||
}
|
||||
|
||||
@Override public final void write(byte[] buffer, int offset, int count) throws IOException {
|
||||
if (closed) throw new IOException("stream closed");
|
||||
checkOffsetAndCount(buffer.length, offset, count);
|
||||
@@ -119,15 +114,13 @@ public abstract class FaultRecoveringOutputStream extends OutputStream {
|
||||
}
|
||||
|
||||
while (true) {
|
||||
OutputStream replacementStream = replacementStream(e);
|
||||
if (replacementStream == null) {
|
||||
return false;
|
||||
}
|
||||
OutputStream replacementStream = null;
|
||||
try {
|
||||
replayBuffer.writeTo(replacementStream);
|
||||
// We've found a replacement that works!
|
||||
Util.closeQuietly(out);
|
||||
out = replacementStream;
|
||||
replacementStream = replacementStream(e);
|
||||
if (replacementStream == null) {
|
||||
return false;
|
||||
}
|
||||
replaceStream(replacementStream);
|
||||
return true;
|
||||
} catch (IOException replacementStreamFailure) {
|
||||
// The replacement was also broken. Loop to ask for another replacement.
|
||||
@@ -137,11 +130,34 @@ public abstract class FaultRecoveringOutputStream extends OutputStream {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if errors in the underlying stream can currently be recovered.
|
||||
*/
|
||||
public boolean isRecoverable() {
|
||||
return replayBuffer != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces the current output stream with {@code replacementStream}, writing
|
||||
* any replay bytes to it if they exist. The current output stream is closed.
|
||||
*/
|
||||
public final void replaceStream(OutputStream replacementStream) throws IOException {
|
||||
if (!isRecoverable()) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
if (this.out == replacementStream) {
|
||||
return; // Don't replace a stream with itself.
|
||||
}
|
||||
replayBuffer.writeTo(replacementStream);
|
||||
Util.closeQuietly(out);
|
||||
out = replacementStream;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a replacement output stream to recover from {@code e} thrown by the
|
||||
* previous stream. Returns a new OutputStream if recovery was successful, in
|
||||
* which case all previously-written data will be replayed. Returns null if
|
||||
* the failure cannot be recovered.
|
||||
*/
|
||||
protected abstract OutputStream replacementStream(IOException e);
|
||||
protected abstract OutputStream replacementStream(IOException e) throws IOException;
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
package com.squareup.okhttp.internal.http;
|
||||
|
||||
import com.squareup.okhttp.Connection;
|
||||
import com.squareup.okhttp.internal.AbstractOutputStream;
|
||||
import com.squareup.okhttp.internal.Util;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
@@ -139,7 +140,7 @@ public final class HttpTransport implements Transport {
|
||||
}
|
||||
|
||||
// We cannot reuse sockets that have incomplete output.
|
||||
if (requestBodyOut != null && !((AbstractHttpOutputStream) requestBodyOut).closed) {
|
||||
if (requestBodyOut != null && !((AbstractOutputStream) requestBodyOut).isClosed()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -209,7 +210,7 @@ public final class HttpTransport implements Transport {
|
||||
}
|
||||
|
||||
/** An HTTP body with a fixed length known in advance. */
|
||||
private static final class FixedLengthOutputStream extends AbstractHttpOutputStream {
|
||||
private static final class FixedLengthOutputStream extends AbstractOutputStream {
|
||||
private final OutputStream socketOut;
|
||||
private int bytesRemaining;
|
||||
|
||||
@@ -251,7 +252,7 @@ public final class HttpTransport implements Transport {
|
||||
* buffered until {@code maxChunkLength} bytes are ready, at which point the
|
||||
* chunk is written and the buffer is cleared.
|
||||
*/
|
||||
private static final class ChunkedOutputStream extends AbstractHttpOutputStream {
|
||||
private static final class ChunkedOutputStream extends AbstractOutputStream {
|
||||
private static final byte[] CRLF = { '\r', '\n' };
|
||||
private static final byte[] HEX_DIGITS = {
|
||||
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
|
||||
|
||||
@@ -21,6 +21,8 @@ import com.squareup.okhttp.Connection;
|
||||
import com.squareup.okhttp.ConnectionPool;
|
||||
import com.squareup.okhttp.OkHttpClient;
|
||||
import com.squareup.okhttp.Route;
|
||||
import com.squareup.okhttp.internal.AbstractOutputStream;
|
||||
import com.squareup.okhttp.internal.FaultRecoveringOutputStream;
|
||||
import com.squareup.okhttp.internal.Util;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
@@ -71,6 +73,13 @@ public class HttpURLConnectionImpl extends HttpURLConnection {
|
||||
*/
|
||||
private static final int MAX_REDIRECTS = 20;
|
||||
|
||||
/**
|
||||
* The minimum number of request body bytes to transmit before we're willing
|
||||
* to let a routine {@link IOException} bubble up to the user. This is used to
|
||||
* size a buffer for data that will be replayed upon error.
|
||||
*/
|
||||
private static final int MAX_REPLAY_BUFFER_LENGTH = 8192;
|
||||
|
||||
private final boolean followProtocolRedirects;
|
||||
|
||||
/** The proxy requested by the client, or null for a proxy to be selected automatically. */
|
||||
@@ -85,10 +94,10 @@ public class HttpURLConnectionImpl extends HttpURLConnection {
|
||||
HostnameVerifier hostnameVerifier;
|
||||
final Set<Route> failedRoutes;
|
||||
|
||||
|
||||
private final RawHeaders rawRequestHeaders = new RawHeaders();
|
||||
|
||||
private int redirectionCount;
|
||||
private FaultRecoveringOutputStream faultRecoveringRequestBody;
|
||||
|
||||
protected IOException httpEngineFailure;
|
||||
protected HttpEngine httpEngine;
|
||||
@@ -225,14 +234,29 @@ public class HttpURLConnectionImpl extends HttpURLConnection {
|
||||
@Override public final OutputStream getOutputStream() throws IOException {
|
||||
connect();
|
||||
|
||||
OutputStream result = httpEngine.getRequestBody();
|
||||
if (result == null) {
|
||||
OutputStream out = httpEngine.getRequestBody();
|
||||
if (out == null) {
|
||||
throw new ProtocolException("method does not support a request body: " + method);
|
||||
} else if (httpEngine.hasResponse()) {
|
||||
throw new ProtocolException("cannot write request body after response has been read");
|
||||
}
|
||||
|
||||
return result;
|
||||
if (faultRecoveringRequestBody == null) {
|
||||
faultRecoveringRequestBody = new FaultRecoveringOutputStream(MAX_REPLAY_BUFFER_LENGTH, out) {
|
||||
@Override protected OutputStream replacementStream(IOException e) throws IOException {
|
||||
if (httpEngine.getRequestBody() instanceof AbstractOutputStream
|
||||
&& ((AbstractOutputStream) httpEngine.getRequestBody()).isClosed()) {
|
||||
return null; // Don't recover once the underlying stream has been closed.
|
||||
}
|
||||
if (handleFailure(e)) {
|
||||
return httpEngine.getRequestBody();
|
||||
}
|
||||
return null; // This is a permanent failure.
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return faultRecoveringRequestBody;
|
||||
}
|
||||
|
||||
@Override public final Permission getPermission() throws IOException {
|
||||
@@ -362,29 +386,50 @@ public class HttpURLConnectionImpl extends HttpURLConnection {
|
||||
}
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
RouteSelector routeSelector = httpEngine.routeSelector;
|
||||
if (routeSelector != null && httpEngine.connection != null) {
|
||||
routeSelector.connectFailed(httpEngine.connection, e);
|
||||
}
|
||||
if (routeSelector == null && httpEngine.connection == null) {
|
||||
throw e; // If we failed before finding a route or a connection, give up.
|
||||
}
|
||||
|
||||
// The connection failure isn't fatal if there's another route to attempt.
|
||||
OutputStream requestBody = httpEngine.getRequestBody();
|
||||
if ((routeSelector == null || routeSelector.hasNext()) && isRecoverable(e) && (requestBody
|
||||
== null || requestBody instanceof RetryableOutputStream)) {
|
||||
httpEngine.release(true);
|
||||
httpEngine =
|
||||
newHttpEngine(method, rawRequestHeaders, null, (RetryableOutputStream) requestBody);
|
||||
httpEngine.routeSelector = routeSelector; // Keep the same routeSelector.
|
||||
if (handleFailure(e)) {
|
||||
return false;
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
httpEngineFailure = e;
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Report and attempt to recover from {@code e}. Returns true if the HTTP
|
||||
* engine was replaced and the request should be retried. Otherwise the
|
||||
* failure is permanent.
|
||||
*/
|
||||
private boolean handleFailure(IOException e) throws IOException {
|
||||
RouteSelector routeSelector = httpEngine.routeSelector;
|
||||
if (routeSelector != null && httpEngine.connection != null) {
|
||||
routeSelector.connectFailed(httpEngine.connection, e);
|
||||
}
|
||||
|
||||
OutputStream requestBody = httpEngine.getRequestBody();
|
||||
boolean canRetryRequestBody = requestBody == null
|
||||
|| requestBody instanceof RetryableOutputStream
|
||||
|| (faultRecoveringRequestBody != null && faultRecoveringRequestBody.isRecoverable());
|
||||
if (routeSelector == null && httpEngine.connection == null // No connection.
|
||||
|| routeSelector != null && !routeSelector.hasNext() // No more routes to attempt.
|
||||
|| !isRecoverable(e)
|
||||
|| !canRetryRequestBody) {
|
||||
httpEngineFailure = e;
|
||||
return false;
|
||||
}
|
||||
|
||||
httpEngine.release(true);
|
||||
RetryableOutputStream retryableOutputStream = requestBody instanceof RetryableOutputStream
|
||||
? (RetryableOutputStream) requestBody
|
||||
: null;
|
||||
httpEngine = newHttpEngine(method, rawRequestHeaders, null, retryableOutputStream);
|
||||
httpEngine.routeSelector = routeSelector; // Keep the same routeSelector.
|
||||
if (faultRecoveringRequestBody != null && faultRecoveringRequestBody.isRecoverable()) {
|
||||
httpEngine.sendRequest();
|
||||
faultRecoveringRequestBody.replaceStream(httpEngine.getRequestBody());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean isRecoverable(IOException e) {
|
||||
// If the problem was a CertificateException from the X509TrustManager,
|
||||
// do not retry, we didn't have an abrupt server initiated exception.
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package com.squareup.okhttp.internal.http;
|
||||
|
||||
import com.squareup.okhttp.internal.AbstractOutputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
@@ -28,7 +29,7 @@ import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;
|
||||
* the post body to be transparently re-sent if the HTTP request must be
|
||||
* sent multiple times.
|
||||
*/
|
||||
final class RetryableOutputStream extends AbstractHttpOutputStream {
|
||||
final class RetryableOutputStream extends AbstractOutputStream {
|
||||
private final int limit;
|
||||
private final ByteArrayOutputStream content;
|
||||
|
||||
|
||||
@@ -57,6 +57,7 @@ import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@@ -2286,6 +2287,97 @@ public final class URLConnectionTest {
|
||||
assertEquals(-1, in.read());
|
||||
}
|
||||
|
||||
@Test public void postFailsWithBufferedRequestForSmallRequest() throws Exception {
|
||||
reusedConnectionFailsWithPost(TransferKind.END_OF_STREAM, 1024);
|
||||
}
|
||||
|
||||
// This test is ignored because we don't (yet) reliably recover for large request bodies.
|
||||
@Test @Ignore public void postFailsWithBufferedRequestForLargeRequest() throws Exception {
|
||||
reusedConnectionFailsWithPost(TransferKind.END_OF_STREAM, 16384);
|
||||
}
|
||||
|
||||
@Test public void postFailsWithChunkedRequestForSmallRequest() throws Exception {
|
||||
reusedConnectionFailsWithPost(TransferKind.CHUNKED, 1024);
|
||||
}
|
||||
|
||||
// This test is ignored because we don't (yet) reliably recover for large request bodies.
|
||||
@Test @Ignore public void postFailsWithChunkedRequestForLargeRequest() throws Exception {
|
||||
reusedConnectionFailsWithPost(TransferKind.CHUNKED, 16384);
|
||||
}
|
||||
|
||||
@Test public void postFailsWithFixedLengthRequestForSmallRequest() throws Exception {
|
||||
reusedConnectionFailsWithPost(TransferKind.FIXED_LENGTH, 1024);
|
||||
}
|
||||
|
||||
// This test is ignored because we don't (yet) reliably recover for large request bodies.
|
||||
@Test @Ignore public void postFailsWithFixedLengthRequestForLargeRequest() throws Exception {
|
||||
reusedConnectionFailsWithPost(TransferKind.FIXED_LENGTH, 16384);
|
||||
}
|
||||
|
||||
private void reusedConnectionFailsWithPost(TransferKind transferKind, int requestSize)
|
||||
throws Exception {
|
||||
server.enqueue(new MockResponse().setBody("A").setSocketPolicy(SHUTDOWN_INPUT_AT_END));
|
||||
server.enqueue(new MockResponse().setBody("B"));
|
||||
server.play();
|
||||
|
||||
assertContent("A", client.open(server.getUrl("/a")));
|
||||
|
||||
// If the request body is larger than OkHttp's replay buffer, the failure may still occur.
|
||||
byte[] requestBody = new byte[requestSize];
|
||||
new Random(0).nextBytes(requestBody);
|
||||
|
||||
HttpURLConnection connection = client.open(server.getUrl("/b"));
|
||||
connection.setRequestMethod("POST");
|
||||
transferKind.setForRequest(connection, requestBody.length);
|
||||
for (int i = 0; i < requestBody.length; i += 1024) {
|
||||
connection.getOutputStream().write(requestBody, i, 1024);
|
||||
}
|
||||
connection.getOutputStream().close();
|
||||
assertContent("B", connection);
|
||||
|
||||
RecordedRequest requestA = server.takeRequest();
|
||||
assertEquals("/a", requestA.getPath());
|
||||
RecordedRequest requestB = server.takeRequest();
|
||||
assertEquals("/b", requestB.getPath());
|
||||
assertEquals(Arrays.toString(requestBody), Arrays.toString(requestB.getBody()));
|
||||
}
|
||||
|
||||
@Test public void fullyBufferedPostIsTooShort() throws Exception {
|
||||
server.enqueue(new MockResponse().setBody("A"));
|
||||
server.play();
|
||||
|
||||
HttpURLConnection connection = client.open(server.getUrl("/b"));
|
||||
connection.setRequestProperty("Content-Length", "4");
|
||||
connection.setRequestMethod("POST");
|
||||
OutputStream out = connection.getOutputStream();
|
||||
out.write('a');
|
||||
out.write('b');
|
||||
out.write('c');
|
||||
try {
|
||||
out.close();
|
||||
fail();
|
||||
} catch (IOException expected) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test public void fullyBufferedPostIsTooLong() throws Exception {
|
||||
server.enqueue(new MockResponse().setBody("A"));
|
||||
server.play();
|
||||
|
||||
HttpURLConnection connection = client.open(server.getUrl("/b"));
|
||||
connection.setRequestProperty("Content-Length", "3");
|
||||
connection.setRequestMethod("POST");
|
||||
OutputStream out = connection.getOutputStream();
|
||||
out.write('a');
|
||||
out.write('b');
|
||||
out.write('c');
|
||||
try {
|
||||
out.write('d');
|
||||
fail();
|
||||
} catch (IOException expected) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test @Ignore public void testPooledConnectionsDetectHttp10() {
|
||||
// TODO: write a test that shows pooled connections detect HTTP/1.0 (vs. HTTP/1.1)
|
||||
fail("TODO");
|
||||
@@ -2413,7 +2505,7 @@ public final class URLConnectionTest {
|
||||
response.setBody(content);
|
||||
}
|
||||
@Override void setForRequest(HttpURLConnection connection, int contentLength) {
|
||||
connection.setChunkedStreamingMode(contentLength);
|
||||
connection.setFixedLengthStreamingMode(contentLength);
|
||||
}
|
||||
},
|
||||
END_OF_STREAM() {
|
||||
|
||||
Reference in New Issue
Block a user