diff --git a/okhttp-tests/src/test/java/okhttp3/RecordedResponse.java b/okhttp-tests/src/test/java/okhttp3/RecordedResponse.java index 3410775b6..b292bc056 100644 --- a/okhttp-tests/src/test/java/okhttp3/RecordedResponse.java +++ b/okhttp-tests/src/test/java/okhttp3/RecordedResponse.java @@ -32,11 +32,11 @@ import static org.junit.Assert.assertTrue; public final class RecordedResponse { public final Request request; public final Response response; - public final WebSocket webSocket; + public final NewWebSocket webSocket; public final String body; public final IOException failure; - public RecordedResponse(Request request, Response response, WebSocket webSocket, String body, + public RecordedResponse(Request request, Response response, NewWebSocket webSocket, String body, IOException failure) { this.request = request; this.response = response; diff --git a/okhttp-tests/src/test/java/okhttp3/internal/ws/EmptyWebSocketListener.java b/okhttp-tests/src/test/java/okhttp3/internal/ws/EmptyWebSocketListener.java deleted file mode 100644 index f3ba32e46..000000000 --- a/okhttp-tests/src/test/java/okhttp3/internal/ws/EmptyWebSocketListener.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (C) 2014 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package okhttp3.internal.ws; - -import java.io.IOException; -import okhttp3.Response; -import okhttp3.ResponseBody; -import okhttp3.WebSocket; -import okhttp3.WebSocketListener; -import okio.ByteString; - -public class EmptyWebSocketListener implements WebSocketListener { - @Override public void onOpen(WebSocket webSocket, Response response) { - } - - @Override public void onMessage(ResponseBody message) throws IOException { - } - - @Override public void onPong(ByteString payload) { - } - - @Override public void onClose(int code, String reason) { - } - - @Override public void onFailure(Throwable t, Response response) { - } -} diff --git a/okhttp-tests/src/test/java/okhttp3/internal/ws/NewWebSocketRecorder.java b/okhttp-tests/src/test/java/okhttp3/internal/ws/NewWebSocketRecorder.java index 40cbfaba7..68f53a2b5 100644 --- a/okhttp-tests/src/test/java/okhttp3/internal/ws/NewWebSocketRecorder.java +++ b/okhttp-tests/src/test/java/okhttp3/internal/ws/NewWebSocketRecorder.java @@ -140,6 +140,16 @@ public final class NewWebSocketRecorder extends NewWebSocket.Listener { assertEquals(new Message(ByteString.of(payload)), actual); } + public void assertPing(ByteString payload) { + Object actual = nextEvent(); + assertEquals(new Ping(payload), actual); + } + + public void assertPong(ByteString payload) { + Object actual = nextEvent(); + assertEquals(new Pong(payload), actual); + } + public void assertClosing(int code, String reason) { Object actual = nextEvent(); assertEquals(new Closing(code, reason), actual); @@ -198,6 +208,31 @@ public final class NewWebSocketRecorder extends NewWebSocket.Listener { assertEquals(message, failure.t.getMessage()); } + /** Expose this recorder as a frame callback and shim in "ping" events. */ + public WebSocketReader.FrameCallback asFrameCallback() { + return new WebSocketReader.FrameCallback() { + @Override public void onReadMessage(String text) throws IOException { + onMessage(null, text); + } + + @Override public void onReadMessage(ByteString bytes) throws IOException { + onMessage(null, bytes); + } + + @Override public void onReadPing(ByteString payload) { + events.add(new Ping(payload)); + } + + @Override public void onReadPong(ByteString payload) { + events.add(new Pong(payload)); + } + + @Override public void onReadClose(int code, String reason) { + onClosing(null, code, reason); + } + }; + } + static final class Open { final NewWebSocket webSocket; final Response response; @@ -267,6 +302,48 @@ public final class NewWebSocketRecorder extends NewWebSocket.Listener { } } + static final class Ping { + public final ByteString payload; + + public Ping(ByteString payload) { + this.payload = payload; + } + + @Override public String toString() { + return "Ping[" + payload + "]"; + } + + @Override public int hashCode() { + return payload.hashCode(); + } + + @Override public boolean equals(Object other) { + return other instanceof Ping + && ((Ping) other).payload.equals(payload); + } + } + + static final class Pong { + public final ByteString payload; + + public Pong(ByteString payload) { + this.payload = payload; + } + + @Override public String toString() { + return "Pong[" + payload + "]"; + } + + @Override public int hashCode() { + return payload.hashCode(); + } + + @Override public boolean equals(Object other) { + return other instanceof Pong + && ((Pong) other).payload.equals(payload); + } + } + static final class Closing { public final int code; public final String reason; diff --git a/okhttp-tests/src/test/java/okhttp3/internal/ws/WebSocketReaderTest.java b/okhttp-tests/src/test/java/okhttp3/internal/ws/WebSocketReaderTest.java index f1ae34e51..e88f132ee 100644 --- a/okhttp-tests/src/test/java/okhttp3/internal/ws/WebSocketReaderTest.java +++ b/okhttp-tests/src/test/java/okhttp3/internal/ws/WebSocketReaderTest.java @@ -19,24 +19,20 @@ import java.io.EOFException; import java.io.IOException; import java.net.ProtocolException; import java.util.Random; -import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; -import okhttp3.ResponseBody; import okhttp3.internal.Util; import okio.Buffer; -import okio.BufferedSource; import okio.ByteString; import org.junit.After; import org.junit.Test; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public final class WebSocketReaderTest { private final Buffer data = new Buffer(); - private final WebSocketRecorder callback = new WebSocketRecorder("client"); + private final NewWebSocketRecorder callback = new NewWebSocketRecorder("client"); private final Random random = new Random(0); // Mutually exclusive. Use the one corresponding to the peer whose behavior you wish to test. @@ -149,20 +145,11 @@ public final class WebSocketReaderTest { @Test public void serverHelloTwoChunks() throws IOException { data.write(ByteString.decodeHex("818537fa213d7f9f4d")); // Hel + data.write(ByteString.decodeHex("5158")); // lo - final Buffer sink = new Buffer(); - callback.setNextEventDelegate(new EmptyWebSocketListener() { - @Override public void onMessage(ResponseBody message) throws IOException { - BufferedSource source = message.source(); - source.readFully(sink, 3); // Read "Hel" - data.write(ByteString.decodeHex("5158")); // lo - source.readFully(sink, 2); // Read "lo" - source.close(); - } - }); serverReader.processNextFrame(); - assertEquals("Hello", sink.readUtf8()); + callback.assertTextMessage("Hello"); } @Test public void clientTwoFrameHello() throws IOException { @@ -250,84 +237,6 @@ public final class WebSocketReaderTest { } } - @Test public void noCloseErrors() throws IOException { - data.write(ByteString.decodeHex("810548656c6c6f")); // Hello - callback.setNextEventDelegate(new EmptyWebSocketListener() { - @Override public void onMessage(ResponseBody body) throws IOException { - body.source().readAll(new Buffer()); - } - }); - try { - clientReader.processNextFrame(); - fail(); - } catch (IllegalStateException e) { - assertEquals("Listener failed to call close on message payload.", e.getMessage()); - } - } - - @Test public void closeExhaustsMessage() throws IOException { - data.write(ByteString.decodeHex("810548656c6c6f")); // Hello - data.write(ByteString.decodeHex("810448657921")); // Hey! - - final Buffer sink = new Buffer(); - callback.setNextEventDelegate(new EmptyWebSocketListener() { - @Override public void onMessage(ResponseBody message) throws IOException { - message.source().read(sink, 3); - message.close(); - } - }); - - clientReader.processNextFrame(); - assertEquals("Hel", sink.readUtf8()); - - clientReader.processNextFrame(); - callback.assertTextMessage("Hey!"); - } - - @Test public void closeExhaustsMessageOverControlFrames() throws IOException { - data.write(ByteString.decodeHex("010348656c")); // Hel - data.write(ByteString.decodeHex("8a00")); // Pong - data.write(ByteString.decodeHex("8a00")); // Pong - data.write(ByteString.decodeHex("80026c6f")); // lo - data.write(ByteString.decodeHex("810448657921")); // Hey! - - final Buffer sink = new Buffer(); - callback.setNextEventDelegate(new EmptyWebSocketListener() { - @Override public void onMessage(ResponseBody message) throws IOException { - message.source().read(sink, 2); - message.close(); - } - }); - - clientReader.processNextFrame(); - assertEquals("He", sink.readUtf8()); - callback.assertPong(ByteString.EMPTY); - callback.assertPong(ByteString.EMPTY); - - clientReader.processNextFrame(); - callback.assertTextMessage("Hey!"); - } - - @Test public void closedMessageSourceThrows() throws IOException { - data.write(ByteString.decodeHex("810548656c6c6f")); // Hello - - final AtomicReference exception = new AtomicReference<>(); - callback.setNextEventDelegate(new EmptyWebSocketListener() { - @Override public void onMessage(ResponseBody message) throws IOException { - message.close(); - try { - message.source().readAll(new Buffer()); - fail(); - } catch (IllegalStateException e) { - exception.set(e); - } - } - }); - clientReader.processNextFrame(); - - assertNotNull(exception.get()); - } - @Test public void emptyPingCallsCallback() throws IOException { data.write(ByteString.decodeHex("8900")); // Empty ping clientReader.processNextFrame(); @@ -343,7 +252,7 @@ public final class WebSocketReaderTest { @Test public void emptyCloseCallsCallback() throws IOException { data.write(ByteString.decodeHex("8800")); // Empty close clientReader.processNextFrame(); - callback.assertClose(1005, ""); + callback.assertClosing(1005, ""); } @Test public void closeLengthOfOneThrows() throws IOException { @@ -359,7 +268,7 @@ public final class WebSocketReaderTest { @Test public void closeCallsCallback() throws IOException { data.write(ByteString.decodeHex("880703e848656c6c6f")); // Close with code and reason clientReader.processNextFrame(); - callback.assertClose(1000, "Hello"); + callback.assertClosing(1000, "Hello"); } @Test public void closeOutOfRangeThrows() throws IOException { diff --git a/okhttp-tests/src/test/java/okhttp3/internal/ws/WebSocketRecorder.java b/okhttp-tests/src/test/java/okhttp3/internal/ws/WebSocketRecorder.java deleted file mode 100644 index 91510cdf5..000000000 --- a/okhttp-tests/src/test/java/okhttp3/internal/ws/WebSocketRecorder.java +++ /dev/null @@ -1,354 +0,0 @@ -/* - * Copyright (C) 2014 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package okhttp3.internal.ws; - -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import okhttp3.MediaType; -import okhttp3.Response; -import okhttp3.ResponseBody; -import okhttp3.WebSocket; -import okhttp3.WebSocketListener; -import okhttp3.internal.platform.Platform; -import okio.Buffer; -import okio.ByteString; - -import static okhttp3.WebSocket.BINARY; -import static okhttp3.WebSocket.TEXT; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - -public final class WebSocketRecorder implements WebSocketListener { - private final String name; - private final BlockingQueue events = new LinkedBlockingQueue<>(); - private WebSocketListener delegate; - - public WebSocketRecorder(String name) { - this.name = name; - } - - /** Sets a delegate for handling the next callback to this listener. Cleared after invoked. */ - public void setNextEventDelegate(WebSocketListener delegate) { - this.delegate = delegate; - } - - @Override public void onOpen(WebSocket webSocket, Response response) { - Platform.get().log(Platform.INFO, "[WS " + name + "] onOpen", null); - - WebSocketListener delegate = this.delegate; - if (delegate != null) { - this.delegate = null; - delegate.onOpen(webSocket, response); - } else { - events.add(new Open(webSocket, response)); - } - } - - @Override public void onMessage(ResponseBody message) throws IOException { - Platform.get().log(Platform.INFO, "[WS " + name + "] onMessage", null); - - WebSocketListener delegate = this.delegate; - if (delegate != null) { - this.delegate = null; - delegate.onMessage(message); - } else { - Message event = new Message(message.contentType()); - message.source().readAll(event.buffer); - message.close(); - events.add(event); - } - } - - @Override public void onPong(ByteString payload) { - Platform.get().log(Platform.INFO, "[WS " + name + "] onPong", null); - - WebSocketListener delegate = this.delegate; - if (delegate != null) { - this.delegate = null; - delegate.onPong(payload); - } else { - events.add(new Pong(payload)); - } - } - - @Override public void onClose(int code, String reason) { - Platform.get().log(Platform.INFO, "[WS " + name + "] onClose " + code, null); - - WebSocketListener delegate = this.delegate; - if (delegate != null) { - this.delegate = null; - delegate.onClose(code, reason); - } else { - events.add(new Close(code, reason)); - } - } - - @Override public void onFailure(Throwable t, Response response) { - Platform.get().log(Platform.INFO, "[WS " + name + "] onFailure", t); - - WebSocketListener delegate = this.delegate; - if (delegate != null) { - this.delegate = null; - delegate.onFailure(t, response); - } else { - events.add(new Failure(t, response)); - } - } - - private Object nextEvent() { - try { - Object event = events.poll(10, TimeUnit.SECONDS); - if (event == null) { - throw new AssertionError("Timed out waiting for event."); - } - return event; - } catch (InterruptedException e) { - throw new AssertionError(e); - } - } - - public void assertTextMessage(String payload) { - Message message = new Message(TEXT); - message.buffer.writeUtf8(payload); - Object actual = nextEvent(); - assertEquals(message, actual); - } - - public void assertBinaryMessage(byte[] payload) { - Message message = new Message(BINARY); - message.buffer.write(payload); - Object actual = nextEvent(); - assertEquals(message, actual); - } - - public void assertPong(ByteString payload) { - Object actual = nextEvent(); - assertEquals(new Pong(payload), actual); - } - - public void assertClose(int code, String reason) { - Object actual = nextEvent(); - assertEquals(new Close(code, reason), actual); - } - - public void assertExhausted() { - assertTrue("Remaining events: " + events, events.isEmpty()); - } - - public WebSocket assertOpen() { - Object event = nextEvent(); - if (!(event instanceof Open)) { - throw new AssertionError("Expected Open but was " + event); - } - return ((Open) event).webSocket; - } - - public void assertFailure(Throwable t) { - Object event = nextEvent(); - if (!(event instanceof Failure)) { - throw new AssertionError("Expected Failure but was " + event); - } - Failure failure = (Failure) event; - assertNull(failure.response); - assertSame(t, failure.t); - } - - public void assertFailure(Class cls, String message) { - Object event = nextEvent(); - if (!(event instanceof Failure)) { - throw new AssertionError("Expected Failure but was " + event); - } - Failure failure = (Failure) event; - assertNull(failure.response); - assertEquals(cls, failure.t.getClass()); - assertEquals(message, failure.t.getMessage()); - } - - public void assertFailure(int code, String body, Class cls, String message) - throws IOException { - Object event = nextEvent(); - if (!(event instanceof Failure)) { - throw new AssertionError("Expected Failure but was " + event); - } - Failure failure = (Failure) event; - assertEquals(code, failure.response.code()); - if (body != null) { - assertEquals(body, failure.response.body().string()); - } - assertEquals(cls, failure.t.getClass()); - assertEquals(message, failure.t.getMessage()); - } - - static final class Open { - final WebSocket webSocket; - final Response response; - - Open(WebSocket webSocket, Response response) { - this.webSocket = webSocket; - this.response = response; - } - - @Override public String toString() { - return "Open[" + response + "]"; - } - } - - static final class Failure { - final Throwable t; - final Response response; - - Failure(Throwable t, Response response) { - this.t = t; - this.response = response; - } - - @Override public String toString() { - if (response == null) { - return "Failure[" + t + "]"; - } - return "Failure[" + response + "]"; - } - } - - static final class Message { - public final MediaType mediaType; - public final Buffer buffer = new Buffer(); - - Message(MediaType mediaType) { - this.mediaType = mediaType; - } - - @Override public String toString() { - return "Message[" + mediaType + " " + buffer + "]"; - } - - @Override public int hashCode() { - return mediaType.hashCode() * 37 + buffer.hashCode(); - } - - @Override public boolean equals(Object obj) { - if (obj instanceof Message) { - Message other = (Message) obj; - return mediaType.equals(other.mediaType) && buffer.equals(other.buffer); - } - return false; - } - } - - static final class Pong { - public final ByteString payload; - - Pong(ByteString payload) { - this.payload = payload; - } - - @Override public String toString() { - return "Pong[" + payload + "]"; - } - - @Override public int hashCode() { - return payload.hashCode(); - } - - @Override public boolean equals(Object obj) { - if (obj instanceof Pong) { - Pong other = (Pong) obj; - return payload == null ? other.payload == null : payload.equals(other.payload); - } - return false; - } - } - - static final class Close { - public final int code; - public final String reason; - - Close(int code, String reason) { - this.code = code; - this.reason = reason; - } - - @Override public String toString() { - return "Close[" + code + " " + reason + "]"; - } - - @Override public int hashCode() { - return code * 37 + reason.hashCode(); - } - - @Override public boolean equals(Object obj) { - if (obj instanceof Close) { - Close other = (Close) obj; - return code == other.code && reason.equals(other.reason); - } - return false; - } - } - - /** Expose this recorder as a frame callback and shim in "ping" events. */ - WebSocketReader.FrameCallback asFrameCallback() { - return new WebSocketReader.FrameCallback() { - @Override public void onReadMessage(ResponseBody body) throws IOException { - onMessage(body); - } - - @Override public void onReadPing(ByteString payload) { - events.add(new Ping(payload)); - } - - @Override public void onReadPong(ByteString padload) { - onPong(padload); - } - - @Override public void onReadClose(int code, String reason) { - onClose(code, reason); - } - }; - } - - void assertPing(ByteString payload) { - Object actual = nextEvent(); - assertEquals(new Ping(payload), actual); - } - - static final class Ping { - public final ByteString buffer; - - Ping(ByteString buffer) { - this.buffer = buffer; - } - - @Override public String toString() { - return "Ping[" + buffer + "]"; - } - - @Override public int hashCode() { - return buffer.hashCode(); - } - - @Override public boolean equals(Object obj) { - if (obj instanceof Ping) { - Ping other = (Ping) obj; - return buffer == null ? other.buffer == null : buffer.equals(other.buffer); - } - return false; - } - } -} diff --git a/okhttp/src/main/java/okhttp3/WebSocket.java b/okhttp/src/main/java/okhttp3/WebSocket.java deleted file mode 100644 index 35b02f250..000000000 --- a/okhttp/src/main/java/okhttp3/WebSocket.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (C) 2014 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package okhttp3; - -import java.io.IOException; -import okio.ByteString; - -/** Blocking interface to connect and write to a web socket. This class is not thread safe. */ -public interface WebSocket { - /** A {@link MediaType} indicating UTF-8 text frames should be used when sending the message. */ - MediaType TEXT = MediaType.parse("application/vnd.okhttp.websocket+text; charset=utf-8"); - /** A {@link MediaType} indicating binary frames should be used when sending the message. */ - MediaType BINARY = MediaType.parse("application/vnd.okhttp.websocket+binary"); - - /** - * Send a message to the server. - * - * @param message The message body. The {@linkplain RequestBody#contentType() content type} of - * must be either {@link #TEXT} or {@link #BINARY}. - * @throws IOException if unable to write the message. Clients must call {@link #close} when this - * happens to ensure resources are cleaned up. - * @throws IllegalStateException if this web socket was already closed. - */ - void message(RequestBody message) throws IOException; - - /** - * Send a ping to the server. - * - * @param payload Ping payload which must not exceed 125 bytes. Use {@link ByteString#EMPTY} for - * no payload. - * @throws IOException if unable to write the ping. Clients must call {@link #close} when this - * happens to ensure resources are cleaned up. - * @throws IllegalStateException if this web socket was already closed. - */ - void ping(ByteString payload) throws IOException; - - /** - * Send a close indicator to the server. - * - *

The corresponding {@link WebSocketListener} will continue to get messages until its {@link - * WebSocketListener#onClose onClose()} method is called. - * - *

It is an error to call this method before calling close on an active writer. Calling this - * method more than once has no effect. - * - * @param code Status code as defined by Section 7.4 of RFC 6455 or {@code 0}. - * @param reason Reason for shutting down or {@code null}. - * @throws IOException if unable to write the close message. Resources will still be freed. - * @throws IllegalStateException if this web socket was already closed. - */ - void close(int code, String reason) throws IOException; -} diff --git a/okhttp/src/main/java/okhttp3/WebSocketCall.java b/okhttp/src/main/java/okhttp3/WebSocketCall.java deleted file mode 100644 index 3dc18bcba..000000000 --- a/okhttp/src/main/java/okhttp3/WebSocketCall.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (C) 2014 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package okhttp3; - -public interface WebSocketCall extends Cloneable { - /** Returns the original request that initiated this call. */ - Request request(); - - /** - * Schedules the request to be executed at some point in the future. - * - *

The {@link OkHttpClient#dispatcher dispatcher} defines when the request will run: usually - * immediately unless there are several other requests currently being executed. - * - *

This client will later call back {@code responseCallback} with either an HTTP response or a - * failure exception. If you {@link #cancel} a request before it completes the callback will not - * be invoked. - * - * @throws IllegalStateException when the call has already been executed. - */ - void enqueue(WebSocketListener listener); - - /** Cancels the request, if possible. Requests that are already complete cannot be canceled. */ - void cancel(); - - /** - * Returns true if this call has been {@linkplain #enqueue(WebSocketListener) enqueued}. It is an - * error to enqueue a call more than once. - */ - boolean isExecuted(); - - boolean isCanceled(); - - /** - * Create a new, identical call to this one which can be enqueued even if this call has already - * been. - */ - WebSocketCall clone(); - - interface Factory { - WebSocketCall newWebSocketCall(Request request); - } -} diff --git a/okhttp/src/main/java/okhttp3/WebSocketListener.java b/okhttp/src/main/java/okhttp3/WebSocketListener.java deleted file mode 100644 index 8d853f7d9..000000000 --- a/okhttp/src/main/java/okhttp3/WebSocketListener.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright (C) 2014 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package okhttp3; - -import java.io.IOException; -import okio.ByteString; - -/** - * Listener for server-initiated messages on a connected {@link WebSocket}. All callbacks will be - * called on a single thread. - * - *

Lifecycle Rules

- * - */ -public interface WebSocketListener { - /** - * Called when the request has successfully been upgraded to a web socket. Do not use this - * callback to write to the web socket. Start a new thread or use another thread in your - * application. - */ - void onOpen(WebSocket webSocket, Response response); - - /** - * Called when a server message is received. The {@code type} indicates whether the {@code - * payload} should be interpreted as UTF-8 text or binary data. - * - *

Implementations must call {@code source.close()} before returning. This - * indicates completion of parsing the message payload and will consume any remaining bytes in the - * message. - * - *

The {@linkplain ResponseBody#contentType() content type} of {@code message} will be either - * {@link WebSocket#TEXT} or {@link WebSocket#BINARY} which indicates the format of the message. - */ - void onMessage(ResponseBody message) throws IOException; - - /** - * Called when a server pong is received. This is usually a result of calling {@link - * WebSocket#ping(ByteString)} but might also be unsolicited directly from the server. - */ - void onPong(ByteString payload); - - /** - * Called when the server sends a close message. This may have been initiated from a call to - * {@link WebSocket#close(int, String) close()} or as an unprompted message from the server. - * If you did not explicitly call {@link WebSocket#close(int, String) close()}, you do not need - * to do so in response to this callback. A matching close frame is automatically sent back to - * the server. - * - * @param code The RFC-compliant - * status code. - * @param reason Reason for close or an empty string. - */ - void onClose(int code, String reason); - - /** - * Called when the transport or protocol layer of this web socket errors during communication, or - * when another listener callback throws an exception. If the web socket was successfully - * {@linkplain #onOpen opened} before this callback, it will have been closed automatically and - * future interactions with it will throw {@link IOException}. - * - * @param response Non-null when the failure is because of an unexpected HTTP response (e.g., - * failed upgrade, non-101 response code, etc.). - */ - void onFailure(Throwable t, Response response); -} diff --git a/okhttp/src/main/java/okhttp3/internal/ws/RealNewWebSocket.java b/okhttp/src/main/java/okhttp3/internal/ws/RealNewWebSocket.java index a288e2e7d..fb5888e25 100644 --- a/okhttp/src/main/java/okhttp3/internal/ws/RealNewWebSocket.java +++ b/okhttp/src/main/java/okhttp3/internal/ws/RealNewWebSocket.java @@ -33,8 +33,6 @@ import okhttp3.OkHttpClient; import okhttp3.Protocol; import okhttp3.Request; import okhttp3.Response; -import okhttp3.ResponseBody; -import okhttp3.WebSocket; import okhttp3.internal.Internal; import okhttp3.internal.NamedRunnable; import okhttp3.internal.Util; @@ -257,20 +255,12 @@ public final class RealNewWebSocket implements NewWebSocket, WebSocketReader.Fra } } - @Override public void onReadMessage(ResponseBody body) throws IOException { - try { - if (body.contentType().equals(WebSocket.TEXT)) { - String text = body.source().readUtf8(); - listener.onMessage(this, text); - } else if (body.contentType().equals(WebSocket.BINARY)) { - ByteString bytes = body.source().readByteString(); - listener.onMessage(this, bytes); - } else { - throw new IllegalArgumentException(); - } - } finally { - Util.closeQuietly(body); - } + @Override public void onReadMessage(String text) throws IOException { + listener.onMessage(this, text); + } + + @Override public void onReadMessage(ByteString bytes) throws IOException { + listener.onMessage(this, bytes); } @Override public synchronized void onReadPing(final ByteString payload) { diff --git a/okhttp/src/main/java/okhttp3/internal/ws/WebSocketReader.java b/okhttp/src/main/java/okhttp3/internal/ws/WebSocketReader.java index bb6f5760d..0d9873bfb 100644 --- a/okhttp/src/main/java/okhttp3/internal/ws/WebSocketReader.java +++ b/okhttp/src/main/java/okhttp3/internal/ws/WebSocketReader.java @@ -18,15 +18,9 @@ package okhttp3.internal.ws; import java.io.EOFException; import java.io.IOException; import java.net.ProtocolException; -import okhttp3.MediaType; -import okhttp3.ResponseBody; -import okhttp3.WebSocket; import okio.Buffer; import okio.BufferedSource; import okio.ByteString; -import okio.Okio; -import okio.Source; -import okio.Timeout; import static java.lang.Integer.toHexString; import static okhttp3.internal.ws.WebSocketProtocol.B0_FLAG_FIN; @@ -57,7 +51,8 @@ import static okhttp3.internal.ws.WebSocketProtocol.validateCloseCode; */ final class WebSocketReader { public interface FrameCallback { - void onReadMessage(ResponseBody body) throws IOException; + void onReadMessage(String text) throws IOException; + void onReadMessage(ByteString bytes) throws IOException; void onReadPing(ByteString buffer); void onReadPong(ByteString buffer); void onReadClose(int code, String reason); @@ -67,10 +62,7 @@ final class WebSocketReader { final BufferedSource source; final FrameCallback frameCallback; - final Source framedMessageSource = new FramedMessageSource(); - boolean closed; - boolean messageClosed; // Stateful data about the current frame. int opcode; @@ -209,37 +201,18 @@ final class WebSocketReader { } private void readMessageFrame() throws IOException { - final MediaType type; - switch (opcode) { - case OPCODE_TEXT: - type = WebSocket.TEXT; - break; - case OPCODE_BINARY: - type = WebSocket.BINARY; - break; - default: - throw new ProtocolException("Unknown opcode: " + toHexString(opcode)); + int opcode = this.opcode; + if (opcode != OPCODE_TEXT && opcode != OPCODE_BINARY) { + throw new ProtocolException("Unknown opcode: " + toHexString(opcode)); } - final BufferedSource source = Okio.buffer(framedMessageSource); - ResponseBody body = new ResponseBody() { - @Override public MediaType contentType() { - return type; - } + Buffer message = new Buffer(); + readMessage(message); - @Override public long contentLength() { - return -1; - } - - @Override public BufferedSource source() { - return source; - } - }; - - messageClosed = false; - frameCallback.onReadMessage(body); - if (!messageClosed) { - throw new IllegalStateException("Listener failed to call close on message payload."); + if (opcode == OPCODE_TEXT) { + frameCallback.onReadMessage(message.readUtf8()); + } else { + frameCallback.onReadMessage(message.readByteString()); } } @@ -255,28 +228,27 @@ final class WebSocketReader { } /** - * A special source which knows how to read a message body across one or more frames. Control - * frames that occur between fragments will be processed. If the message payload is masked this - * will unmask as it's being processed. + * Reads a message body into across one or more frames. Control frames that occur between + * fragments will be processed. If the message payload is masked this will unmask as it's being + * processed. */ - final class FramedMessageSource implements Source { - @Override public long read(Buffer sink, long byteCount) throws IOException { + private void readMessage(Buffer sink) throws IOException { + while (true) { if (closed) throw new IOException("closed"); - if (messageClosed) throw new IllegalStateException("closed"); if (frameBytesRead == frameLength) { - if (isFinalFrame) return -1; // We are exhausted and have no continuations. + if (isFinalFrame) return; // We are exhausted and have no continuations. readUntilNonControlFrame(); if (opcode != OPCODE_CONTINUATION) { throw new ProtocolException("Expected continuation opcode. Got: " + toHexString(opcode)); } if (isFinalFrame && frameLength == 0) { - return -1; // Fast-path for empty final frame. + return; // Fast-path for empty final frame. } } - long toRead = Math.min(byteCount, frameLength - frameBytesRead); + long toRead = frameLength - frameBytesRead; long read; if (isMasked) { @@ -291,24 +263,6 @@ final class WebSocketReader { } frameBytesRead += read; - return read; - } - - @Override public Timeout timeout() { - return source.timeout(); - } - - @Override public void close() throws IOException { - if (messageClosed) return; - messageClosed = true; - if (closed) return; - - // Exhaust the remainder of the message, if any. - source.skip(frameLength - frameBytesRead); - while (!isFinalFrame) { - readUntilNonControlFrame(); - source.skip(frameLength); - } } } }