mirror of
https://github.com/facebookincubator/mvfst.git
synced 2025-08-08 09:42:06 +03:00
Refactor writeChain function signature not to reutrn IOBuf
Summary: Updating signature of writeChain to stop returning IOBuf, as implementation never actually returns back buffer and always writes full buffer. Reviewed By: mjoras Differential Revision: D21740607 fbshipit-source-id: f473ed8f3c6c6cbe2dd5db8f1247c912f3e77d0b
This commit is contained in:
committed by
Facebook GitHub Bot
parent
a5fc482c6a
commit
7a50ca13c7
@@ -798,15 +798,9 @@ class QuicSocket {
|
|||||||
* transport when the peer has acknowledged the receipt of all the data/eof
|
* transport when the peer has acknowledged the receipt of all the data/eof
|
||||||
* passed to write.
|
* passed to write.
|
||||||
*
|
*
|
||||||
* A returned IOBuf indicates that the passed data exceeded the transport
|
|
||||||
* flow control window or send buffer space. The application must call write
|
|
||||||
* with this data again later, and should be a signal to apply backpressure.
|
|
||||||
* If EOF was true or a delivery callback was set they also need to be
|
|
||||||
* passed again later. See notifyPendingWrite to register for a callback.
|
|
||||||
*
|
|
||||||
* An error code is present if there was an error with the write.
|
* An error code is present if there was an error with the write.
|
||||||
*/
|
*/
|
||||||
using WriteResult = folly::Expected<Buf, LocalErrorCode>;
|
using WriteResult = folly::Expected<folly::Unit, LocalErrorCode>;
|
||||||
virtual WriteResult writeChain(
|
virtual WriteResult writeChain(
|
||||||
StreamId id,
|
StreamId id,
|
||||||
Buf data,
|
Buf data,
|
||||||
|
@@ -1899,7 +1899,7 @@ QuicSocket::WriteResult QuicTransportBase::writeChain(
|
|||||||
std::string("writeChain() error")));
|
std::string("writeChain() error")));
|
||||||
return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
|
return folly::makeUnexpected(LocalErrorCode::INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
return nullptr;
|
return folly::unit;
|
||||||
}
|
}
|
||||||
|
|
||||||
folly::Expected<folly::Unit, LocalErrorCode>
|
folly::Expected<folly::Unit, LocalErrorCode>
|
||||||
|
@@ -140,21 +140,15 @@ class MockQuicSocket : public QuicSocket {
|
|||||||
MOCK_METHOD1(
|
MOCK_METHOD1(
|
||||||
unregisterStreamWriteCallback,
|
unregisterStreamWriteCallback,
|
||||||
folly::Expected<folly::Unit, LocalErrorCode>(StreamId));
|
folly::Expected<folly::Unit, LocalErrorCode>(StreamId));
|
||||||
folly::Expected<Buf, LocalErrorCode> writeChain(
|
folly::Expected<folly::Unit, LocalErrorCode> writeChain(
|
||||||
StreamId id,
|
StreamId id,
|
||||||
Buf data,
|
Buf data,
|
||||||
bool eof,
|
bool eof,
|
||||||
bool cork,
|
bool cork,
|
||||||
DeliveryCallback* cb) override {
|
DeliveryCallback* cb) override {
|
||||||
SharedBuf sharedData(data.release());
|
SharedBuf sharedData(data.release());
|
||||||
auto res = writeChain(id, sharedData, eof, cork, cb);
|
return writeChain(id, sharedData, eof, cork, cb);
|
||||||
if (res.hasError()) {
|
|
||||||
return folly::makeUnexpected(res.error());
|
|
||||||
} else {
|
|
||||||
return Buf(res.value());
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
using WriteResult = folly::Expected<folly::IOBuf*, LocalErrorCode>;
|
|
||||||
MOCK_METHOD5(
|
MOCK_METHOD5(
|
||||||
writeChain,
|
writeChain,
|
||||||
WriteResult(StreamId, SharedBuf, bool, bool, DeliveryCallback*));
|
WriteResult(StreamId, SharedBuf, bool, bool, DeliveryCallback*));
|
||||||
|
@@ -1043,7 +1043,7 @@ TEST_F(QuicTransportImplTest, ConnectionErrorOnWrite) {
|
|||||||
auto stream = transport->createBidirectionalStream().value();
|
auto stream = transport->createBidirectionalStream().value();
|
||||||
EXPECT_CALL(*socketPtr, write(_, _))
|
EXPECT_CALL(*socketPtr, write(_, _))
|
||||||
.WillOnce(SetErrnoAndReturn(ENETUNREACH, -1));
|
.WillOnce(SetErrnoAndReturn(ENETUNREACH, -1));
|
||||||
QuicSocket::WriteResult result = transport->writeChain(
|
transport->writeChain(
|
||||||
stream, folly::IOBuf::copyBuffer("Hey"), true, false, nullptr);
|
stream, folly::IOBuf::copyBuffer("Hey"), true, false, nullptr);
|
||||||
transport->addDataToStream(
|
transport->addDataToStream(
|
||||||
stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0));
|
stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0));
|
||||||
@@ -1074,7 +1074,7 @@ TEST_F(QuicTransportImplTest, ReadErrorUnsanitizedErrorMsg) {
|
|||||||
throw std::runtime_error("You need to calm down.");
|
throw std::runtime_error("You need to calm down.");
|
||||||
return 0;
|
return 0;
|
||||||
}));
|
}));
|
||||||
QuicSocket::WriteResult result = transport->writeChain(
|
transport->writeChain(
|
||||||
stream,
|
stream,
|
||||||
folly::IOBuf::copyBuffer("You are being too loud."),
|
folly::IOBuf::copyBuffer("You are being too loud."),
|
||||||
true,
|
true,
|
||||||
@@ -1097,7 +1097,7 @@ TEST_F(QuicTransportImplTest, ConnectionErrorUnhandledException) {
|
|||||||
throw std::runtime_error("Well there's your problem");
|
throw std::runtime_error("Well there's your problem");
|
||||||
return 0;
|
return 0;
|
||||||
}));
|
}));
|
||||||
QuicSocket::WriteResult result = transport->writeChain(
|
transport->writeChain(
|
||||||
stream, folly::IOBuf::copyBuffer("Hey"), true, false, nullptr);
|
stream, folly::IOBuf::copyBuffer("Hey"), true, false, nullptr);
|
||||||
transport->addDataToStream(
|
transport->addDataToStream(
|
||||||
stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0));
|
stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0));
|
||||||
|
@@ -169,11 +169,6 @@ class EchoClient : public quic::QuicSocket::ConnectionCallback,
|
|||||||
auto res = quicClient_->writeChain(id, message->clone(), true, false);
|
auto res = quicClient_->writeChain(id, message->clone(), true, false);
|
||||||
if (res.hasError()) {
|
if (res.hasError()) {
|
||||||
LOG(ERROR) << "EchoClient writeChain error=" << uint32_t(res.error());
|
LOG(ERROR) << "EchoClient writeChain error=" << uint32_t(res.error());
|
||||||
} else if (res.value()) {
|
|
||||||
LOG(INFO) << "EchoClient socket did not accept all data, buffering len="
|
|
||||||
<< res.value()->computeChainDataLength();
|
|
||||||
data.append(std::move(res.value()));
|
|
||||||
quicClient_->notifyPendingWriteOnStream(id, this);
|
|
||||||
} else {
|
} else {
|
||||||
auto str = message->moveToFbString().toStdString();
|
auto str = message->moveToFbString().toStdString();
|
||||||
LOG(INFO) << "EchoClient wrote \"" << str << "\""
|
LOG(INFO) << "EchoClient wrote \"" << str << "\""
|
||||||
|
@@ -105,11 +105,6 @@ class EchoHandler : public quic::QuicSocket::ConnectionCallback,
|
|||||||
sock->writeChain(id, std::move(echoedData), true, false, nullptr);
|
sock->writeChain(id, std::move(echoedData), true, false, nullptr);
|
||||||
if (res.hasError()) {
|
if (res.hasError()) {
|
||||||
LOG(ERROR) << "write error=" << toString(res.error());
|
LOG(ERROR) << "write error=" << toString(res.error());
|
||||||
} else if (res.value()) {
|
|
||||||
LOG(INFO) << "socket did not accept all data, buffering len="
|
|
||||||
<< res.value()->computeChainDataLength();
|
|
||||||
data.first.append(std::move(res.value()));
|
|
||||||
sock->notifyPendingWriteOnStream(id, this);
|
|
||||||
} else {
|
} else {
|
||||||
// echo is done, clear EOF
|
// echo is done, clear EOF
|
||||||
data.second = false;
|
data.second = false;
|
||||||
@@ -149,11 +144,6 @@ class EchoHandler : public quic::QuicSocket::ConnectionCallback,
|
|||||||
auto res = sock->writeChain(id, originalData.move(), true, false, nullptr);
|
auto res = sock->writeChain(id, originalData.move(), true, false, nullptr);
|
||||||
if (res.hasError()) {
|
if (res.hasError()) {
|
||||||
LOG(ERROR) << "write error=" << toString(res.error());
|
LOG(ERROR) << "write error=" << toString(res.error());
|
||||||
} else if (res.value()) {
|
|
||||||
LOG(INFO) << "socket did not accept all data, buffering len="
|
|
||||||
<< res.value()->computeChainDataLength();
|
|
||||||
originalData.append(std::move(res.value()));
|
|
||||||
sock->notifyPendingWriteOnStream(id, this);
|
|
||||||
} else {
|
} else {
|
||||||
// echo is done, clear EOF
|
// echo is done, clear EOF
|
||||||
data.second = false;
|
data.second = false;
|
||||||
|
@@ -49,7 +49,7 @@ TEST_F(QuicSocketTest, simple) {
|
|||||||
EXPECT_CALL(*socket_, readNaked(3, _))
|
EXPECT_CALL(*socket_, readNaked(3, _))
|
||||||
.WillOnce(Return(readResult("hello world", true)));
|
.WillOnce(Return(readResult("hello world", true)));
|
||||||
EXPECT_CALL(*socket_, writeChain(3, _, true, false, nullptr))
|
EXPECT_CALL(*socket_, writeChain(3, _, true, false, nullptr))
|
||||||
.WillOnce(Return(nullptr));
|
.WillOnce(Return(folly::unit));
|
||||||
handler_.readAvailable(3);
|
handler_.readAvailable(3);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -64,29 +64,6 @@ TEST_F(QuicSocketTest, multiple_reads) {
|
|||||||
EXPECT_CALL(*socket_, readNaked(3, _))
|
EXPECT_CALL(*socket_, readNaked(3, _))
|
||||||
.WillOnce(Return(readResult("world", true)));
|
.WillOnce(Return(readResult("world", true)));
|
||||||
EXPECT_CALL(*socket_, writeChain(3, _, true, false, nullptr))
|
EXPECT_CALL(*socket_, writeChain(3, _, true, false, nullptr))
|
||||||
.WillOnce(Return(nullptr));
|
.WillOnce(Return(folly::unit));
|
||||||
handler_.readAvailable(3);
|
handler_.readAvailable(3);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(QuicSocketTest, blocked_write) {
|
|
||||||
InSequence enforceOrder;
|
|
||||||
openStream(3);
|
|
||||||
|
|
||||||
EXPECT_CALL(*socket_, readNaked(3, _))
|
|
||||||
.WillOnce(Return(readResult("hello world", true)));
|
|
||||||
EXPECT_CALL(*socket_, writeChain(3, _, true, false, nullptr))
|
|
||||||
.WillOnce(Invoke(
|
|
||||||
[](StreamId,
|
|
||||||
MockQuicSocket::SharedBuf b,
|
|
||||||
bool,
|
|
||||||
bool,
|
|
||||||
MockQuicSocket::DeliveryCallback*) {
|
|
||||||
return IOBuf::copyBuffer(b->data(), b->length()).release();
|
|
||||||
}));
|
|
||||||
EXPECT_CALL(*socket_, notifyPendingWriteOnStream(3, _));
|
|
||||||
handler_.readAvailable(3);
|
|
||||||
|
|
||||||
EXPECT_CALL(*socket_, writeChain(3, _, true, false, nullptr))
|
|
||||||
.WillOnce(Return(nullptr));
|
|
||||||
handler_.onStreamWriteReady(3, 100);
|
|
||||||
}
|
|
||||||
|
Reference in New Issue
Block a user