1
0
mirror of https://github.com/square/okhttp.git synced 2026-01-15 20:56:41 +03:00

Migrate WebSocketReaderTest to the async API.

Also change reader's callback types to be strings and byte strings.
This commit is contained in:
jwilson
2016-11-18 22:16:08 -05:00
parent 395ae6f9b8
commit ab73bddaa4
10 changed files with 109 additions and 785 deletions

View File

@@ -32,11 +32,11 @@ import static org.junit.Assert.assertTrue;
public final class RecordedResponse {
public final Request request;
public final Response response;
public final WebSocket webSocket;
public final NewWebSocket webSocket;
public final String body;
public final IOException failure;
public RecordedResponse(Request request, Response response, WebSocket webSocket, String body,
public RecordedResponse(Request request, Response response, NewWebSocket webSocket, String body,
IOException failure) {
this.request = request;
this.response = response;

View File

@@ -1,40 +0,0 @@
/*
* Copyright (C) 2014 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.ws;
import java.io.IOException;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
public class EmptyWebSocketListener implements WebSocketListener {
@Override public void onOpen(WebSocket webSocket, Response response) {
}
@Override public void onMessage(ResponseBody message) throws IOException {
}
@Override public void onPong(ByteString payload) {
}
@Override public void onClose(int code, String reason) {
}
@Override public void onFailure(Throwable t, Response response) {
}
}

View File

@@ -140,6 +140,16 @@ public final class NewWebSocketRecorder extends NewWebSocket.Listener {
assertEquals(new Message(ByteString.of(payload)), actual);
}
public void assertPing(ByteString payload) {
Object actual = nextEvent();
assertEquals(new Ping(payload), actual);
}
public void assertPong(ByteString payload) {
Object actual = nextEvent();
assertEquals(new Pong(payload), actual);
}
public void assertClosing(int code, String reason) {
Object actual = nextEvent();
assertEquals(new Closing(code, reason), actual);
@@ -198,6 +208,31 @@ public final class NewWebSocketRecorder extends NewWebSocket.Listener {
assertEquals(message, failure.t.getMessage());
}
/** Expose this recorder as a frame callback and shim in "ping" events. */
public WebSocketReader.FrameCallback asFrameCallback() {
return new WebSocketReader.FrameCallback() {
@Override public void onReadMessage(String text) throws IOException {
onMessage(null, text);
}
@Override public void onReadMessage(ByteString bytes) throws IOException {
onMessage(null, bytes);
}
@Override public void onReadPing(ByteString payload) {
events.add(new Ping(payload));
}
@Override public void onReadPong(ByteString payload) {
events.add(new Pong(payload));
}
@Override public void onReadClose(int code, String reason) {
onClosing(null, code, reason);
}
};
}
static final class Open {
final NewWebSocket webSocket;
final Response response;
@@ -267,6 +302,48 @@ public final class NewWebSocketRecorder extends NewWebSocket.Listener {
}
}
static final class Ping {
public final ByteString payload;
public Ping(ByteString payload) {
this.payload = payload;
}
@Override public String toString() {
return "Ping[" + payload + "]";
}
@Override public int hashCode() {
return payload.hashCode();
}
@Override public boolean equals(Object other) {
return other instanceof Ping
&& ((Ping) other).payload.equals(payload);
}
}
static final class Pong {
public final ByteString payload;
public Pong(ByteString payload) {
this.payload = payload;
}
@Override public String toString() {
return "Pong[" + payload + "]";
}
@Override public int hashCode() {
return payload.hashCode();
}
@Override public boolean equals(Object other) {
return other instanceof Pong
&& ((Pong) other).payload.equals(payload);
}
}
static final class Closing {
public final int code;
public final String reason;

View File

@@ -19,24 +19,20 @@ import java.io.EOFException;
import java.io.IOException;
import java.net.ProtocolException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import okhttp3.ResponseBody;
import okhttp3.internal.Util;
import okio.Buffer;
import okio.BufferedSource;
import okio.ByteString;
import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public final class WebSocketReaderTest {
private final Buffer data = new Buffer();
private final WebSocketRecorder callback = new WebSocketRecorder("client");
private final NewWebSocketRecorder callback = new NewWebSocketRecorder("client");
private final Random random = new Random(0);
// Mutually exclusive. Use the one corresponding to the peer whose behavior you wish to test.
@@ -149,20 +145,11 @@ public final class WebSocketReaderTest {
@Test public void serverHelloTwoChunks() throws IOException {
data.write(ByteString.decodeHex("818537fa213d7f9f4d")); // Hel
data.write(ByteString.decodeHex("5158")); // lo
final Buffer sink = new Buffer();
callback.setNextEventDelegate(new EmptyWebSocketListener() {
@Override public void onMessage(ResponseBody message) throws IOException {
BufferedSource source = message.source();
source.readFully(sink, 3); // Read "Hel"
data.write(ByteString.decodeHex("5158")); // lo
source.readFully(sink, 2); // Read "lo"
source.close();
}
});
serverReader.processNextFrame();
assertEquals("Hello", sink.readUtf8());
callback.assertTextMessage("Hello");
}
@Test public void clientTwoFrameHello() throws IOException {
@@ -250,84 +237,6 @@ public final class WebSocketReaderTest {
}
}
@Test public void noCloseErrors() throws IOException {
data.write(ByteString.decodeHex("810548656c6c6f")); // Hello
callback.setNextEventDelegate(new EmptyWebSocketListener() {
@Override public void onMessage(ResponseBody body) throws IOException {
body.source().readAll(new Buffer());
}
});
try {
clientReader.processNextFrame();
fail();
} catch (IllegalStateException e) {
assertEquals("Listener failed to call close on message payload.", e.getMessage());
}
}
@Test public void closeExhaustsMessage() throws IOException {
data.write(ByteString.decodeHex("810548656c6c6f")); // Hello
data.write(ByteString.decodeHex("810448657921")); // Hey!
final Buffer sink = new Buffer();
callback.setNextEventDelegate(new EmptyWebSocketListener() {
@Override public void onMessage(ResponseBody message) throws IOException {
message.source().read(sink, 3);
message.close();
}
});
clientReader.processNextFrame();
assertEquals("Hel", sink.readUtf8());
clientReader.processNextFrame();
callback.assertTextMessage("Hey!");
}
@Test public void closeExhaustsMessageOverControlFrames() throws IOException {
data.write(ByteString.decodeHex("010348656c")); // Hel
data.write(ByteString.decodeHex("8a00")); // Pong
data.write(ByteString.decodeHex("8a00")); // Pong
data.write(ByteString.decodeHex("80026c6f")); // lo
data.write(ByteString.decodeHex("810448657921")); // Hey!
final Buffer sink = new Buffer();
callback.setNextEventDelegate(new EmptyWebSocketListener() {
@Override public void onMessage(ResponseBody message) throws IOException {
message.source().read(sink, 2);
message.close();
}
});
clientReader.processNextFrame();
assertEquals("He", sink.readUtf8());
callback.assertPong(ByteString.EMPTY);
callback.assertPong(ByteString.EMPTY);
clientReader.processNextFrame();
callback.assertTextMessage("Hey!");
}
@Test public void closedMessageSourceThrows() throws IOException {
data.write(ByteString.decodeHex("810548656c6c6f")); // Hello
final AtomicReference<Exception> exception = new AtomicReference<>();
callback.setNextEventDelegate(new EmptyWebSocketListener() {
@Override public void onMessage(ResponseBody message) throws IOException {
message.close();
try {
message.source().readAll(new Buffer());
fail();
} catch (IllegalStateException e) {
exception.set(e);
}
}
});
clientReader.processNextFrame();
assertNotNull(exception.get());
}
@Test public void emptyPingCallsCallback() throws IOException {
data.write(ByteString.decodeHex("8900")); // Empty ping
clientReader.processNextFrame();
@@ -343,7 +252,7 @@ public final class WebSocketReaderTest {
@Test public void emptyCloseCallsCallback() throws IOException {
data.write(ByteString.decodeHex("8800")); // Empty close
clientReader.processNextFrame();
callback.assertClose(1005, "");
callback.assertClosing(1005, "");
}
@Test public void closeLengthOfOneThrows() throws IOException {
@@ -359,7 +268,7 @@ public final class WebSocketReaderTest {
@Test public void closeCallsCallback() throws IOException {
data.write(ByteString.decodeHex("880703e848656c6c6f")); // Close with code and reason
clientReader.processNextFrame();
callback.assertClose(1000, "Hello");
callback.assertClosing(1000, "Hello");
}
@Test public void closeOutOfRangeThrows() throws IOException {

View File

@@ -1,354 +0,0 @@
/*
* Copyright (C) 2014 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.ws;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import okhttp3.MediaType;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okhttp3.internal.platform.Platform;
import okio.Buffer;
import okio.ByteString;
import static okhttp3.WebSocket.BINARY;
import static okhttp3.WebSocket.TEXT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
public final class WebSocketRecorder implements WebSocketListener {
private final String name;
private final BlockingQueue<Object> events = new LinkedBlockingQueue<>();
private WebSocketListener delegate;
public WebSocketRecorder(String name) {
this.name = name;
}
/** Sets a delegate for handling the next callback to this listener. Cleared after invoked. */
public void setNextEventDelegate(WebSocketListener delegate) {
this.delegate = delegate;
}
@Override public void onOpen(WebSocket webSocket, Response response) {
Platform.get().log(Platform.INFO, "[WS " + name + "] onOpen", null);
WebSocketListener delegate = this.delegate;
if (delegate != null) {
this.delegate = null;
delegate.onOpen(webSocket, response);
} else {
events.add(new Open(webSocket, response));
}
}
@Override public void onMessage(ResponseBody message) throws IOException {
Platform.get().log(Platform.INFO, "[WS " + name + "] onMessage", null);
WebSocketListener delegate = this.delegate;
if (delegate != null) {
this.delegate = null;
delegate.onMessage(message);
} else {
Message event = new Message(message.contentType());
message.source().readAll(event.buffer);
message.close();
events.add(event);
}
}
@Override public void onPong(ByteString payload) {
Platform.get().log(Platform.INFO, "[WS " + name + "] onPong", null);
WebSocketListener delegate = this.delegate;
if (delegate != null) {
this.delegate = null;
delegate.onPong(payload);
} else {
events.add(new Pong(payload));
}
}
@Override public void onClose(int code, String reason) {
Platform.get().log(Platform.INFO, "[WS " + name + "] onClose " + code, null);
WebSocketListener delegate = this.delegate;
if (delegate != null) {
this.delegate = null;
delegate.onClose(code, reason);
} else {
events.add(new Close(code, reason));
}
}
@Override public void onFailure(Throwable t, Response response) {
Platform.get().log(Platform.INFO, "[WS " + name + "] onFailure", t);
WebSocketListener delegate = this.delegate;
if (delegate != null) {
this.delegate = null;
delegate.onFailure(t, response);
} else {
events.add(new Failure(t, response));
}
}
private Object nextEvent() {
try {
Object event = events.poll(10, TimeUnit.SECONDS);
if (event == null) {
throw new AssertionError("Timed out waiting for event.");
}
return event;
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
public void assertTextMessage(String payload) {
Message message = new Message(TEXT);
message.buffer.writeUtf8(payload);
Object actual = nextEvent();
assertEquals(message, actual);
}
public void assertBinaryMessage(byte[] payload) {
Message message = new Message(BINARY);
message.buffer.write(payload);
Object actual = nextEvent();
assertEquals(message, actual);
}
public void assertPong(ByteString payload) {
Object actual = nextEvent();
assertEquals(new Pong(payload), actual);
}
public void assertClose(int code, String reason) {
Object actual = nextEvent();
assertEquals(new Close(code, reason), actual);
}
public void assertExhausted() {
assertTrue("Remaining events: " + events, events.isEmpty());
}
public WebSocket assertOpen() {
Object event = nextEvent();
if (!(event instanceof Open)) {
throw new AssertionError("Expected Open but was " + event);
}
return ((Open) event).webSocket;
}
public void assertFailure(Throwable t) {
Object event = nextEvent();
if (!(event instanceof Failure)) {
throw new AssertionError("Expected Failure but was " + event);
}
Failure failure = (Failure) event;
assertNull(failure.response);
assertSame(t, failure.t);
}
public void assertFailure(Class<? extends IOException> cls, String message) {
Object event = nextEvent();
if (!(event instanceof Failure)) {
throw new AssertionError("Expected Failure but was " + event);
}
Failure failure = (Failure) event;
assertNull(failure.response);
assertEquals(cls, failure.t.getClass());
assertEquals(message, failure.t.getMessage());
}
public void assertFailure(int code, String body, Class<? extends IOException> cls, String message)
throws IOException {
Object event = nextEvent();
if (!(event instanceof Failure)) {
throw new AssertionError("Expected Failure but was " + event);
}
Failure failure = (Failure) event;
assertEquals(code, failure.response.code());
if (body != null) {
assertEquals(body, failure.response.body().string());
}
assertEquals(cls, failure.t.getClass());
assertEquals(message, failure.t.getMessage());
}
static final class Open {
final WebSocket webSocket;
final Response response;
Open(WebSocket webSocket, Response response) {
this.webSocket = webSocket;
this.response = response;
}
@Override public String toString() {
return "Open[" + response + "]";
}
}
static final class Failure {
final Throwable t;
final Response response;
Failure(Throwable t, Response response) {
this.t = t;
this.response = response;
}
@Override public String toString() {
if (response == null) {
return "Failure[" + t + "]";
}
return "Failure[" + response + "]";
}
}
static final class Message {
public final MediaType mediaType;
public final Buffer buffer = new Buffer();
Message(MediaType mediaType) {
this.mediaType = mediaType;
}
@Override public String toString() {
return "Message[" + mediaType + " " + buffer + "]";
}
@Override public int hashCode() {
return mediaType.hashCode() * 37 + buffer.hashCode();
}
@Override public boolean equals(Object obj) {
if (obj instanceof Message) {
Message other = (Message) obj;
return mediaType.equals(other.mediaType) && buffer.equals(other.buffer);
}
return false;
}
}
static final class Pong {
public final ByteString payload;
Pong(ByteString payload) {
this.payload = payload;
}
@Override public String toString() {
return "Pong[" + payload + "]";
}
@Override public int hashCode() {
return payload.hashCode();
}
@Override public boolean equals(Object obj) {
if (obj instanceof Pong) {
Pong other = (Pong) obj;
return payload == null ? other.payload == null : payload.equals(other.payload);
}
return false;
}
}
static final class Close {
public final int code;
public final String reason;
Close(int code, String reason) {
this.code = code;
this.reason = reason;
}
@Override public String toString() {
return "Close[" + code + " " + reason + "]";
}
@Override public int hashCode() {
return code * 37 + reason.hashCode();
}
@Override public boolean equals(Object obj) {
if (obj instanceof Close) {
Close other = (Close) obj;
return code == other.code && reason.equals(other.reason);
}
return false;
}
}
/** Expose this recorder as a frame callback and shim in "ping" events. */
WebSocketReader.FrameCallback asFrameCallback() {
return new WebSocketReader.FrameCallback() {
@Override public void onReadMessage(ResponseBody body) throws IOException {
onMessage(body);
}
@Override public void onReadPing(ByteString payload) {
events.add(new Ping(payload));
}
@Override public void onReadPong(ByteString padload) {
onPong(padload);
}
@Override public void onReadClose(int code, String reason) {
onClose(code, reason);
}
};
}
void assertPing(ByteString payload) {
Object actual = nextEvent();
assertEquals(new Ping(payload), actual);
}
static final class Ping {
public final ByteString buffer;
Ping(ByteString buffer) {
this.buffer = buffer;
}
@Override public String toString() {
return "Ping[" + buffer + "]";
}
@Override public int hashCode() {
return buffer.hashCode();
}
@Override public boolean equals(Object obj) {
if (obj instanceof Ping) {
Ping other = (Ping) obj;
return buffer == null ? other.buffer == null : buffer.equals(other.buffer);
}
return false;
}
}
}

View File

@@ -1,66 +0,0 @@
/*
* Copyright (C) 2014 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3;
import java.io.IOException;
import okio.ByteString;
/** Blocking interface to connect and write to a web socket. This class is not thread safe. */
public interface WebSocket {
/** A {@link MediaType} indicating UTF-8 text frames should be used when sending the message. */
MediaType TEXT = MediaType.parse("application/vnd.okhttp.websocket+text; charset=utf-8");
/** A {@link MediaType} indicating binary frames should be used when sending the message. */
MediaType BINARY = MediaType.parse("application/vnd.okhttp.websocket+binary");
/**
* Send a message to the server.
*
* @param message The message body. The {@linkplain RequestBody#contentType() content type} of
* must be either {@link #TEXT} or {@link #BINARY}.
* @throws IOException if unable to write the message. Clients must call {@link #close} when this
* happens to ensure resources are cleaned up.
* @throws IllegalStateException if this web socket was already closed.
*/
void message(RequestBody message) throws IOException;
/**
* Send a ping to the server.
*
* @param payload Ping payload which must not exceed 125 bytes. Use {@link ByteString#EMPTY} for
* no payload.
* @throws IOException if unable to write the ping. Clients must call {@link #close} when this
* happens to ensure resources are cleaned up.
* @throws IllegalStateException if this web socket was already closed.
*/
void ping(ByteString payload) throws IOException;
/**
* Send a close indicator to the server.
*
* <p>The corresponding {@link WebSocketListener} will continue to get messages until its {@link
* WebSocketListener#onClose onClose()} method is called.
*
* <p>It is an error to call this method before calling close on an active writer. Calling this
* method more than once has no effect.
*
* @param code Status code as defined by <a
* href="http://tools.ietf.org/html/rfc6455#section-7.4">Section 7.4 of RFC 6455</a> or {@code 0}.
* @param reason Reason for shutting down or {@code null}.
* @throws IOException if unable to write the close message. Resources will still be freed.
* @throws IllegalStateException if this web socket was already closed.
*/
void close(int code, String reason) throws IOException;
}

View File

@@ -1,56 +0,0 @@
/*
* Copyright (C) 2014 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3;
public interface WebSocketCall extends Cloneable {
/** Returns the original request that initiated this call. */
Request request();
/**
* Schedules the request to be executed at some point in the future.
*
* <p>The {@link OkHttpClient#dispatcher dispatcher} defines when the request will run: usually
* immediately unless there are several other requests currently being executed.
*
* <p>This client will later call back {@code responseCallback} with either an HTTP response or a
* failure exception. If you {@link #cancel} a request before it completes the callback will not
* be invoked.
*
* @throws IllegalStateException when the call has already been executed.
*/
void enqueue(WebSocketListener listener);
/** Cancels the request, if possible. Requests that are already complete cannot be canceled. */
void cancel();
/**
* Returns true if this call has been {@linkplain #enqueue(WebSocketListener) enqueued}. It is an
* error to enqueue a call more than once.
*/
boolean isExecuted();
boolean isCanceled();
/**
* Create a new, identical call to this one which can be enqueued even if this call has already
* been.
*/
WebSocketCall clone();
interface Factory {
WebSocketCall newWebSocketCall(Request request);
}
}

View File

@@ -1,90 +0,0 @@
/*
* Copyright (C) 2014 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3;
import java.io.IOException;
import okio.ByteString;
/**
* Listener for server-initiated messages on a connected {@link WebSocket}. All callbacks will be
* called on a single thread.
*
* <h2>Lifecycle Rules</h2>
* <ul>
* <li>Either {@link #onOpen} or {@link #onFailure} will be called first depending on if the web
* socket was successfully opened or if there was an error connecting to the server or parsing its
* response.</li>
* <li>After {@link #onOpen} is called, {@link #onFailure} can be called at any time. No more
* callbacks will follow a call to {@link #onFailure}.</li>
* <li>After {@link #onOpen} is called, {@link #onMessage} and {@link #onPong} will be called for
* each message and pong frame, respectively. Note: {@link #onPong} may be called while {@link
* #onMessage} is reading the message because pong frames may interleave in the message body.</li>
* <li>After {@link #onOpen} is called, {@link #onClose} may be called once. No calls to {@link
* #onMessage} or {@link #onPong} will follow a call to {@link #onClose}.</li>
* <li>{@link #onFailure} will be called if any of the other callbacks throws an exception.</li>
* </ul>
*/
public interface WebSocketListener {
/**
* Called when the request has successfully been upgraded to a web socket. <b>Do not</b> use this
* callback to write to the web socket. Start a new thread or use another thread in your
* application.
*/
void onOpen(WebSocket webSocket, Response response);
/**
* Called when a server message is received. The {@code type} indicates whether the {@code
* payload} should be interpreted as UTF-8 text or binary data.
*
* <p>Implementations <strong>must</strong> call {@code source.close()} before returning. This
* indicates completion of parsing the message payload and will consume any remaining bytes in the
* message.
*
* <p>The {@linkplain ResponseBody#contentType() content type} of {@code message} will be either
* {@link WebSocket#TEXT} or {@link WebSocket#BINARY} which indicates the format of the message.
*/
void onMessage(ResponseBody message) throws IOException;
/**
* Called when a server pong is received. This is usually a result of calling {@link
* WebSocket#ping(ByteString)} but might also be unsolicited directly from the server.
*/
void onPong(ByteString payload);
/**
* Called when the server sends a close message. This may have been initiated from a call to
* {@link WebSocket#close(int, String) close()} or as an unprompted message from the server.
* If you did not explicitly call {@link WebSocket#close(int, String) close()}, you do not need
* to do so in response to this callback. A matching close frame is automatically sent back to
* the server.
*
* @param code The <a href="http://tools.ietf.org/html/rfc6455#section-7.4.1">RFC-compliant</a>
* status code.
* @param reason Reason for close or an empty string.
*/
void onClose(int code, String reason);
/**
* Called when the transport or protocol layer of this web socket errors during communication, or
* when another listener callback throws an exception. If the web socket was successfully
* {@linkplain #onOpen opened} before this callback, it will have been closed automatically and
* future interactions with it will throw {@link IOException}.
*
* @param response Non-null when the failure is because of an unexpected HTTP response (e.g.,
* failed upgrade, non-101 response code, etc.).
*/
void onFailure(Throwable t, Response response);
}

View File

@@ -33,8 +33,6 @@ import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.WebSocket;
import okhttp3.internal.Internal;
import okhttp3.internal.NamedRunnable;
import okhttp3.internal.Util;
@@ -257,20 +255,12 @@ public final class RealNewWebSocket implements NewWebSocket, WebSocketReader.Fra
}
}
@Override public void onReadMessage(ResponseBody body) throws IOException {
try {
if (body.contentType().equals(WebSocket.TEXT)) {
String text = body.source().readUtf8();
listener.onMessage(this, text);
} else if (body.contentType().equals(WebSocket.BINARY)) {
ByteString bytes = body.source().readByteString();
listener.onMessage(this, bytes);
} else {
throw new IllegalArgumentException();
}
} finally {
Util.closeQuietly(body);
}
@Override public void onReadMessage(String text) throws IOException {
listener.onMessage(this, text);
}
@Override public void onReadMessage(ByteString bytes) throws IOException {
listener.onMessage(this, bytes);
}
@Override public synchronized void onReadPing(final ByteString payload) {

View File

@@ -18,15 +18,9 @@ package okhttp3.internal.ws;
import java.io.EOFException;
import java.io.IOException;
import java.net.ProtocolException;
import okhttp3.MediaType;
import okhttp3.ResponseBody;
import okhttp3.WebSocket;
import okio.Buffer;
import okio.BufferedSource;
import okio.ByteString;
import okio.Okio;
import okio.Source;
import okio.Timeout;
import static java.lang.Integer.toHexString;
import static okhttp3.internal.ws.WebSocketProtocol.B0_FLAG_FIN;
@@ -57,7 +51,8 @@ import static okhttp3.internal.ws.WebSocketProtocol.validateCloseCode;
*/
final class WebSocketReader {
public interface FrameCallback {
void onReadMessage(ResponseBody body) throws IOException;
void onReadMessage(String text) throws IOException;
void onReadMessage(ByteString bytes) throws IOException;
void onReadPing(ByteString buffer);
void onReadPong(ByteString buffer);
void onReadClose(int code, String reason);
@@ -67,10 +62,7 @@ final class WebSocketReader {
final BufferedSource source;
final FrameCallback frameCallback;
final Source framedMessageSource = new FramedMessageSource();
boolean closed;
boolean messageClosed;
// Stateful data about the current frame.
int opcode;
@@ -209,37 +201,18 @@ final class WebSocketReader {
}
private void readMessageFrame() throws IOException {
final MediaType type;
switch (opcode) {
case OPCODE_TEXT:
type = WebSocket.TEXT;
break;
case OPCODE_BINARY:
type = WebSocket.BINARY;
break;
default:
throw new ProtocolException("Unknown opcode: " + toHexString(opcode));
int opcode = this.opcode;
if (opcode != OPCODE_TEXT && opcode != OPCODE_BINARY) {
throw new ProtocolException("Unknown opcode: " + toHexString(opcode));
}
final BufferedSource source = Okio.buffer(framedMessageSource);
ResponseBody body = new ResponseBody() {
@Override public MediaType contentType() {
return type;
}
Buffer message = new Buffer();
readMessage(message);
@Override public long contentLength() {
return -1;
}
@Override public BufferedSource source() {
return source;
}
};
messageClosed = false;
frameCallback.onReadMessage(body);
if (!messageClosed) {
throw new IllegalStateException("Listener failed to call close on message payload.");
if (opcode == OPCODE_TEXT) {
frameCallback.onReadMessage(message.readUtf8());
} else {
frameCallback.onReadMessage(message.readByteString());
}
}
@@ -255,28 +228,27 @@ final class WebSocketReader {
}
/**
* A special source which knows how to read a message body across one or more frames. Control
* frames that occur between fragments will be processed. If the message payload is masked this
* will unmask as it's being processed.
* Reads a message body into across one or more frames. Control frames that occur between
* fragments will be processed. If the message payload is masked this will unmask as it's being
* processed.
*/
final class FramedMessageSource implements Source {
@Override public long read(Buffer sink, long byteCount) throws IOException {
private void readMessage(Buffer sink) throws IOException {
while (true) {
if (closed) throw new IOException("closed");
if (messageClosed) throw new IllegalStateException("closed");
if (frameBytesRead == frameLength) {
if (isFinalFrame) return -1; // We are exhausted and have no continuations.
if (isFinalFrame) return; // We are exhausted and have no continuations.
readUntilNonControlFrame();
if (opcode != OPCODE_CONTINUATION) {
throw new ProtocolException("Expected continuation opcode. Got: " + toHexString(opcode));
}
if (isFinalFrame && frameLength == 0) {
return -1; // Fast-path for empty final frame.
return; // Fast-path for empty final frame.
}
}
long toRead = Math.min(byteCount, frameLength - frameBytesRead);
long toRead = frameLength - frameBytesRead;
long read;
if (isMasked) {
@@ -291,24 +263,6 @@ final class WebSocketReader {
}
frameBytesRead += read;
return read;
}
@Override public Timeout timeout() {
return source.timeout();
}
@Override public void close() throws IOException {
if (messageClosed) return;
messageClosed = true;
if (closed) return;
// Exhaust the remainder of the message, if any.
source.skip(frameLength - frameBytesRead);
while (!isFinalFrame) {
readUntilNonControlFrame();
source.skip(frameLength);
}
}
}
}