diff --git a/quic/api/QuicBatchWriter.cpp b/quic/api/QuicBatchWriter.cpp index dcf1d925b..4daa0876b 100644 --- a/quic/api/QuicBatchWriter.cpp +++ b/quic/api/QuicBatchWriter.cpp @@ -161,8 +161,31 @@ ssize_t SendmmsgPacketBatchWriter::write( return sock.write(address, vec, iovec_len); } - int ret = sock.writem( - folly::range(&address, &address + 1), bufs_.data(), bufs_.size()); + size_t numChainedBuffers = 0; + for (auto& buf : bufs_) { + numChainedBuffers += buf->countChainElements(); + } + + int ret = 0; + if (numChainedBuffers <= kNumIovecBufferChains && + bufs_.size() < kNumIovecBufferChains) { + // We don't allocate arrays on the heap + iovec vec[kNumIovecBufferChains]; + size_t messageSizes[kNumIovecBufferChains]; + fillIovecAndMessageSizes(vec, messageSizes, kNumIovecBufferChains); + sock.writem( + folly::range(&address, &address + 1), vec, messageSizes, bufs_.size()); + } else { + // We allocate the arrays on the heap + std::unique_ptr vec(new iovec[numChainedBuffers]); + std::unique_ptr messageSizes(new size_t[bufs_.size()]); + fillIovecAndMessageSizes(vec.get(), messageSizes.get(), numChainedBuffers); + sock.writem( + folly::range(&address, &address + 1), + vec.get(), + messageSizes.get(), + bufs_.size()); + } if (ret <= 0) { return ret; @@ -177,6 +200,21 @@ ssize_t SendmmsgPacketBatchWriter::write( return 0; } +void SendmmsgPacketBatchWriter::fillIovecAndMessageSizes( + iovec* vec, + size_t* messageSizes, + size_t iovecLen) { + size_t currentIovecIndex = 0; + for (uint32_t i = 0; i < bufs_.size(); i++) { + size_t numIovecs = + bufs_.at(i) + ->fillIov(vec + currentIovecIndex, iovecLen - currentIovecIndex) + .numIovecs; + messageSizes[i] = numIovecs; + currentIovecIndex += numIovecs; + } +} + bool useSinglePacketInplaceBatchWriter( uint32_t maxBatchSize, quic::DataPathType dataPathType) { diff --git a/quic/api/QuicBatchWriter.h b/quic/api/QuicBatchWriter.h index 81afa6e08..cfc0f57e4 100644 --- a/quic/api/QuicBatchWriter.h +++ b/quic/api/QuicBatchWriter.h @@ -147,6 +147,9 @@ class SendmmsgPacketBatchWriter : public BatchWriter { override; private: + void + fillIovecAndMessageSizes(iovec* vec, size_t* messageSizes, size_t iovecLen); + // max number of buffer chains we can accumulate before we need to flush size_t maxBufs_{1}; // size of data in all the buffers diff --git a/quic/api/test/QuicBatchWriterTest.cpp b/quic/api/test/QuicBatchWriterTest.cpp index f3799d2f8..8d2dfc195 100644 --- a/quic/api/test/QuicBatchWriterTest.cpp +++ b/quic/api/test/QuicBatchWriterTest.cpp @@ -249,6 +249,147 @@ TEST_F(QuicBatchWriterTest, TestBatchingSendmmsg) { } } +TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgInplaceIovecMatches) { + // In this test case, we don't surpass the kNumIovecBufferChains limit + // (i.e. the number of contiguous buffers we are sending) + folly::EventBase evb; + std::shared_ptr qEvb = + std::make_shared(&evb); + quic::test::MockAsyncUDPSocket sock(qEvb); + + auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( + quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG, + kBatchNum, + false, /* enable backpressure */ + DataPathType::ChainedMemory, + conn_, + gsoSupported_); + CHECK(batchWriter); + + std::vector messages{"It", "is", "sunny!"}; + + CHECK(batchWriter->empty()); + CHECK_EQ(batchWriter->size(), 0); + size_t size = 0; + for (auto& message : messages) { + auto buf = folly::IOBuf::copyBuffer( + folly::ByteRange((unsigned char*)message.data(), message.size())); + batchWriter->append( + std::move(buf), message.size(), folly::SocketAddress(), nullptr); + size += message.size(); + CHECK_EQ(batchWriter->size(), size); + } + + EXPECT_CALL(sock, writem(_, _, _, _)) + .Times(1) + .WillOnce(Invoke([&](folly::Range addrs, + iovec* iovecs, + size_t* messageSizes, + size_t count) { + EXPECT_EQ(addrs.size(), 1); + EXPECT_EQ(count, messages.size()); + + size_t currentIovIndex = 0; + for (size_t i = 0; i < count; i++) { + auto wrappedIovBuffer = + folly::IOBuf::wrapIov(iovecs + currentIovIndex, messageSizes[i]); + currentIovIndex += messageSizes[i]; + + folly::IOBufEqualTo eq; + EXPECT_TRUE( + eq(wrappedIovBuffer, + folly::IOBuf::copyBuffer(folly::ByteRange( + (unsigned char*)messages[i].data(), messages[i].size())))); + } + + return 0; + })); + + batchWriter->write(sock, folly::SocketAddress()); +} + +TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgNewlyAllocatedIovecMatches) { + // In this test case, we surpass the kNumIovecBufferChains limit + // (i.e. the number of contiguous buffers we are sending) + folly::EventBase evb; + std::shared_ptr qEvb = + std::make_shared(&evb); + quic::test::MockAsyncUDPSocket sock(qEvb); + + auto batchWriter = quic::BatchWriterFactory::makeBatchWriter( + quic::QuicBatchingMode::BATCHING_MODE_SENDMMSG, + kBatchNum, + false, /* enable backpressure */ + DataPathType::ChainedMemory, + conn_, + gsoSupported_); + CHECK(batchWriter); + + std::vector> messages{ + {"It", "is", "sunny!"}, + {"but", "it", "is", "so", "cold"}, + {"my", + "jacket", + "isn't", + "warm", + "enough", + "and", + "my", + "hands", + "are", + "freezing"}}; + + CHECK(batchWriter->empty()); + CHECK_EQ(batchWriter->size(), 0); + + std::vector buffers; + + size_t size = 0; + for (auto& message : messages) { + auto buf = std::make_unique(); + for (size_t j = 0; j < message.size(); j++) { + auto partBuf = folly::IOBuf::copyBuffer(folly::ByteRange( + (unsigned char*)message[j].data(), message[j].size())); + buf->appendToChain(std::move(partBuf)); + } + buffers.emplace_back(std::move(buf)); + } + + for (size_t i = 0; i < messages.size(); i++) { + batchWriter->append( + buffers[i]->clone(), + buffers[i]->computeChainDataLength(), + folly::SocketAddress(), + nullptr); + size += buffers[i]->computeChainDataLength(); + CHECK_EQ(batchWriter->size(), size); + } + + EXPECT_CALL(sock, writem(_, _, _, _)) + .Times(1) + .WillOnce(Invoke([&](folly::Range addrs, + iovec* iovecs, + size_t* messageSizes, + size_t count) { + EXPECT_EQ(addrs.size(), 1); + EXPECT_EQ(count, messages.size()); + + size_t currentIovIndex = 0; + for (size_t i = 0; i < count; i++) { + auto wrappedIovBuffer = + folly::IOBuf::wrapIov(iovecs + currentIovIndex, messageSizes[i]); + currentIovIndex += messageSizes[i]; + + folly::IOBufEqualTo eq; + EXPECT_TRUE(eq(wrappedIovBuffer, buffers[i])); + } + + return 0; + })); + + batchWriter->write(sock, folly::SocketAddress()); +} + TEST_F(QuicBatchWriterTest, TestBatchingSendmmsgGSOBatchNum) { folly::EventBase evb; std::shared_ptr qEvb = diff --git a/quic/common/testutil/MockAsyncUDPSocket.h b/quic/common/testutil/MockAsyncUDPSocket.h index 4e326dd42..be6f96a55 100644 --- a/quic/common/testutil/MockAsyncUDPSocket.h +++ b/quic/common/testutil/MockAsyncUDPSocket.h @@ -30,9 +30,7 @@ struct MockAsyncUDPSocket : public FollyQuicAsyncUDPSocket { MOCK_METHOD( int, writem, - (folly::Range, - const std::unique_ptr*, - size_t)); + (folly::Range, iovec*, size_t*, size_t)); MOCK_METHOD( ssize_t, writeGSO, diff --git a/quic/common/udpsocket/FollyQuicAsyncUDPSocket.cpp b/quic/common/udpsocket/FollyQuicAsyncUDPSocket.cpp index 14fc17494..1ea50f70b 100644 --- a/quic/common/udpsocket/FollyQuicAsyncUDPSocket.cpp +++ b/quic/common/udpsocket/FollyQuicAsyncUDPSocket.cpp @@ -66,9 +66,10 @@ ssize_t FollyQuicAsyncUDPSocket::write( int FollyQuicAsyncUDPSocket::writem( folly::Range addrs, - const std::unique_ptr* bufs, + iovec* iov, + size_t* numIovecsInBuffer, size_t count) { - return follySocket_.writem(addrs, bufs, count); + return follySocket_.writemv(addrs, iov, numIovecsInBuffer, count); } ssize_t FollyQuicAsyncUDPSocket::writeGSO( diff --git a/quic/common/udpsocket/FollyQuicAsyncUDPSocket.h b/quic/common/udpsocket/FollyQuicAsyncUDPSocket.h index dd1648a3f..9550bcfb3 100644 --- a/quic/common/udpsocket/FollyQuicAsyncUDPSocket.h +++ b/quic/common/udpsocket/FollyQuicAsyncUDPSocket.h @@ -75,7 +75,8 @@ class FollyQuicAsyncUDPSocket : public QuicAsyncUDPSocketImpl { int writem( folly::Range addrs, - const std::unique_ptr* bufs, + iovec* iov, + size_t* numIovecsInBuffer, size_t count) override; ssize_t writeGSO( diff --git a/quic/common/udpsocket/LibevQuicAsyncUDPSocket.cpp b/quic/common/udpsocket/LibevQuicAsyncUDPSocket.cpp index 43ebf708b..285316c03 100644 --- a/quic/common/udpsocket/LibevQuicAsyncUDPSocket.cpp +++ b/quic/common/udpsocket/LibevQuicAsyncUDPSocket.cpp @@ -113,9 +113,10 @@ int LibevQuicAsyncUDPSocket::getGSO() { } int LibevQuicAsyncUDPSocket::writem( - folly::Range /* addrs */, - const std::unique_ptr* /* bufs */, - size_t /* count */) { + folly::Range, + iovec*, + size_t*, + size_t) { LOG(FATAL) << __func__ << "is not implemented in LibevQuicAsyncUDPSocket"; return -1; } diff --git a/quic/common/udpsocket/LibevQuicAsyncUDPSocket.h b/quic/common/udpsocket/LibevQuicAsyncUDPSocket.h index f1876aa0d..b450d01af 100644 --- a/quic/common/udpsocket/LibevQuicAsyncUDPSocket.h +++ b/quic/common/udpsocket/LibevQuicAsyncUDPSocket.h @@ -41,7 +41,8 @@ class LibevQuicAsyncUDPSocket : public QuicAsyncUDPSocketImpl { int writem( folly::Range addrs, - const std::unique_ptr* bufs, + iovec* iov, + size_t* numIovecsInBuffer, size_t count) override; ssize_t writeGSO( diff --git a/quic/common/udpsocket/QuicAsyncUDPSocket.h b/quic/common/udpsocket/QuicAsyncUDPSocket.h index e64794dd4..219237185 100644 --- a/quic/common/udpsocket/QuicAsyncUDPSocket.h +++ b/quic/common/udpsocket/QuicAsyncUDPSocket.h @@ -166,13 +166,15 @@ class QuicAsyncUDPSocket { /** * Send the data in buffers to destination. Returns the return code from * ::sendmmsg. - * bufs is an array of std::unique_ptr - * of size num + * iov is an array of iovecs, which is composed of "count" messages that + * need to be sent. Each message can have multiple iovecs. The number of + * iovecs per message is specified in numIovecsInBuffer. */ virtual int writem( - folly::Range /* addrs */, - const std::unique_ptr* /* bufs */, - size_t /* count */) = 0; + folly::Range addrs, + iovec* iov, + size_t* numIovecsInBuffer, + size_t count) = 0; struct WriteOptions { WriteOptions() = default; diff --git a/quic/common/udpsocket/test/QuicAsyncUDPSocketMock.h b/quic/common/udpsocket/test/QuicAsyncUDPSocketMock.h index b66d2a83d..ba6e21097 100644 --- a/quic/common/udpsocket/test/QuicAsyncUDPSocketMock.h +++ b/quic/common/udpsocket/test/QuicAsyncUDPSocketMock.h @@ -28,9 +28,7 @@ class QuicAsyncUDPSocketMock : public QuicAsyncUDPSocket { MOCK_METHOD( (int), writem, - (folly::Range, - const std::unique_ptr*, - size_t)); + (folly::Range, iovec*, size_t*, size_t)); MOCK_METHOD( ssize_t, writeGSO,