diff --git a/utils/cloudio/CMakeLists.txt b/utils/cloudio/CMakeLists.txt index 3fc10197a..7e6ec1c25 100644 --- a/utils/cloudio/CMakeLists.txt +++ b/utils/cloudio/CMakeLists.txt @@ -4,7 +4,22 @@ set(cloudio_LIB_SRCS SMComm.cpp SMDataFile.cpp SMFileFactory.cpp SMFileSystem.cp add_library(cloudio SHARED ${cloudio_LIB_SRCS}) +# IDBDataFile currently depends on cloudio, which is backward. +# Once cloudio has been turned into a proper plugin for idbdatafile, +# we should be able to reverse the dependency like so: +# target_link_libraries(cloudio idbdatafile messageqcpp loggingcpp) + set_target_properties(cloudio PROPERTIES VERSION 1.0.0 SOVERSION 1) install(TARGETS cloudio DESTINATION ${ENGINE_LIBDIR} COMPONENT libs) +add_executable(cloudio_component_test component_test.cpp) + +# see the comment above and change this dependency to cloudio. Hm +# our lib dependencies seem not to be declared. Punting on that, +# maybe in the future we can have some poor unfortunate intern +# untangle all of that and declare lib dependencies properly. +# For now I'm going to do like the other executables, which means +# nearly everything AFAICT. +target_link_libraries(cloudio_component_test ${ENGINE_LDFLAGS} ${ENGINE_EXEC_LIBS} ${MARIADB_CLIENT_LIBS}) + diff --git a/utils/cloudio/SMComm.cpp b/utils/cloudio/SMComm.cpp index 7c01a203b..5f19c0849 100644 --- a/utils/cloudio/SMComm.cpp +++ b/utils/cloudio/SMComm.cpp @@ -85,7 +85,7 @@ int SMComm::open(const string &filename, const int mode, struct stat *statbuf) err = sockets.send_recv(*command, response); if (err) common_exit(command, response, err); - + check_for_error(command, response, err); memcpy(statbuf, response->buf(), sizeof(*statbuf)); common_exit(command, response, err); diff --git a/utils/cloudio/SMFileFactory.cpp b/utils/cloudio/SMFileFactory.cpp index 1b0b9b192..f4a8b1154 100644 --- a/utils/cloudio/SMFileFactory.cpp +++ b/utils/cloudio/SMFileFactory.cpp @@ -90,7 +90,7 @@ IDBDataFile* SMFileFactory::open(const char *filename, const char *mode, unsigne SMComm *comm = SMComm::get(); struct stat _stat; int err = comm->open(filename, posix_flags, &_stat); - if (!err) + if (err) return NULL; SMDataFile *ret = new SMDataFile(filename, posix_flags, _stat); diff --git a/utils/cloudio/SocketPool.cpp b/utils/cloudio/SocketPool.cpp index f5130a91d..b5e88fd41 100644 --- a/utils/cloudio/SocketPool.cpp +++ b/utils/cloudio/SocketPool.cpp @@ -64,13 +64,6 @@ SocketPool::SocketPool() log(logging::LOG_TYPE_CRITICAL, os.str()); maxSockets = defaultSockets; } - clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0); - if (clientSocket < 0) - { - char buf[80], *ptr; - ptr = strerror_r(errno, buf, 80); - throw runtime_error("SocketPool(): Failed to get clientSocket, got " + string(ptr)); - } } SocketPool::~SocketPool() @@ -79,12 +72,12 @@ SocketPool::~SocketPool() for (uint i = 0; i < allSockets.size(); i++) ::close(allSockets[i]); - ::close(clientSocket); } #define sm_check_error \ if (err < 0) \ { \ + cout << "SP: got an error on the socket" << endl; \ returnSocket(sock); \ return -1; \ } @@ -109,49 +102,63 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream * count += err; in.advance(err); } + //cout << "SP sent the msg" << endl; out->restart(); uint8_t *outbuf; uint8_t window[8192]; - bool foundHeader = false; length = 0; uint remainingBytes = 0; uint i; - /* TODO: consider adding timeouts on msg recv if we start using tcp sockets */ - while (!foundHeader) + while (1) { // here remainingBytes means the # of bytes from the previous message err = ::read(sock, &window[remainingBytes], 8192 - remainingBytes); sm_check_error; + if (err == 0) + { + remoteClosed(sock); + // TODO, a retry loop + return -1; + } uint endOfData = remainingBytes + err; // for clarity // scan for the 8-byte header. If it is fragmented, move the fragment to the front of the buffer // for the next iteration to handle. - for (i = 0; i <= endOfData - 8 && !foundHeader; i++) + + if (endOfData < 8) + { + remainingBytes = endOfData; + continue; + } + + for (i = 0; i <= endOfData - 8; i++) { if (*((uint *) &window[i]) == storagemanager::SM_MSG_START) { length = *((uint *) &window[i+4]); - foundHeader = true; + break; } } - if (!foundHeader) + if (length == 0) // didn't find the header yet { + // i == endOfData - 7 here remainingBytes = endOfData - i; - if (i != 0) - memmove(window, &window[i], remainingBytes); - else - continue; // if i == 0, then the read was too short to see the whole header, do another read(). + memmove(window, &window[i], remainingBytes); } else { + // i == first byte of the header here // copy the payload fragment we got into the output bytestream + uint startOfPayload = i + 8; // for clarity out->needAtLeast(length); outbuf = out->getInputPtr(); - memcpy(outbuf, &window[i+8], endOfData - (i+8)); - remainingBytes = length - (endOfData - (i+8)); // remainingBytes is now the # of bytes left to read + memcpy(outbuf, &window[startOfPayload], endOfData - startOfPayload); + remainingBytes = length - (endOfData - startOfPayload); // remainingBytes is now the # of bytes left to read + out->advanceInputPtr(endOfData - startOfPayload); + break; // done looking for the header, can fill the output buffer directly now. } } @@ -170,7 +177,7 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream * int SocketPool::getSocket() { boost::mutex::scoped_lock lock(mutex); - int ret; + int clientSocket; if (freeSockets.size() == 0 && allSockets.size() < maxSockets) { @@ -178,29 +185,31 @@ int SocketPool::getSocket() struct sockaddr_un addr; memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; - strcpy(&addr.sun_path[0], storagemanager::socket_name); - ret = ::connect(clientSocket, (const struct sockaddr *) &addr, sizeof(addr)); - if (ret >= 0) - allSockets.push_back(ret); + strcpy(&addr.sun_path[1], &storagemanager::socket_name[1]); // first char is null... + clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0); + int err = ::connect(clientSocket, (const struct sockaddr *) &addr, sizeof(addr)); + if (err >= 0) + allSockets.push_back(clientSocket); else { int saved_errno = errno; ostringstream os; char buf[80]; os << "SocketPool::getSocket() failed to connect; got '" << strerror_r(saved_errno, buf, 80); + cout << os.str() << endl; log(logging::LOG_TYPE_CRITICAL, os.str()); errno = saved_errno; } - return ret; + return clientSocket; } // wait for a socket to become free while (freeSockets.size() == 0) socketAvailable.wait(lock); - ret = freeSockets.front(); + clientSocket = freeSockets.front(); freeSockets.pop_front(); - return ret; + return clientSocket; } void SocketPool::returnSocket(const int sock) @@ -210,4 +219,17 @@ void SocketPool::returnSocket(const int sock) socketAvailable.notify_one(); } +void SocketPool::remoteClosed(const int sock) +{ + boost::mutex::scoped_lock lock(mutex); + ::close(sock); + for (vector::iterator i = allSockets.begin(); i != allSockets.end(); ++i) + if (*i == sock) + { + allSockets.erase(i, i+1); + break; + } +} + + } diff --git a/utils/cloudio/SocketPool.h b/utils/cloudio/SocketPool.h index 60468bee3..cc7dbaa92 100644 --- a/utils/cloudio/SocketPool.h +++ b/utils/cloudio/SocketPool.h @@ -42,14 +42,15 @@ class SocketPool : public boost::noncopyable private: int getSocket(); void returnSocket(const int sock); + void remoteClosed(const int sock); std::vector allSockets; std::deque freeSockets; boost::mutex mutex; boost::condition_variable socketAvailable; - int clientSocket; uint maxSockets; static const uint defaultSockets = 20; + }; } diff --git a/utils/cloudio/component_test.cpp b/utils/cloudio/component_test.cpp new file mode 100644 index 000000000..47b32f0f2 --- /dev/null +++ b/utils/cloudio/component_test.cpp @@ -0,0 +1,184 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + + + +#include +#include +#include +#include +#include +#include +#include + +#include "SMFileSystem.h" +#include "SMDataFile.h" +#include "SMFileFactory.h" +#include "sm_exceptions.h" +#include "messageFormat.h" + +/* The purpose of this is to test the idbdatafile cloud classes using a dummy +SessionManager defined in this file. */ + +#undef NDEBUG +#include + +using namespace idbdatafile; +using namespace std; + +volatile bool die = false; +int errCode = 0; + +/* Right now this will just accept one connection & send a fixed error response */ +void error_server_thread() +{ + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strcpy(&addr.sun_path[1], &storagemanager::socket_name[1]); + + int server_socket = ::socket(AF_UNIX, SOCK_STREAM, 0); + assert(server_socket >= 0); + + int err = ::bind(server_socket, (struct sockaddr *) &addr, sizeof(addr)); + assert(err == 0); + + err = ::listen(server_socket, 1); + assert(err == 0); + + socklen_t addrlen; + struct sockaddr_un client_addr; + memset(&client_addr, 0, sizeof(client_addr)); + int client_socket = ::accept(server_socket, (struct sockaddr *) &client_addr, &addrlen); + assert(client_socket >= 0); + + //cout << "server thread got a connection" << endl; + + uint8_t buf[4096]; + uint32_t response[4] = { storagemanager::SM_MSG_START, 8, (uint32_t ) -1, EINVAL }; + uint remainingBytes = 0; + + while (!die) + { + /* This just scans for SM_MSG_START, and as it finds them it sends a generic error + response. */ + + err = ::recv(client_socket, &buf[remainingBytes], 4096 - remainingBytes, MSG_DONTWAIT); + if (err < 0) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + //cout << "server looping..." << endl; + sleep(1); // who cares + continue; + } + else { + char errbuf[80]; + //cout << "server thread got an error: " << strerror_r(errno, errbuf, 80) << endl; + close(client_socket); + errCode = -1; + die = true; + return; + } + } + + //cout << "server thread got some data" << endl; + uint endOfData = remainingBytes + err; + uint i; + if (endOfData < 4) + { + remainingBytes = endOfData; + continue; + } + for (i = 0; i <= endOfData - 4; i++) + { + if (*((uint32_t *) &buf[i]) == storagemanager::SM_MSG_START) { + //cout << "server thread found a msg start magic" << endl; + err = ::send(client_socket, response, 16, 0); + assert(err > 0); + } + } + memmove(buf, &buf[i], endOfData - i); // should be the trailing 3 bytes of the data + remainingBytes = endOfData - i; + } +} + +int test1() +{ + /* Start a server thread that returns generic errors */ + boost::thread server_thread(error_server_thread); + + /* Instantiate each of SM subclasses, call each function, and verify the expected error response */ + int err; + SMFileFactory factory; + + cout << "open" << endl; + IDBDataFile *f = factory.open("dummy", "r", 0, 0); + assert(f == NULL && errno == EINVAL && !die); + + SMFileSystem filesystem; + + cout << "compressedSize" << endl; + bool gotException = false; + try { + filesystem.compressedSize("dummy"); + } + catch (NotImplementedYet &) { + gotException = true; + } + assert(gotException && !die); + + cout << "copyFile" << endl; + try { + filesystem.copyFile("dummy1", "dummy2"); + } + catch (NotImplementedYet &) { + gotException = true; + } + assert(gotException && !die); + + cout << "exists" << endl; + err = filesystem.exists("dummy"); + assert(!err); + + cout << "filesystemisup" << endl; + err = filesystem.filesystemIsUp(); + assert(!err && !die); + + cout << "isdir" << endl; + err = filesystem.isDir("dummy"); + assert(!err && !die); + + cout << "listdirectory" << endl; + list filenames; + err = filesystem.listDirectory("dummy", filenames); + assert(err == -1 && filenames.empty() && !die); + + // done, return errCode + die = true; + server_thread.join(); + return errCode; +} + +int main() +{ + int ret; + + ret = test1(); + return ret; +} + diff --git a/utils/idbdatafile/IDBFactory.cpp b/utils/idbdatafile/IDBFactory.cpp index d15998c9f..e1fbf1da9 100644 --- a/utils/idbdatafile/IDBFactory.cpp +++ b/utils/idbdatafile/IDBFactory.cpp @@ -52,6 +52,9 @@ bool IDBFactory::installDefaultPlugins() s_plugins[IDBDataFile::BUFFERED] = FileFactoryEnt(IDBDataFile::BUFFERED, "buffered", new BufferedFileFactory(), new PosixFileSystem()); s_plugins[IDBDataFile::UNBUFFERED] = FileFactoryEnt(IDBDataFile::UNBUFFERED, "unbuffered", new UnbufferedFileFactory(), new PosixFileSystem()); + + // TODO: use the installPlugin fcn below instead of declaring this statically, then remove the dependency + // IDBDatafile -> cloudio s_plugins[IDBDataFile::CLOUD] = FileFactoryEnt(IDBDataFile::CLOUD, "cloud", new SMFileFactory(), new SMFileSystem()); return false;