1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-5013: Load Data from S3 into Columnstore

Introduced a UDF and a stored procedure.
usage:

set columnstore_s3_key='<s3_key>';
set columnstore_s3_secret='<s3_secret>';
set columnstore_s3_region='region';

and then use UDF
select columnstore_dataload("<tablename>", "<filename>", "<bucket>", "<db_name>");
For the UDF, db_name can be omitted; the current connection's database is then used.

or the stored procedure
call calpontsys.columnstore_load_from_s3("<tablename>", "<filename>", "<bucket>", "<db_name>");
This commit is contained in:
Leonid Fedorov
2022-03-28 14:26:02 +00:00
parent 7d955a0f85
commit f5b2a6885f
9 changed files with 356 additions and 29 deletions

View File

@ -6,6 +6,8 @@ include_directories( ${ENGINE_COMMON_INCLUDES}
${SERVER_SOURCE_ROOT_DIR}/storage/maria )
SET(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/../../../)
SET(S3API_DEPS marias3 curl CACHE INTERNAL "S3API_DEPS")
SET ( libcalmysql_SRCS
../../datatypes/mcs_datatype.cpp
@ -31,7 +33,9 @@ SET ( libcalmysql_SRCS
is_columnstore_tables.cpp
is_columnstore_columns.cpp
is_columnstore_files.cpp
is_columnstore_extents.cpp)
is_columnstore_extents.cpp
columnstore_dataload.cpp)
set_source_files_properties(ha_mcs.cpp PROPERTIES COMPILE_FLAGS "-fno-implicit-templates")
@ -39,9 +43,9 @@ if (COMMAND mysql_add_plugin)
IF(NOT(RPM OR DEB))
SET(disabled DISABLED)
ENDIF()
mysql_add_plugin(columnstore ${libcalmysql_SRCS} STORAGE_ENGINE MODULE_ONLY ${disabled}
LINK_LIBRARIES ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} threadpool
LINK_LIBRARIES ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${S3API_DEPS} threadpool
VERSION ${PACKAGE_VERSION}
COMPONENT columnstore-engine CONFIG columnstore.cnf)
else ()
@ -49,23 +53,24 @@ else ()
SET_TARGET_PROPERTIES(ha_columnstore PROPERTIES PREFIX "")
add_dependencies(ha_columnstore loggingcpp)
add_dependencies(ha_columnstore ${S3API_DEPS})
add_definitions(-DMYSQL_DYNAMIC_PLUGIN -DPLUGIN_COLUMNSTORE_VERSION="${PACKAGE_VERSION}")
target_link_libraries(ha_columnstore ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool)
target_link_libraries(ha_columnstore ${S3API_DEPS} ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${NETSNMP_LIBRARIES} ${SERVER_BUILD_DIR}/libservices/libmysqlservices.a threadpool)
install(TARGETS ha_columnstore DESTINATION ${MARIADB_PLUGINDIR} COMPONENT columnstore-engine)
# define this dummy target for standalone builds (ie, when mysql_add_plugin doesn't exist)
add_custom_target(columnstore DEPENDS ha_columnstore)
add_custom_target(columnstore DEPENDS ha_columnstore)
install(FILES columnstore.cnf DESTINATION ${MARIADB_MYCNFDIR} COMPONENT columnstore-engine)
endif ()
if (TARGET columnstore)
install(FILES syscatalog_mysql.sql
dumpcat_mysql.sql
calsetuserpriority.sql
calremoveuserpriority.sql
calshowprocesslist.sql
install(FILES syscatalog_mysql.sql
dumpcat_mysql.sql
calsetuserpriority.sql
calremoveuserpriority.sql
calshowprocesslist.sql
columnstore_info.sql
DESTINATION ${ENGINE_SUPPORTDIR} COMPONENT columnstore-engine)
install(PROGRAMS install_mcs_mysql.sh

View File

@ -0,0 +1,190 @@
/* Copyright (C) 2021 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA.
*/
#include <curl/curl.h>
#define NEED_CALPONT_INTERFACE
#define PREFER_MY_CONFIG_H 1
#include "ha_mcs_impl.h"
#include "ha_mcs_impl_if.h"
using namespace cal_impl_if;
#include "errorcodes.h"
#include "idberrorinfo.h"
#include "errorids.h"
using namespace logging;
#include "columnstoreversion.h"
#include "ha_mcs_sysvars.h"
#include "utils/json/json.hpp"
#include <regex>
// libcurl write callback: appends the received chunk to the std::string passed
// through userp and reports the whole chunk as consumed.
static size_t WriteCallback(void* contents, size_t size, size_t nmemb, void* userp)
{
  const size_t chunkBytes = size * nmemb;
  std::string* sink = static_cast<std::string*>(userp);
  sink->append(static_cast<char*>(contents), chunkBytes);
  return chunkBytes;
}
// Reads the CMAPI API key from <MCSSYSCONFDIR>/columnstore/cmapi_server.conf.
//
// Returns the text captured between single quotes by the first match of
// "x-api-key = '...'" in the file, or an empty string when the file cannot be
// opened or contains no such entry.
std::string parseCMAPIkey()
{
  std::regex pattern("\\s*x-api-key\\s*=\\s*'(.*)'");
  auto configFile = std::string(MCSSYSCONFDIR) + "/columnstore/cmapi_server.conf";
  std::ifstream in(configFile, std::ios::in | std::ios::binary);

  if (!in.good())
    return {};

  // Read the stream directly instead of seeking to learn its size first:
  // tellg() returns -1 on failure, and feeding that to reserve() would throw
  // std::length_error.
  std::string contents((std::istreambuf_iterator<char>(in)), std::istreambuf_iterator<char>());
  in.close();

  std::smatch matches;

  if (std::regex_search(contents, matches, pattern))
    return matches.str(1);

  return {};
}
extern "C"
{
  // Per-statement UDF state: created by columnstore_dataload_init() and
  // released by columnstore_dataload_deinit().
  struct InitData
  {
    CURL* curl = nullptr;    // libcurl easy handle reused for every call of the statement
    char* result = nullptr;  // new[]-allocated response most recently returned to the server
  };

  // Size of the result buffer the server passes to a string UDF (documented as 255 bytes).
  static const size_t DATALOAD_RESULT_BUFFER_SIZE = 255;

  // Copies msg (truncated to the result buffer size) into result, stores its
  // length in *length and returns result.
  static const char* dataload_report(char* result, unsigned long* length, const std::string& msg)
  {
    const size_t n = msg.length() < DATALOAD_RESULT_BUFFER_SIZE ? msg.length() : DATALOAD_RESULT_BUFFER_SIZE;
    memcpy(result, msg.data(), n);
    *length = n;
    return result;
  }

  // Builds a string_view from a possibly-NULL C string (NULL becomes an empty view).
  static std::string_view dataload_view(const char* s)
  {
    return s ? std::string_view(s) : std::string_view();
  }

  // Releases everything columnstore_dataload_init() and columnstore_dataload()
  // attached to initid->ptr. Safe to call when initid->ptr is NULL.
  void columnstore_dataload_deinit(UDF_INIT* initid)
  {
    InitData* initData = (InitData*)(initid->ptr);

    if (!initData)
      return;

    if (initData->curl)
      curl_easy_cleanup(initData->curl);

    delete[] initData->result;  // allocated with new[] in columnstore_dataload_impl()
    delete initData;
    initid->ptr = NULL;
  }

  // Sends the load request to CMAPI as a JSON POST to
  // <cmapi_host>/cmapi/<cmapi_version>/cluster/load_s3data on cmapi_port.
  //
  // result must point to a writable buffer of at least 255 bytes; it is used
  // only for the error message.
  //
  // Returns:
  //   - result, holding "Remote request failed: <curl error>", when the
  //     request could not be performed;
  //   - otherwise a new[]-allocated buffer with the response body, which the
  //     caller owns and must release with delete[].
  // *length receives the number of bytes of the returned text.
  const char* columnstore_dataload_impl(CURL* curl, char* result, unsigned long* length,
                                        std::string_view bucket, std::string_view table,
                                        std::string_view filename, std::string_view database,
                                        std::string_view secret, std::string_view key,
                                        std::string_view region, std::string_view cmapi_host,
                                        ulong cmapi_port, std::string_view cmapi_version,
                                        std::string_view cmapi_key)
  {
    std::string readBuffer;

    nlohmann::json j;
    j["bucket"] = bucket;
    j["table"] = table;
    j["filename"] = filename;
    j["key"] = key;
    j["secret"] = secret;
    j["region"] = region;
    j["database"] = database;
    std::string param = j.dump();

    std::string key_header = std::string("x-api-key:") + std::string(cmapi_key.begin(), cmapi_key.end());

    // curl_slist_append() returns NULL on failure without freeing the list it
    // was given, so keep the previous head until the append is known to work.
    struct curl_slist* hs = curl_slist_append(NULL, "Content-Type: application/json");
    struct curl_slist* withKey = hs ? curl_slist_append(hs, key_header.c_str()) : NULL;

    if (!withKey)
    {
      curl_slist_free_all(hs);
      return dataload_report(result, length,
                             std::string("Remote request failed: ") + curl_easy_strerror(CURLE_OUT_OF_MEMORY));
    }

    hs = withKey;

    std::string url = std::string(cmapi_host.begin(), cmapi_host.end()) + "/cmapi/" +
                      std::string(cmapi_version.begin(), cmapi_version.end()) + "/cluster/load_s3data";

    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, hs);
    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
    curl_easy_setopt(curl, CURLOPT_PORT, static_cast<long>(cmapi_port));  // libcurl expects a long
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
    curl_easy_setopt(curl, CURLOPT_POSTFIELDS, param.c_str());
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
    // NOTE(review): peer certificate verification is disabled, so the S3
    // credentials in the request body are sent to an unauthenticated server.
    curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);

    CURLcode res = curl_easy_perform(curl);

    // The handle outlives this call; detach the header list before freeing it.
    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, NULL);
    curl_slist_free_all(hs);

    if (res != CURLE_OK)
      return dataload_report(result, length, std::string("Remote request failed: ") + curl_easy_strerror(res));

    char* response = new char[readBuffer.length() + 1];
    memcpy(response, readBuffer.c_str(), readBuffer.size() + 1);  // include the terminating NUL
    *length = readBuffer.size();
    return response;
  }

  // UDF entry point: columnstore_dataload(table, filename, bucket[, database]).
  //
  // Collects the CMAPI and S3 settings from the session variables, forwards
  // the request through columnstore_dataload_impl() and returns either the
  // CMAPI response body or an error message.
  const char* columnstore_dataload(UDF_INIT* initid, UDF_ARGS* args, char* result, unsigned long* length,
                                   char* /*is_null*/, char* /*error*/)
  {
    InitData* initData = (InitData*)(initid->ptr);

    if (!initData || !initData->curl)
      return dataload_report(result, length, "CURL initialization failed, remote execution of dataload error");

    // A SQL NULL argument arrives as a NULL pointer.
    for (unsigned int i = 0; i < args->arg_count; ++i)
    {
      if (!args->args[i])
        return dataload_report(result, length, "COLUMNSTORE_DATALOAD() arguments must not be NULL");
    }

    // String arguments are not guaranteed to be NUL-terminated; use the lengths.
    std::string_view table(args->args[0], args->lengths[0]);
    std::string_view filename(args->args[1], args->lengths[1]);
    std::string_view bucket(args->args[2], args->lengths[2]);

    THD* thd = _current_thd();

    std::string_view database;

    if (args->arg_count == 4)
    {
      database = std::string_view(args->args[3], args->lengths[3]);
    }
    else
    {
      const char* currentDb = thd->get_db();

      if (!currentDb)
        return dataload_report(result, length, "COLUMNSTORE_DATALOAD(): no database selected and none given");

      database = currentDb;
    }

    ulong cmapi_port = get_cmapi_port(thd);
    const char* cmapi_host = get_cmapi_host(thd);
    const char* cmapi_version = get_cmapi_version(thd);
    const char* cmapi_key = get_cmapi_key(thd);

    const char* secret = get_s3_secret(thd);
    const char* key = get_s3_key(thd);
    const char* region = get_s3_region(thd);

    // Fall back to the key stored in the CMAPI server config when the session
    // variable is unset or empty.
    std::string effectiveKey = (cmapi_key && *cmapi_key) ? std::string(cmapi_key) : parseCMAPIkey();

    const char* out = columnstore_dataload_impl(initData->curl, result, length, bucket, table, filename,
                                                database, dataload_view(secret), dataload_view(key),
                                                dataload_view(region), dataload_view(cmapi_host), cmapi_port,
                                                dataload_view(cmapi_version), effectiveKey);

    if (out != result)
    {
      // Success: take ownership of the response so deinit (or the next call)
      // frees it. The buffer from the previous call is no longer needed.
      delete[] initData->result;
      initData->result = const_cast<char*>(out);
    }

    return out;
  }

  // UDF initializer: validates the argument count, asks the server to pass
  // every argument as a string and allocates the per-statement state.
  // Returns 0 on success, 1 (with message filled in) on error.
  my_bool columnstore_dataload_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
  {
    initid->ptr = NULL;

    // Validate before allocating so the failure path has nothing to release.
    if (args->arg_count != 3 && args->arg_count != 4)
    {
      strcpy(message, "COLUMNSTORE_DATALOAD() takes three or four arguments: (table, filename, bucket) or (table, filename, bucket, database)");
      return 1;
    }

    for (unsigned int i = 0; i < args->arg_count; ++i)
      args->arg_type[i] = STRING_RESULT;

    initid->max_length = 1000 * 1000;

    InitData* initData = new InitData;
    // A NULL handle is reported by columnstore_dataload() at execution time.
    initData->curl = curl_easy_init();
    initid->ptr = (char*)(initData);

    return 0;
  }
}

