1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Merge pull request #2432 from mariadb-corporation/dataload-raw

MCOL-5013: Load Data from S3 into Columnstore
This commit is contained in:
Roman Nozdrin
2022-07-05 13:06:53 +03:00
committed by GitHub
9 changed files with 359 additions and 29 deletions

View File

@ -6,6 +6,8 @@ include_directories( ${ENGINE_COMMON_INCLUDES}
${SERVER_SOURCE_ROOT_DIR}/storage/maria )
SET(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/../../../)
SET(S3API_DEPS marias3 curl CACHE INTERNAL "S3API_DEPS")
SET ( libcalmysql_SRCS
../../datatypes/mcs_datatype.cpp
@ -31,7 +33,9 @@ SET ( libcalmysql_SRCS
is_columnstore_tables.cpp
is_columnstore_columns.cpp
is_columnstore_files.cpp
is_columnstore_extents.cpp)
is_columnstore_extents.cpp
columnstore_dataload.cpp)
set_source_files_properties(ha_mcs.cpp PROPERTIES COMPILE_FLAGS "-fno-implicit-templates")
@ -41,7 +45,7 @@ if (COMMAND mysql_add_plugin)
ENDIF()
mysql_add_plugin(columnstore ${libcalmysql_SRCS} STORAGE_ENGINE MODULE_ONLY ${disabled}
LINK_LIBRARIES ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} threadpool
LINK_LIBRARIES ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${S3API_DEPS} threadpool
VERSION ${PACKAGE_VERSION}
COMPONENT columnstore-engine CONFIG columnstore.cnf)
else ()
@ -49,9 +53,10 @@ else ()
SET_TARGET_PROPERTIES(ha_columnstore PROPERTIES PREFIX "")
add_dependencies(ha_columnstore loggingcpp)
add_dependencies(ha_columnstore ${S3API_DEPS})
add_definitions(-DMYSQL_DYNAMIC_PLUGIN -DPLUGIN_COLUMNSTORE_VERSION="${PACKAGE_VERSION}")
target_link_libraries(ha_columnstore ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool)
target_link_libraries(ha_columnstore ${S3API_DEPS} ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool)
install(TARGETS ha_columnstore DESTINATION ${MARIADB_PLUGINDIR} COMPONENT columnstore-engine)

View File

