diff --git a/dbcon/mysql/CMakeLists.txt b/dbcon/mysql/CMakeLists.txt index 96abc5a25..d09a3c34e 100644 --- a/dbcon/mysql/CMakeLists.txt +++ b/dbcon/mysql/CMakeLists.txt @@ -6,6 +6,8 @@ include_directories( ${ENGINE_COMMON_INCLUDES} ${SERVER_SOURCE_ROOT_DIR}/storage/maria ) SET(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/../../../) +SET(S3API_DEPS marias3 curl CACHE INTERNAL "S3API_DEPS") + SET ( libcalmysql_SRCS ../../datatypes/mcs_datatype.cpp @@ -31,7 +33,9 @@ SET ( libcalmysql_SRCS is_columnstore_tables.cpp is_columnstore_columns.cpp is_columnstore_files.cpp - is_columnstore_extents.cpp) + is_columnstore_extents.cpp + columnstore_dataload.cpp) + set_source_files_properties(ha_mcs.cpp PROPERTIES COMPILE_FLAGS "-fno-implicit-templates") @@ -39,9 +43,9 @@ if (COMMAND mysql_add_plugin) IF(NOT(RPM OR DEB)) SET(disabled DISABLED) ENDIF() - + mysql_add_plugin(columnstore ${libcalmysql_SRCS} STORAGE_ENGINE MODULE_ONLY ${disabled} - LINK_LIBRARIES ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} threadpool + LINK_LIBRARIES ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${S3API_DEPS} threadpool VERSION ${PACKAGE_VERSION} COMPONENT columnstore-engine CONFIG columnstore.cnf) else () @@ -49,23 +53,24 @@ else () SET_TARGET_PROPERTIES(ha_columnstore PROPERTIES PREFIX "") add_dependencies(ha_columnstore loggingcpp) + add_dependencies(ha_columnstore ${S3API_DEPS}) add_definitions(-DMYSQL_DYNAMIC_PLUGIN -DPLUGIN_COLUMNSTORE_VERSION="${PACKAGE_VERSION}") - - target_link_libraries(ha_columnstore ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool) + + target_link_libraries(ha_columnstore ${S3API_DEPS} ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool) install(TARGETS ha_columnstore DESTINATION ${MARIADB_PLUGINDIR} COMPONENT columnstore-engine) - + # define this dummy target for standalone builds (ie, when mysql_add_plugin doesn't exist) - add_custom_target(columnstore DEPENDS ha_columnstore) + add_custom_target(columnstore DEPENDS ha_columnstore) install(FILES columnstore.cnf DESTINATION ${MARIADB_MYCNFDIR} COMPONENT columnstore-engine) endif () if (TARGET columnstore) - install(FILES syscatalog_mysql.sql - dumpcat_mysql.sql - calsetuserpriority.sql - calremoveuserpriority.sql - calshowprocesslist.sql + install(FILES syscatalog_mysql.sql + dumpcat_mysql.sql + calsetuserpriority.sql + calremoveuserpriority.sql + calshowprocesslist.sql columnstore_info.sql DESTINATION ${ENGINE_SUPPORTDIR} COMPONENT columnstore-engine) install(PROGRAMS install_mcs_mysql.sh diff --git a/dbcon/mysql/columnstore_dataload.cpp b/dbcon/mysql/columnstore_dataload.cpp new file mode 100644 index 000000000..225576bfb --- /dev/null +++ b/dbcon/mysql/columnstore_dataload.cpp @@ -0,0 +1,193 @@ +/* Copyright (C) 2022 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. +*/ + +#include + +#define NEED_CALPONT_INTERFACE +#define PREFER_MY_CONFIG_H 1 +#include "ha_mcs_impl.h" +#include "ha_mcs_impl_if.h" +using namespace cal_impl_if; + +#include "errorcodes.h" +#include "idberrorinfo.h" +#include "errorids.h" +using namespace logging; + +#include "columnstoreversion.h" +#include "ha_mcs_sysvars.h" + +#include "utils/json/json.hpp" +#include + +static size_t WriteCallback(void* contents, size_t size, size_t nmemb, void* userp) +{ + ((std::string*)userp)->append((char*)contents, size * nmemb); + return size * nmemb; +} + +std::string parseCMAPIkey() +{ + std::regex pattern("\\s*x-api-key\\s*=\\s*'(.*)'"); + auto configFile = std::string(MCSSYSCONFDIR) + "/columnstore/cmapi_server.conf"; + std::ifstream in(configFile, std::ios::in | std::ios::binary); + + if (!in.good()) + return {}; + + std::string contents; + in.seekg(0, std::ios::end); + contents.reserve(in.tellg()); + in.seekg(0, std::ios::beg); + std::copy((std::istreambuf_iterator(in)), std::istreambuf_iterator(), + std::back_inserter(contents)); + in.close(); + + std::smatch matches; + + if (std::regex_search(contents, matches, pattern)) + return matches.str(1); + + return {}; +} + +extern "C" +{ + struct InitData + { + CURL* curl = nullptr; + char* result = nullptr; + }; + + void columnstore_dataload_deinit(UDF_INIT* initid) + { + InitData* initData = (InitData*)(initid->ptr); + if (!initData) + return; + + curl_easy_cleanup(initData->curl); + delete initData->result; + } + + const char* columnstore_dataload_impl(CURL* curl, char* result, unsigned long* length, + std::string_view bucket, std::string_view table, + std::string_view filename, std::string_view database, + std::string_view secret, std::string_view key, + std::string_view region, std::string_view cmapi_host, + ulong cmapi_port, std::string_view cmapi_version, + std::string_view cmapi_key) + + { + CURLcode res; + std::string readBuffer; + + nlohmann::json j; + + j["bucket"] = bucket; + j["table"] = table; + j["filename"] = filename; + j["key"] = key; + j["secret"] = secret; + j["region"] = region; + j["database"] = database; + + std::string param = j.dump(); + + struct curl_slist* hs = NULL; + hs = curl_slist_append(hs, "Content-Type: application/json"); + std::string key_header = std::string("x-api-key:") + std::string(cmapi_key.begin(), cmapi_key.end()); + hs = curl_slist_append(hs, key_header.c_str()); + + std::string url = std::string(cmapi_host.begin(), cmapi_host.end()) + "/cmapi/" + + std::string(cmapi_version.begin(), cmapi_version.end()) + "/cluster/load_s3data"; + + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, hs); + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_PORT, cmapi_port); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, param.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer); + curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L); + + res = curl_easy_perform(curl); + + if (res != CURLE_OK) + { + std::string msg = std::string("Remote request failed: ") + curl_easy_strerror(res); + memcpy(result, msg.c_str(), msg.length()); + *length = msg.length(); + return result; + } + + result = new char[readBuffer.length() + 1]; + memcpy(result, readBuffer.c_str(), readBuffer.size()); + *length = readBuffer.size(); + + return result; + } + + const char* columnstore_dataload(UDF_INIT* initid, UDF_ARGS* args, char* result, unsigned long* length, + char* /*is_null*/, char* /*error*/) + { + InitData* initData = (InitData*)(initid->ptr); + + if (!initData->curl) + { + std::string msg("CURL initialization failed, remote execution of dataload error"); + memcpy(result, msg.c_str(), msg.length()); + *length = msg.length(); + return result; + } + + const char* table = args->args[0]; + const char* filename = args->args[1]; + const char* bucket = args->args[2]; + + ulong cmapi_port = get_cmapi_port(_current_thd()); + const char* cmapi_host = get_cmapi_host(_current_thd()); + const char* cmapi_version = get_cmapi_version(_current_thd()); + const char* cmapi_key = get_cmapi_key(_current_thd()); + + THD* thd = _current_thd(); + + const char* secret = get_s3_secret(thd); + const char* key = get_s3_key(thd); + const char* region = get_s3_region(thd); + const char* database = args->arg_count != 4 ? thd->get_db() : args->args[3]; + + return columnstore_dataload_impl(initData->curl, initData->result, length, bucket, table, filename, + database, secret, key, region, cmapi_host, cmapi_port, cmapi_version, + ::strlen(cmapi_key) == 0 ? parseCMAPIkey().c_str() : cmapi_key); + } + + my_bool columnstore_dataload_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + if (args->arg_count != 3 && args->arg_count != 4) + { + strcpy(message, "COLUMNSTORE_DATALOAD() takes three or four arguments: (table, filename, bucket) or (table, filename, bucket, database)"); + return 1; + } + + initid->max_length = 1000 * 1000; + InitData* initData = new InitData; + initData->curl = curl_easy_init(); + initid->ptr = (char*)(initData); + + return 0; + } +} \ No newline at end of file diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index 68822fb66..f372d9ec8 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -188,6 +188,22 @@ static MYSQL_THDVAR_ULONGLONG(cache_flush_threshold, PLUGIN_VAR_RQCMDARG, "Threshold on the number of rows in the cache to trigger a flush", NULL, NULL, 500000, 1, 1000000000, 1); +static MYSQL_THDVAR_STR(cmapi_host, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI host", NULL, NULL, + "https://localhost"); + +static MYSQL_THDVAR_STR(cmapi_version, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI version", NULL, NULL, + "0.4.0"); + +static MYSQL_THDVAR_STR(cmapi_key, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI key", NULL, NULL, + ""); + +static MYSQL_THDVAR_ULONGLONG(cmapi_port, PLUGIN_VAR_NOCMDOPT, "CMAPI port", NULL, + NULL, 8640, 100, 65356, 1); + +static MYSQL_THDVAR_STR(s3_key, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 Authentication Key ", NULL, NULL, ""); +static MYSQL_THDVAR_STR(s3_secret, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 Authentication Secret", NULL, NULL, ""); +static MYSQL_THDVAR_STR(s3_region, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 region", NULL, NULL, ""); + st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type), MYSQL_SYSVAR(fe_conn_info_ptr), MYSQL_SYSVAR(original_optimizer_flags), @@ -217,6 +233,13 @@ st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type), MYSQL_SYSVAR(cache_inserts), MYSQL_SYSVAR(cache_use_import), MYSQL_SYSVAR(cache_flush_threshold), + MYSQL_SYSVAR(cmapi_host), + MYSQL_SYSVAR(cmapi_port), + MYSQL_SYSVAR(cmapi_version), + MYSQL_SYSVAR(cmapi_key), + MYSQL_SYSVAR(s3_key), + MYSQL_SYSVAR(s3_secret), + MYSQL_SYSVAR(s3_region), NULL}; st_mysql_show_var mcs_status_variables[] = {{"columnstore_version", (char*)&cs_version, SHOW_CHAR}, @@ -505,3 +528,73 @@ void set_cache_flush_threshold(THD* thd, ulonglong value) { THDVAR(thd, cache_flush_threshold) = value; } + +ulong get_cmapi_port(THD* thd) +{ + return (thd == NULL) ? 5000 : THDVAR(thd, cmapi_port); +} + +void set_cmapi_port(THD* thd, ulong value) +{ + THDVAR(thd, cmapi_port) = value; +} + +const char* get_cmapi_host(THD* thd) +{ + return (thd == NULL) ? "127.0.0.1" : THDVAR(thd, cmapi_host); +} + +void set_cmapi_host(THD* thd, char* value) +{ + THDVAR(thd, cmapi_host) = value; +} + +const char* get_cmapi_version(THD* thd) +{ + return (thd == NULL) ? "0.4.0" : THDVAR(thd, cmapi_version); +} + +void set_cmapi_version(THD* thd, char* value) +{ + THDVAR(thd, cmapi_version) = value; +} + +const char* get_cmapi_key(THD* thd) +{ + return (thd == NULL) ? "" : THDVAR(thd, cmapi_key); +} + +void set_cmapi_key(THD* thd, char* value) +{ + THDVAR(thd, cmapi_key) = value; +} + +const char* get_s3_key(THD* thd) +{ + return THDVAR(thd, s3_key); +} + +void set_s3_key(THD* thd, char* value) +{ + THDVAR(thd, s3_key) = value; +} + +const char* get_s3_secret(THD* thd) +{ + return THDVAR(thd, s3_secret); +} + +void set_s3_secret(THD* thd, char* value) +{ + THDVAR(thd, s3_secret) = value; +} + +const char* get_s3_region(THD* thd) +{ + return THDVAR(thd, s3_region); +} + +void set_s3_region(THD* thd, char* value) +{ + THDVAR(thd, s3_region) = value; +} \ No newline at end of file diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index 44017bb58..ec116987d 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -143,3 +143,24 @@ void set_cache_use_import(THD* thd, bool value); ulonglong get_cache_flush_threshold(THD* thd); void set_cache_flush_threshold(THD* thd, ulonglong value); + +ulong get_cmapi_port(THD* thd); +void set_cmapi_port(THD* thd, ulong value); + +const char* get_cmapi_key(THD* thd); +void set_cmapi_key(THD* thd, char* value); + +const char* get_cmapi_host(THD* thd); +void set_cmapi_host(THD* thd, char* value); + +const char* get_cmapi_version(THD* thd); +void set_cmapi_version(THD* thd, char* value); + +const char* get_s3_key(THD* thd); +void set_s3_key(THD* thd, char* value); + +const char* get_s3_secret(THD* thd); +void set_s3_secret(THD* thd, char* value); + +const char* get_s3_region(THD* thd); +void set_s3_region(THD* thd, char* value); diff --git a/dbcon/mysql/install_mcs_mysql.sh.in b/dbcon/mysql/install_mcs_mysql.sh.in index 2728b1dca..cb980fe56 100755 --- a/dbcon/mysql/install_mcs_mysql.sh.in +++ b/dbcon/mysql/install_mcs_mysql.sh.in @@ -55,6 +55,7 @@ CREATE OR REPLACE FUNCTION mcssystemreadonly RETURNS INTEGER SONAME 'ha_columnst CREATE OR REPLACE FUNCTION mcssystemprimary RETURNS INTEGER SONAME 'ha_columnstore.so'; CREATE OR REPLACE FUNCTION mcs_emindex_size RETURNS INTEGER SONAME 'ha_columnstore.so'; CREATE OR REPLACE FUNCTION mcs_emindex_free RETURNS INTEGER SONAME 'ha_columnstore.so'; +CREATE OR REPLACE FUNCTION columnstore_dataload RETURNS STRING SONAME 'ha_columnstore.so'; CREATE OR REPLACE AGGREGATE FUNCTION regr_avgx RETURNS REAL SONAME 'libregr_mysql.so'; CREATE OR REPLACE AGGREGATE FUNCTION regr_avgy RETURNS REAL SONAME 'libregr_mysql.so'; CREATE OR REPLACE AGGREGATE FUNCTION regr_count RETURNS INTEGER SONAME 'libregr_mysql.so'; diff --git a/dbcon/mysql/syscatalog_mysql.sql b/dbcon/mysql/syscatalog_mysql.sql index 7444f1fa2..81a66460c 100644 --- a/dbcon/mysql/syscatalog_mysql.sql +++ b/dbcon/mysql/syscatalog_mysql.sql @@ -41,3 +41,19 @@ create table if not exists syscolumn (`schema` varchar(128), `maxvalue` varchar(64), compressiontype integer, nextvalue bigint) engine=columnstore comment='SCHEMA SYNC ONLY'; + + +DELIMITER // +CREATE OR REPLACE PROCEDURE columnstore_load_from_s3 (in tablename varchar(256) CHARACTER SET utf8, + in filename varchar(256) CHARACTER SET utf8, + in bucket varchar(256) CHARACTER SET utf8, + in dbname varchar(256) CHARACTER SET utf8) +LANGUAGE SQL +NOT DETERMINISTIC +MODIFIES SQL DATA +SQL SECURITY INVOKER +BEGIN + select columnstore_dataload(tablename, filename, bucket, dbname); +END // + +DELIMITER ; \ No newline at end of file diff --git a/writeengine/bulk/CMakeLists.txt b/writeengine/bulk/CMakeLists.txt index 9a4c4c37f..c69d143fa 100644 --- a/writeengine/bulk/CMakeLists.txt +++ b/writeengine/bulk/CMakeLists.txt @@ -40,7 +40,7 @@ set(cpimport.bin_SRCS cpimport.cpp) add_executable(cpimport.bin ${cpimport.bin_SRCS}) add_dependencies(cpimport.bin marias3) -target_link_libraries(cpimport.bin ${ENGINE_LDFLAGS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${S3API_DEPS} we_bulk we_xml) +target_link_libraries(cpimport.bin boost_program_options ${ENGINE_LDFLAGS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${S3API_DEPS} we_bulk we_xml) install(TARGETS cpimport.bin DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine) diff --git a/writeengine/bulk/cpimport.cpp b/writeengine/bulk/cpimport.cpp index 54aeaa2c5..cf22e9590 100644 --- a/writeengine/bulk/cpimport.cpp +++ b/writeengine/bulk/cpimport.cpp @@ -172,11 +172,12 @@ void printUsage() << " -T Timezone used for TIMESTAMP datatype" << endl << " Possible values: \"SYSTEM\" (default)" << endl << " : Offset in the form +/-HH:MM" << endl -// << " -y S3 Authentication Key (for S3 imports)" << endl -// << " -K S3 Authentication Secret (for S3 imports)" << endl -// << " -t S3 Bucket (for S3 imports)" << endl -// << " -H S3 Hostname (for S3 imports, Amazon's S3 default)" << endl -// << " -g S3 Regions (for S3 imports)" << endl + << endl + << " -y S3 Authentication Key (for S3 imports)" << endl + << " -K S3 Authentication Secret (for S3 imports)" << endl + << " -t S3 Bucket (for S3 imports)" << endl + << " -H S3 Hostname (for S3 imports, Amazon's S3 default)" << endl + << " -g S3 Regions (for S3 imports)" << endl << " -U username of new data files owner. Default is mysql" << endl; cout << " Example1:" << endl @@ -309,7 +310,7 @@ void parseCmdLineArgs(int argc, char** argv, BulkLoad& curJob, std::string& sJob BulkModeType bulkMode = BULK_MODE_LOCAL; std::string jobUUID; - while ((option = getopt(argc, argv, "b:c:d:e:f:hij:kl:m:n:p:r:s:u:w:B:C:DE:I:P:R:ST:X:NL:U:")) != + while ((option = getopt(argc, argv, "b:c:d:e:f:hij:kl:m:n:p:r:s:u:w:B:C:DE:I:P:R:ST:X:NL:y:K:t:H:g:U:")) != EOF) { switch (option) @@ -675,7 +676,7 @@ void parseCmdLineArgs(int argc, char** argv, BulkLoad& curJob, std::string& sJob BulkLoad::disableConsoleOutput(true); break; } -/* + case 'y': { curJob.setS3Key(optarg); @@ -705,7 +706,7 @@ void parseCmdLineArgs(int argc, char** argv, BulkLoad& curJob, std::string& sJob curJob.setS3Region(optarg); break; } -*/ + case 'U': { curJob.setUsername(optarg); diff --git a/writeengine/splitter/we_cmdargs.cpp b/writeengine/splitter/we_cmdargs.cpp index 9ad76a451..27efb1d37 100644 --- a/writeengine/splitter/we_cmdargs.cpp +++ b/writeengine/splitter/we_cmdargs.cpp @@ -561,11 +561,11 @@ void WECmdArgs::usage() << "\t-T\tTimezone used for TIMESTAMP datatype.\n" << "\t\tPossible values: \"SYSTEM\" (default)\n" << "\t\t : Offset in the form +/-HH:MM\n" -// << "\t-y\tS3 Authentication Key (for S3 imports)\n" -// << "\t-K\tS3 Authentication Secret (for S3 imports)\n" -// << "\t-t\tS3 Bucket (for S3 imports)\n" -// << "\t-H\tS3 Hostname (for S3 imports, Amazon's S3 default)\n" -// << "\t-g\tS3 Region (for S3 imports)\n" + << "\t-y\tS3 Authentication Key (for S3 imports)\n" + << "\t-K\tS3 Authentication Secret (for S3 imports)\n" + << "\t-t\tS3 Bucket (for S3 imports)\n" + << "\t-H\tS3 Hostname (for S3 imports, Amazon's S3 default)\n" + << "\t-g\tS3 Region (for S3 imports)\n" << "\t-L\tDirectory for the output .err and .bad files.\n" << "\t\tDefault is " << string(MCSLOGDIR); @@ -598,7 +598,7 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv) if (argc > 0) fPrgmName = string(MCSBINDIR) + "/" + "cpimport.bin"; // argv[0] is splitter but we need cpimport - while ((aCh = getopt(argc, argv, "d:j:w:s:v:l:r:b:e:B:f:q:ihm:E:C:P:I:n:p:c:ST:NU:L:")) != EOF) + while ((aCh = getopt(argc, argv, "d:j:w:s:v:l:r:b:e:B:f:q:ihm:E:C:P:I:n:p:c:ST:Ny:K:t:H:g:U:L:")) != EOF) { switch (aCh) { @@ -906,7 +906,7 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv) fConsoleOutput = false; break; } -/* + case 'y': //-y S3 Key { fS3Key = optarg; @@ -936,7 +936,7 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv) fS3Region = optarg; break; } -*/ + case 'U': //-U username of the files owner { fUsername = optarg;