You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-15 12:09:09 +03:00
Implemented the SM-side copy op & a couple minor changes.
This commit is contained in:
@@ -16,6 +16,7 @@ set(storagemanager_SRCS
|
|||||||
src/TruncateTask.cpp
|
src/TruncateTask.cpp
|
||||||
src/UnlinkTask.cpp
|
src/UnlinkTask.cpp
|
||||||
src/WriteTask.cpp
|
src/WriteTask.cpp
|
||||||
|
src/CopyTask.cpp
|
||||||
src/IOCoordinator.cpp
|
src/IOCoordinator.cpp
|
||||||
src/SessionManager.cpp
|
src/SessionManager.cpp
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -60,7 +60,8 @@ enum Opcodes {
|
|||||||
APPEND,
|
APPEND,
|
||||||
TRUNCATE,
|
TRUNCATE,
|
||||||
LIST_DIRECTORY,
|
LIST_DIRECTORY,
|
||||||
PING
|
PING,
|
||||||
|
COPY
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -74,7 +75,8 @@ enum Opcodes {
|
|||||||
|
|
||||||
On success, what follows is any output parameters from the call.
|
On success, what follows is any output parameters from the call.
|
||||||
|
|
||||||
TBD: Require filenames to be NULL-terminated. Currently they are not.
|
Note: filenames and pathnames in the following parameters should
|
||||||
|
be absolute rather than relative.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -240,6 +242,25 @@ struct ping_cmd {
|
|||||||
uint8_t opcode;
|
uint8_t opcode;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
COPY
|
||||||
|
----
|
||||||
|
command format:
|
||||||
|
1-byte opcode|4-byte filename1 length|filename2|4-byte filename2 length|filename2
|
||||||
|
|
||||||
|
response format:
|
||||||
|
*/
|
||||||
|
|
||||||
|
struct f_name {
|
||||||
|
uint32_t flen;
|
||||||
|
char filename[];
|
||||||
|
};
|
||||||
|
|
||||||
|
struct copy_cmd {
|
||||||
|
uint8_t opcode;
|
||||||
|
f_name file1;
|
||||||
|
// use f_name as an overlay at the end of file1 to get file2.
|
||||||
|
};
|
||||||
|
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
|
|
||||||
#include "AppendTask.h"
|
#include "AppendTask.h"
|
||||||
#include "messageFormat.h"
|
#include "messageFormat.h"
|
||||||
#include "IOCoordinator.h"
|
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|||||||
57
src/CopyTask.cpp
Normal file
57
src/CopyTask.cpp
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
|
||||||
|
#include "CopyTask.h"
|
||||||
|
#include <errno.h>
|
||||||
|
#include "messageFormat.h"
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
|
namespace storagemanager
|
||||||
|
{
|
||||||
|
|
||||||
|
CopyTask::CopyTask(int sock, uint len) : PosixTask(sock, len)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
CopyTask::~CopyTask()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
#define check_error(msg) \
|
||||||
|
if (!success) \
|
||||||
|
{ \
|
||||||
|
handleError(msg, errno); \
|
||||||
|
return; \
|
||||||
|
}
|
||||||
|
|
||||||
|
void CopyTask::run()
|
||||||
|
{
|
||||||
|
bool success;
|
||||||
|
uint8_t buf[2048] = {0};
|
||||||
|
|
||||||
|
if (getLength() > 2047)
|
||||||
|
{
|
||||||
|
handleError("CopyTask read", ENAMETOOLONG);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
success = read(buf, getLength());
|
||||||
|
check_error("CopyTask read");
|
||||||
|
copy_cmd *cmd = (copy_cmd *) buf;
|
||||||
|
string filename1(cmd->file1.filename, cmd->file1.flen); // need to copy this in case it's not null terminated
|
||||||
|
f_name *filename2 = (f_name *) &buf[sizeof(copy_cmd) + cmd->file1.flen];
|
||||||
|
|
||||||
|
int err = ioc->copyFile(filename1.c_str(), filename2->filename);
|
||||||
|
if (err)
|
||||||
|
{
|
||||||
|
handleError("CopyTask copy", errno);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sm_msg_resp *resp = (sm_msg_resp *) buf;
|
||||||
|
resp->type = SM_MSG_START;
|
||||||
|
resp->payloadLen = 4;
|
||||||
|
resp->returnCode = 0;
|
||||||
|
write(buf, sizeof(sm_msg_resp));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
24
src/CopyTask.h
Normal file
24
src/CopyTask.h
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
|
||||||
|
#ifndef COPYTASK_H_
|
||||||
|
#define COPYTASK_H_
|
||||||
|
|
||||||
|
#include "PosixTask.h"
|
||||||
|
|
||||||
|
namespace storagemanager
|
||||||
|
{
|
||||||
|
|
||||||
|
class CopyTask : public PosixTask
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
CopyTask(int sock, uint length);
|
||||||
|
virtual ~CopyTask();
|
||||||
|
|
||||||
|
void run();
|
||||||
|
|
||||||
|
private:
|
||||||
|
CopyTask();
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
#endif
|
||||||
@@ -7,6 +7,7 @@
|
|||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <boost/filesystem.hpp>
|
#include <boost/filesystem.hpp>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
@@ -54,7 +55,7 @@ struct scoped_closer {
|
|||||||
};
|
};
|
||||||
|
|
||||||
#define OPEN(name, mode) \
|
#define OPEN(name, mode) \
|
||||||
fd = ::open(filename, mode, 0666); \
|
fd = ::open(filename, mode, 0660); \
|
||||||
if (fd < 0) \
|
if (fd < 0) \
|
||||||
return fd; \
|
return fd; \
|
||||||
scoped_closer sc(fd);
|
scoped_closer sc(fd);
|
||||||
@@ -164,4 +165,24 @@ int IOCoordinator::unlink(const char *path)
|
|||||||
return ::unlink(path);
|
return ::unlink(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int IOCoordinator::copyFile(const char *filename1, const char *filename2)
|
||||||
|
{
|
||||||
|
int err = 0, l_errno;
|
||||||
|
try {
|
||||||
|
boost::filesystem::copy_file(filename1, filename2);
|
||||||
|
}
|
||||||
|
catch (boost::filesystem::filesystem_error &e) {
|
||||||
|
err = -1;
|
||||||
|
l_errno = EIO; // why not.
|
||||||
|
// eh, not going to translate all of boost's errors into our errors for this.
|
||||||
|
// log the error
|
||||||
|
cout << "IOCoordinator::copy(): got " << e.what() << endl;
|
||||||
|
}
|
||||||
|
catch (...) {
|
||||||
|
err = -1;
|
||||||
|
l_errno = EIO;
|
||||||
|
}
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ class IOCoordinator : public boost::noncopyable
|
|||||||
int stat(const char *path, struct stat *out);
|
int stat(const char *path, struct stat *out);
|
||||||
int truncate(const char *path, size_t newsize);
|
int truncate(const char *path, size_t newsize);
|
||||||
int unlink(const char *path);
|
int unlink(const char *path);
|
||||||
|
int copyFile(const char *filename1, const char *filename2);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
IOCoordinator();
|
IOCoordinator();
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
|
|
||||||
#include "ListDirectoryTask.h"
|
#include "ListDirectoryTask.h"
|
||||||
#include "IOCoordinator.h"
|
|
||||||
#include "messageFormat.h"
|
#include "messageFormat.h"
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
|
|
||||||
|
|
||||||
#include "OpenTask.h"
|
#include "OpenTask.h"
|
||||||
#include "IOCoordinator.h"
|
|
||||||
#include "messageFormat.h"
|
#include "messageFormat.h"
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
|
|||||||
@@ -5,6 +5,7 @@
|
|||||||
#include "messageFormat.h"
|
#include "messageFormat.h"
|
||||||
|
|
||||||
#include "AppendTask.h"
|
#include "AppendTask.h"
|
||||||
|
#include "CopyTask.h"
|
||||||
#include "ListDirectoryTask.h"
|
#include "ListDirectoryTask.h"
|
||||||
#include "OpenTask.h"
|
#include "OpenTask.h"
|
||||||
#include "PingTask.h"
|
#include "PingTask.h"
|
||||||
@@ -89,6 +90,9 @@ void ProcessTask::operator()()
|
|||||||
case PING:
|
case PING:
|
||||||
task = new PingTask(sock, length);
|
task = new PingTask(sock, length);
|
||||||
break;
|
break;
|
||||||
|
case COPY:
|
||||||
|
task = new CopyTask(sock, length);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
throw runtime_error("ProcessTask: got an unknown opcode");
|
throw runtime_error("ProcessTask: got an unknown opcode");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
|
|
||||||
#include "ReadTask.h"
|
#include "ReadTask.h"
|
||||||
#include "messageFormat.h"
|
#include "messageFormat.h"
|
||||||
#include "IOCoordinator.h"
|
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
|
|
||||||
#include "StatTask.h"
|
#include "StatTask.h"
|
||||||
#include "messageFormat.h"
|
#include "messageFormat.h"
|
||||||
#include "IOCoordinator.h"
|
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
|
|
||||||
#include "TruncateTask.h"
|
#include "TruncateTask.h"
|
||||||
#include "IOCoordinator.h"
|
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include "messageFormat.h"
|
#include "messageFormat.h"
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
|
|
||||||
#include "UnlinkTask.h"
|
#include "UnlinkTask.h"
|
||||||
#include "IOCoordinator.h"
|
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include "messageFormat.h"
|
#include "messageFormat.h"
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,7 @@
|
|||||||
#include "TruncateTask.h"
|
#include "TruncateTask.h"
|
||||||
#include "ListDirectoryTask.h"
|
#include "ListDirectoryTask.h"
|
||||||
#include "PingTask.h"
|
#include "PingTask.h"
|
||||||
|
#include "CopyTask.h"
|
||||||
#include "messageFormat.h"
|
#include "messageFormat.h"
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
@@ -398,6 +399,52 @@ bool pingtask()
|
|||||||
cout << "pingtask OK" << endl;
|
cout << "pingtask OK" << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool copytask()
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
make a file
|
||||||
|
copy it
|
||||||
|
verify it exists
|
||||||
|
*/
|
||||||
|
const char *filename = "copytest1";
|
||||||
|
::unlink(filename);
|
||||||
|
int fd = ::open(filename, O_CREAT | O_RDWR, 0666);
|
||||||
|
assert(fd > 0);
|
||||||
|
scoped_closer f(fd);
|
||||||
|
int err = ::write(fd, "testjunk", 8);
|
||||||
|
assert(err == 8);
|
||||||
|
|
||||||
|
uint8_t buf[1024];
|
||||||
|
copy_cmd *cmd = (copy_cmd *) buf;
|
||||||
|
cmd->opcode = COPY;
|
||||||
|
cmd->file1.flen = strlen(filename);
|
||||||
|
strncpy(cmd->file1.filename, filename, cmd->file1.flen);
|
||||||
|
const char *filename2 = "copytest2";
|
||||||
|
f_name *file2 = (f_name *) &cmd->file1.filename[cmd->file1.flen];
|
||||||
|
file2->flen = strlen(filename2);
|
||||||
|
strncpy(file2->filename, filename2, file2->flen);
|
||||||
|
|
||||||
|
uint len = (uint64_t) &file2->filename[file2->flen] - (uint64_t) buf;
|
||||||
|
::write(sessionSock, buf, len);
|
||||||
|
CopyTask c(clientSock, len);
|
||||||
|
c.run();
|
||||||
|
|
||||||
|
// read the response
|
||||||
|
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
|
||||||
|
sm_msg_resp *resp = (sm_msg_resp *) buf;
|
||||||
|
assert(err == sizeof(sm_msg_resp));
|
||||||
|
assert(resp->type == SM_MSG_START);
|
||||||
|
assert(resp->payloadLen == 4);
|
||||||
|
assert(resp->returnCode == 0);
|
||||||
|
|
||||||
|
// verify copytest2 is there
|
||||||
|
assert(boost::filesystem::exists(filename2));
|
||||||
|
::unlink(filename);
|
||||||
|
::unlink(filename2);
|
||||||
|
cout << "copytask OK " << endl;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
int main()
|
int main()
|
||||||
{
|
{
|
||||||
cout << "connecting" << endl;
|
cout << "connecting" << endl;
|
||||||
@@ -412,5 +459,6 @@ int main()
|
|||||||
truncatetask();
|
truncatetask();
|
||||||
listdirtask();
|
listdirtask();
|
||||||
pingtask();
|
pingtask();
|
||||||
|
copytask();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user