@ -0,0 +1,193 @@
/* Copyright (C) 2022 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA.
*/
#include <curl/curl.h>
#define NEED_CALPONT_INTERFACE
#define PREFER_MY_CONFIG_H 1
#include "ha_mcs_impl.h"
#include "ha_mcs_impl_if.h"
using namespace cal_impl_if;
#include "errorcodes.h"
#include "idberrorinfo.h"
#include "errorids.h"
using namespace logging;
#include "columnstoreversion.h"
#include "ha_mcs_sysvars.h"
#include "utils/json/json.hpp"
#include <regex>
static size_t WriteCallback(void* contents, size_t size, size_t nmemb, void* userp)
{
((std::string*)userp)->append((char*)contents, size * nmemb);
return size * nmemb;
}
std::string parseCMAPIkey()
{
std::regex pattern("\\s*x-api-key\\s*=\\s*'(.*)'");
auto configFile = std::string(MCSSYSCONFDIR) + "/columnstore/cmapi_server.conf";
std::ifstream in(configFile, std::ios::in | std::ios::binary);
if (!in.good())
return {};
std::string contents;
in.seekg(0, std::ios::end);
contents.reserve(in.tellg());
in.seekg(0, std::ios::beg);
std::copy((std::istreambuf_iterator<char>(in)), std::istreambuf_iterator<char>(),
std::back_inserter(contents));
in.close();
std::smatch matches;
if (std::regex_search(contents, matches, pattern))
return matches.str(1);
return {};
}
extern "C"
{
struct InitData
{
CURL* curl = nullptr;
char* result = nullptr;
};
void columnstore_dataload_deinit(UDF_INIT* initid)
{
InitData* initData = (InitData*)(initid->ptr);
if (!initData)
return;
curl_easy_cleanup(initData->curl);
delete initData->result;
}
const char* columnstore_dataload_impl(CURL* curl, char* result, unsigned long* length,
std::string_view bucket, std::string_view table,
std::string_view filename, std::string_view database,
std::string_view secret, std::string_view key,
std::string_view region, std::string_view cmapi_host,
ulong cmapi_port, std::string_view cmapi_version,
std::string_view cmapi_key)
{
CURLcode res;
std::string readBuffer;
nlohmann::json j;
j["bucket"] = bucket;
j["table"] = table;
j["filename"] = filename;
j["key"] = key;
j["secret"] = secret;
j["region"] = region;
j["database"] = database;
std::string param = j.dump();
struct curl_slist* hs = NULL;
hs = curl_slist_append(hs, "Content-Type: application/json");
std::string key_header = std::string("x-api-key:") + std::string(cmapi_key.begin(), cmapi_key.end());
hs = curl_slist_append(hs, key_header.c_str());
std::string url = std::string(cmapi_host.begin(), cmapi_host.end()) + "/cmapi/" +
std::string(cmapi_version.begin(), cmapi_version.end()) + "/cluster/load_s3data";
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, hs);
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_PORT, cmapi_port);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, param.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);
res = curl_easy_perform(curl);
if (res != CURLE_OK)
{
std::string msg = std::string("Remote request failed: ") + curl_easy_strerror(res);
memcpy(result, msg.c_str(), msg.length());
*length = msg.length();
return result;
}
result = new char[readBuffer.length() + 1];
memcpy(result, readBuffer.c_str(), readBuffer.size());
*length = readBuffer.size();
return result;
}
const char* columnstore_dataload(UDF_INIT* initid, UDF_ARGS* args, char* result, unsigned long* length,
char* /*is_null*/, char* /*error*/)
{
InitData* initData = (InitData*)(initid->ptr);
if (!initData->curl)
{
std::string msg("CURL initialization failed, remote execution of dataload error");
memcpy(result, msg.c_str(), msg.length());
*length = msg.length();
return result;
}
const char* table = args->args[0];
const char* filename = args->args[1];
const char* bucket = args->args[2];
ulong cmapi_port = get_cmapi_port(_current_thd());
const char* cmapi_host = get_cmapi_host(_current_thd());
const char* cmapi_version = get_cmapi_version(_current_thd());
const char* cmapi_key = get_cmapi_key(_current_thd());
THD* thd = _current_thd();
const char* secret = get_s3_secret(thd);
const char* key = get_s3_key(thd);
const char* region = get_s3_region(thd);
const char* database = args->arg_count != 4 ? thd->get_db() : args->args[3];
return columnstore_dataload_impl(initData->curl, initData->result, length, bucket, table, filename,
database, secret, key, region, cmapi_host, cmapi_port, cmapi_version,
::strlen(cmapi_key) == 0 ? parseCMAPIkey().c_str() : cmapi_key);
}
my_bool columnstore_dataload_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
{
if (args->arg_count != 3 && args->arg_count != 4)
{
strcpy(message, "COLUMNSTORE_DATALOAD() takes three or four arguments: (table, filename, bucket) or (table, filename, bucket, database)");
return 1;
}
initid->max_length = 1000 * 1000;
InitData* initData = new InitData;
initData->curl = curl_easy_init();
initid->ptr = (char*)(initData);
return 0;
}
}

View File