View File

@ -188,6 +188,22 @@ static MYSQL_THDVAR_ULONGLONG(cache_flush_threshold, PLUGIN_VAR_RQCMDARG,
"Threshold on the number of rows in the cache to trigger a flush", NULL, NULL,
500000, 1, 1000000000, 1);
// Session variables configuring the CMAPI endpoint used by columnstore_dataload().
static MYSQL_THDVAR_STR(cmapi_host, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI host", NULL, NULL,
"https://localhost");
static MYSQL_THDVAR_STR(cmapi_version, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI version", NULL, NULL,
"0.4.0");
static MYSQL_THDVAR_STR(cmapi_key, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "CMAPI key", NULL, NULL,
"");
// Default 8640, allowed range 100..65535 (the largest valid TCP port).
static MYSQL_THDVAR_ULONGLONG(cmapi_port, PLUGIN_VAR_NOCMDOPT, "CMAPI port", NULL,
NULL, 8640, 100, 65535, 1);
// Session variables holding the S3 credentials and region sent with the load request.
// NOTE(review): s3_secret is an ordinary session variable; confirm that its
// visibility to the session (e.g. when listing variables) is acceptable.
static MYSQL_THDVAR_STR(s3_key, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 Authentication Key ", NULL, NULL, "");
static MYSQL_THDVAR_STR(s3_secret, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 Authentication Secret", NULL, NULL, "");
static MYSQL_THDVAR_STR(s3_region, PLUGIN_VAR_NOCMDOPT|PLUGIN_VAR_MEMALLOC, "S3 region", NULL, NULL, "");
st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type),
MYSQL_SYSVAR(fe_conn_info_ptr),
MYSQL_SYSVAR(original_optimizer_flags),
@ -217,6 +233,13 @@ st_mysql_sys_var* mcs_system_variables[] = {MYSQL_SYSVAR(compression_type),
MYSQL_SYSVAR(cache_inserts),
MYSQL_SYSVAR(cache_use_import),
MYSQL_SYSVAR(cache_flush_threshold),
MYSQL_SYSVAR(cmapi_host),
MYSQL_SYSVAR(cmapi_port),
MYSQL_SYSVAR(cmapi_version),
MYSQL_SYSVAR(cmapi_key),
MYSQL_SYSVAR(s3_key),
MYSQL_SYSVAR(s3_secret),
MYSQL_SYSVAR(s3_region),
NULL};
st_mysql_show_var mcs_status_variables[] = {{"columnstore_version", (char*)&cs_version, SHOW_CHAR},
@ -505,3 +528,73 @@ void set_cache_flush_threshold(THD* thd, ulonglong value)
{
THDVAR(thd, cache_flush_threshold) = value;
}
// Returns the session's CMAPI port. Without a THD it falls back to 8640, the
// default declared for the cmapi_port variable (the previous fallback of 5000
// disagreed with that default).
ulong get_cmapi_port(THD* thd)
{
  return (thd == NULL) ? 8640 : THDVAR(thd, cmapi_port);
}
// Stores value as the session's CMAPI port.
void set_cmapi_port(THD* thd, ulong value)
{
  auto& port = THDVAR(thd, cmapi_port);
  port = value;
}
// Returns the session's CMAPI host. The value is used as the URL prefix, so
// it carries the scheme. Without a THD it falls back to "https://localhost",
// the default declared for the cmapi_host variable (the previous fallback
// "127.0.0.1" had no scheme).
const char* get_cmapi_host(THD* thd)
{
  return (thd == NULL) ? "https://localhost" : THDVAR(thd, cmapi_host);
}
// Points the session's CMAPI host variable at value.
// NOTE(review): the variable is declared with PLUGIN_VAR_MEMALLOC; confirm who
// owns value after this assignment.
void set_cmapi_host(THD* thd, char* value)
{
  auto& host = THDVAR(thd, cmapi_host);
  host = value;
}
// Returns the session's CMAPI version string, or "0.4.0" when no THD is given.
const char* get_cmapi_version(THD* thd)
{
  if (thd == NULL)
    return "0.4.0";

  return THDVAR(thd, cmapi_version);
}
// Points the session's CMAPI version variable at value.
// NOTE(review): the variable is declared with PLUGIN_VAR_MEMALLOC; confirm who
// owns value after this assignment.
void set_cmapi_version(THD* thd, char* value)
{
  auto& version = THDVAR(thd, cmapi_version);
  version = value;
}
// Returns the session's CMAPI API key, or an empty string when no THD is given.
const char* get_cmapi_key(THD* thd)
{
  if (thd == NULL)
    return "";

  return THDVAR(thd, cmapi_key);
}
// Points the session's CMAPI API key variable at value.
// NOTE(review): the variable is declared with PLUGIN_VAR_MEMALLOC; confirm who
// owns value after this assignment.
void set_cmapi_key(THD* thd, char* value)
{
  auto& apiKey = THDVAR(thd, cmapi_key);
  apiKey = value;
}
// Returns the session's S3 authentication key.
const char* get_s3_key(THD* thd)
{
  const char* s3Key = THDVAR(thd, s3_key);
  return s3Key;
}
// Points the session's S3 authentication key variable at value.
// NOTE(review): the variable is declared with PLUGIN_VAR_MEMALLOC; confirm who
// owns value after this assignment.
void set_s3_key(THD* thd, char* value)
{
  auto& s3Key = THDVAR(thd, s3_key);
  s3Key = value;
}
// Returns the session's S3 authentication secret.
const char* get_s3_secret(THD* thd)
{
  const char* s3Secret = THDVAR(thd, s3_secret);
  return s3Secret;
}
// Points the session's S3 authentication secret variable at value.
// NOTE(review): the variable is declared with PLUGIN_VAR_MEMALLOC; confirm who
// owns value after this assignment.
void set_s3_secret(THD* thd, char* value)
{
  auto& s3Secret = THDVAR(thd, s3_secret);
  s3Secret = value;
}
// Returns the session's S3 region.
const char* get_s3_region(THD* thd)
{
  const char* s3Region = THDVAR(thd, s3_region);
  return s3Region;
}
// Points the session's S3 region variable at value.
// NOTE(review): the variable is declared with PLUGIN_VAR_MEMALLOC; confirm who
// owns value after this assignment.
void set_s3_region(THD* thd, char* value)
{
  auto& s3Region = THDVAR(thd, s3_region);
  s3Region = value;
}

View File

@ -143,3 +143,24 @@ void set_cache_use_import(THD* thd, bool value);
ulonglong get_cache_flush_threshold(THD* thd);
void set_cache_flush_threshold(THD* thd, ulonglong value);
// Accessors for the per-session CMAPI connection settings (port, API key,
// host URL prefix, API version).
ulong get_cmapi_port(THD* thd);
void set_cmapi_port(THD* thd, ulong value);
const char* get_cmapi_key(THD* thd);
void set_cmapi_key(THD* thd, char* value);
const char* get_cmapi_host(THD* thd);
void set_cmapi_host(THD* thd, char* value);
const char* get_cmapi_version(THD* thd);
void set_cmapi_version(THD* thd, char* value);
// Accessors for the per-session S3 credentials and region.
const char* get_s3_key(THD* thd);
void set_s3_key(THD* thd, char* value);
const char* get_s3_secret(THD* thd);
void set_s3_secret(THD* thd, char* value);
const char* get_s3_region(THD* thd);
void set_s3_region(THD* thd, char* value);

View File

@ -55,6 +55,7 @@ CREATE OR REPLACE FUNCTION mcssystemreadonly RETURNS INTEGER SONAME 'ha_columnst
CREATE OR REPLACE FUNCTION mcssystemprimary RETURNS INTEGER SONAME 'ha_columnstore.so';
CREATE OR REPLACE FUNCTION mcs_emindex_size RETURNS INTEGER SONAME 'ha_columnstore.so';
CREATE OR REPLACE FUNCTION mcs_emindex_free RETURNS INTEGER SONAME 'ha_columnstore.so';
-- Registers the columnstore_dataload UDF (S3 data load) from the ColumnStore plugin library.
CREATE OR REPLACE FUNCTION columnstore_dataload RETURNS STRING SONAME 'ha_columnstore.so';
CREATE OR REPLACE AGGREGATE FUNCTION regr_avgx RETURNS REAL SONAME 'libregr_mysql.so';
CREATE OR REPLACE AGGREGATE FUNCTION regr_avgy RETURNS REAL SONAME 'libregr_mysql.so';
CREATE OR REPLACE AGGREGATE FUNCTION regr_count RETURNS INTEGER SONAME 'libregr_mysql.so';

View File

@ -41,3 +41,19 @@ create table if not exists syscolumn (`schema` varchar(128),
`maxvalue` varchar(64),
compressiontype integer,
nextvalue bigint) engine=columnstore comment='SCHEMA SYNC ONLY';
-- Stored-procedure wrapper around the columnstore_dataload() UDF: forwards
-- its four arguments and returns the UDF's result as a one-row result set.
DELIMITER //
CREATE OR REPLACE PROCEDURE columnstore_load_from_s3 (
    IN tablename VARCHAR(256) CHARACTER SET utf8,
    IN filename VARCHAR(256) CHARACTER SET utf8,
    IN bucket VARCHAR(256) CHARACTER SET utf8,
    IN dbname VARCHAR(256) CHARACTER SET utf8
)
LANGUAGE SQL
NOT DETERMINISTIC
MODIFIES SQL DATA
SQL SECURITY INVOKER
BEGIN
    SELECT columnstore_dataload(tablename, filename, bucket, dbname);
END //
DELIMITER ;