@ -188,6 +188,22 @@ static MYSQL_THDVAR_ULONGLONG(cache_flush_threshold, PLUGIN_VAR_RQCMDARG,
"Threshold on the number of rows in the cache to trigger a flush", NULL, NULL,
500000, 1, 1000000000, 1);
static MYSQL_THDVAR_STR(cmapi_host, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI host", NULL, NULL,
"https://localhost");
static MYSQL_THDVAR_STR(cmapi_version, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI version", NULL, NULL,
"0.4.0");
static MYSQL_THDVAR_STR(cmapi_key, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI key", NULL, NULL,
"");
static MYSQL_THDVAR_ULONGLONG(cmapi_port, PLUGIN_VAR_NOCMDOPT, "CMAPI port", NULL,
NULL, 8640, 100, 65356, 1);
static MYSQL_THDVAR_STR(s3_key, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 Authentication Key ", NULL, NULL, "");
static MYSQL_THDVAR_STR(s3_secret, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 Authentication Secret", NULL, NULL, "");
static MYSQL_THDVAR_STR(s3_region, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 region", NULL, NULL, "");
st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type),
MYSQL_SYSVAR(fe_conn_info_ptr),
MYSQL_SYSVAR(original_optimizer_flags),
@ -217,6 +233,13 @@ st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type),
MYSQL_SYSVAR(cache_inserts),
MYSQL_SYSVAR(cache_use_import),
MYSQL_SYSVAR(cache_flush_threshold),
MYSQL_SYSVAR(cmapi_host),
MYSQL_SYSVAR(cmapi_port),
MYSQL_SYSVAR(cmapi_version),
MYSQL_SYSVAR(cmapi_key),
MYSQL_SYSVAR(s3_key),
MYSQL_SYSVAR(s3_secret),
MYSQL_SYSVAR(s3_region),
NULL};
st_mysql_show_var mcs_status_variables[] = {{"columnstore_version", (char*)&cs_version, SHOW_CHAR},
@ -505,3 +528,73 @@ void set_cache_flush_threshold(THD* thd, ulonglong value)
{
THDVAR(thd, cache_flush_threshold) = value;
}
ulong get_cmapi_port(THD* thd)
{
return (thd == NULL) ? 5000 : THDVAR(thd, cmapi_port);
}
void set_cmapi_port(THD* thd, ulong value)
{
THDVAR(thd, cmapi_port) = value;
}
const char* get_cmapi_host(THD* thd)
{
return (thd == NULL) ? "127.0.0.1" : THDVAR(thd, cmapi_host);
}
void set_cmapi_host(THD* thd, char* value)
{
THDVAR(thd, cmapi_host) = value;
}
const char* get_cmapi_version(THD* thd)
{
return (thd == NULL) ? "0.4.0" : THDVAR(thd, cmapi_version);
}
void set_cmapi_version(THD* thd, char* value)
{
THDVAR(thd, cmapi_version) = value;
}
const char* get_cmapi_key(THD* thd)
{
return (thd == NULL) ? "" : THDVAR(thd, cmapi_key);
}
void set_cmapi_key(THD* thd, char* value)
{
THDVAR(thd, cmapi_key) = value;
}
const char* get_s3_key(THD* thd)
{
return THDVAR(thd, s3_key);
}
void set_s3_key(THD* thd, char* value)
{
THDVAR(thd, s3_key) = value;
}
const char* get_s3_secret(THD* thd)
{
return THDVAR(thd, s3_secret);
}
void set_s3_secret(THD* thd, char* value)
{
THDVAR(thd, s3_secret) = value;
}
const char* get_s3_region(THD* thd)
{
return THDVAR(thd, s3_region);
}
void set_s3_region(THD* thd, char* value)
{
THDVAR(thd, s3_region) = value;
}

View File

@ -143,3 +143,24 @@ void set_cache_use_import(THD* thd, bool value);
ulonglong get_cache_flush_threshold(THD* thd);
void set_cache_flush_threshold(THD* thd, ulonglong value);
ulong get_cmapi_port(THD* thd);
void set_cmapi_port(THD* thd, ulong value);
const char* get_cmapi_key(THD* thd);
void set_cmapi_key(THD* thd, char* value);
const char* get_cmapi_host(THD* thd);
void set_cmapi_host(THD* thd, char* value);
const char* get_cmapi_version(THD* thd);
void set_cmapi_version(THD* thd, char* value);
const char* get_s3_key(THD* thd);
void set_s3_key(THD* thd, char* value);
const char* get_s3_secret(THD* thd);
void set_s3_secret(THD* thd, char* value);
const char* get_s3_region(THD* thd);
void set_s3_region(THD* thd, char* value);

View File

@ -55,6 +55,7 @@ CREATE OR REPLACE FUNCTION mcssystemreadonly RETURNS INTEGER SONAME 'ha_columnst
CREATE OR REPLACE FUNCTION mcssystemprimary RETURNS INTEGER SONAME 'ha_columnstore.so';
CREATE OR REPLACE FUNCTION mcs_emindex_size RETURNS INTEGER SONAME 'ha_columnstore.so';
CREATE OR REPLACE FUNCTION mcs_emindex_free RETURNS INTEGER SONAME 'ha_columnstore.so';
CREATE OR REPLACE FUNCTION columnstore_dataload RETURNS STRING SONAME 'ha_columnstore.so';
CREATE OR REPLACE AGGREGATE FUNCTION regr_avgx RETURNS REAL SONAME 'libregr_mysql.so';
CREATE OR REPLACE AGGREGATE FUNCTION regr_avgy RETURNS REAL SONAME 'libregr_mysql.so';
CREATE OR REPLACE AGGREGATE FUNCTION regr_count RETURNS INTEGER SONAME 'libregr_mysql.so';

View File

@ -41,3 +41,19 @@ create table if not exists syscolumn (`schema` varchar(128),
`maxvalue` varchar(64),
compressiontype integer,
nextvalue bigint) engine=columnstore comment='SCHEMA SYNC ONLY';
DELIMITER //
CREATE OR REPLACE PROCEDURE columnstore_load_from_s3 (in tablename varchar(256) CHARACTER SET utf8,
in filename varchar(256) CHARACTER SET utf8,
in bucket varchar(256) CHARACTER SET utf8,
in dbname varchar(256) CHARACTER SET utf8)
LANGUAGE SQL
NOT DETERMINISTIC
MODIFIES SQL DATA
SQL SECURITY INVOKER
BEGIN
select columnstore_dataload(tablename, filename, bucket, dbname);
END //
DELIMITER ;

View File

@ -40,7 +40,7 @@ set(cpimport.bin_SRCS cpimport.cpp)
add_executable(cpimport.bin ${cpimport.bin_SRCS})
add_dependencies(cpimport.bin marias3)
target_link_libraries(cpimport.bin ${ENGINE_LDFLAGS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${S3API_DEPS} we_bulk we_xml)
target_link_libraries(cpimport.bin boost_program_options ${ENGINE_LDFLAGS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${S3API_DEPS} we_bulk we_xml)
install(TARGETS cpimport.bin DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine)

View File

@ -172,11 +172,12 @@ void printUsage()
<< " -T Timezone used for TIMESTAMP datatype" << endl
<< " Possible values: \"SYSTEM\" (default)" << endl
<< " : Offset in the form +/-HH:MM" << endl
// << " -y S3 Authentication Key (for S3 imports)" << endl
// << " -K S3 Authentication Secret (for S3 imports)" << endl
// << " -t S3 Bucket (for S3 imports)" << endl
// << " -H S3 Hostname (for S3 imports, Amazon's S3 default)" << endl
// << " -g S3 Regions (for S3 imports)" << endl
<< endl
<< " -y S3 Authentication Key (for S3 imports)" << endl
<< " -K S3 Authentication Secret (for S3 imports)" << endl
<< " -t S3 Bucket (for S3 imports)" << endl
<< " -H S3 Hostname (for S3 imports, Amazon's S3 default)" << endl
<< " -g S3 Regions (for S3 imports)" << endl
<< " -U username of new data files owner. Default is mysql" << endl;
cout << " Example1:" << endl
@ -309,7 +310,7 @@ void parseCmdLineArgs(int argc, char** argv, BulkLoad& curJob, std::string& sJob
BulkModeType bulkMode = BULK_MODE_LOCAL;
std::string jobUUID;
while ((option = getopt(argc, argv, "b:c:d:e:f:hij:kl:m:n:p:r:s:u:w:B:C:DE:I:P:R:ST:X:NL:U:")) !=
while ((option = getopt(argc, argv, "b:c:d:e:f:hij:kl:m:n:p:r:s:u:w:B:C:DE:I:P:R:ST:X:NL:y:K:t:H:g:U:")) !=
EOF)
{
switch (option)
@ -675,7 +676,7 @@ void parseCmdLineArgs(int argc, char** argv, BulkLoad& curJob, std::string& sJob
BulkLoad::disableConsoleOutput(true);
break;
}
/*
case 'y':
{
curJob.setS3Key(optarg);
@ -705,7 +706,7 @@ void parseCmdLineArgs(int argc, char** argv, BulkLoad& curJob, std::string& sJob
curJob.setS3Region(optarg);
break;
}
*/
case 'U':
{
curJob.setUsername(optarg);

View File

@ -561,11 +561,11 @@ void WECmdArgs::usage()
<< "\t-T\tTimezone used for TIMESTAMP datatype.\n"
<< "\t\tPossible values: \"SYSTEM\" (default)\n"
<< "\t\t : Offset in the form +/-HH:MM\n"
// << "\t-y\tS3 Authentication Key (for S3 imports)\n"
// << "\t-K\tS3 Authentication Secret (for S3 imports)\n"
// << "\t-t\tS3 Bucket (for S3 imports)\n"
// << "\t-H\tS3 Hostname (for S3 imports, Amazon's S3 default)\n"
// << "\t-g\tS3 Region (for S3 imports)\n"
<< "\t-y\tS3 Authentication Key (for S3 imports)\n"
<< "\t-K\tS3 Authentication Secret (for S3 imports)\n"
<< "\t-t\tS3 Bucket (for S3 imports)\n"
<< "\t-H\tS3 Hostname (for S3 imports, Amazon's S3 default)\n"
<< "\t-g\tS3 Region (for S3 imports)\n"
<< "\t-L\tDirectory for the output .err and .bad files.\n"
<< "\t\tDefault is " << string(MCSLOGDIR);
@ -598,7 +598,7 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv)
if (argc > 0)
fPrgmName = string(MCSBINDIR) + "/" + "cpimport.bin"; // argv[0] is splitter but we need cpimport
while ((aCh = getopt(argc, argv, "d:j:w:s:v:l:r:b:e:B:f:q:ihm:E:C:P:I:n:p:c:ST:NU:L:")) != EOF)
while ((aCh = getopt(argc, argv, "d:j:w:s:v:l:r:b:e:B:f:q:ihm:E:C:P:I:n:p:c:ST:Ny:K:t:H:g:U:L:")) != EOF)
{
switch (aCh)
{
@ -906,7 +906,7 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv)
fConsoleOutput = false;
break;
}
/*
case 'y': //-y S3 Key
{
fS3Key = optarg;
@ -936,7 +936,7 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv)
fS3Region = optarg;
break;
}
*/
case 'U': //-U username of the files owner
{
fUsername = optarg;