1
0
mirror of https://github.com/mariadb-corporation/mariadb-connector-c.git synced 2025-08-07 02:42:49 +03:00

Added support for remote files via plugin.

This will allow to read and import data from remote locations, e.g.
LOAD DATA LOCAL INFILE 'htp://www.example.com' or
mysql_options(mysql, MYSQL_READ_DEFAULT_FILE, "http://localhost/test.cnf");
The implementation uses libcurl. For a list of supported URL types
see http://curl.haxx.se/libcurl/
This commit is contained in:
Georg Richter
2015-03-18 19:52:51 +01:00
parent 77251b09ce
commit 95724c85a5
12 changed files with 642 additions and 126 deletions

View File

@@ -1,6 +1,5 @@
# CMakeLists.txt # CMakeLists.txt
# This is the LGPL libmariadb project. # This is the LGPL libmariadb project.
PROJECT(mariadb-connector-c C) PROJECT(mariadb-connector-c C)

33
include/mariadb/ma_io.h Normal file
View File

@@ -0,0 +1,33 @@
#ifndef _ma_io_h_
#define _ma_io_h_
#include <curl/curl.h>
enum enum_file_type {
MA_FILE_NONE=0,
MA_FILE_LOCAL=1,
MA_FILE_REMOTE=2
};
typedef struct
{
enum enum_file_type type;
void *ptr;
} MA_FILE;
struct st_rio_methods {
MA_FILE *(*open)(const char *url, const char *mode);
int (*close)(MA_FILE *ptr);
int (*feof)(MA_FILE *file);
size_t (*read)(void *ptr, size_t size, size_t nmemb, MA_FILE *file);
char * (*gets)(char *ptr, size_t size, MA_FILE *file);
};
/* function prototypes */
MA_FILE *ma_open(const char *location, const char *mode, MYSQL *mysql);
int ma_close(MA_FILE *file);
int ma_feof(MA_FILE *file);
size_t ma_read(void *ptr, size_t size, size_t nmemb, MA_FILE *file);
char *ma_gets(char *ptr, size_t size, MA_FILE *file);
#endif

View File

@@ -39,11 +39,14 @@
#define MYSQL_CLIENT_DB_PLUGIN 0 #define MYSQL_CLIENT_DB_PLUGIN 0
#define MYSQL_CLIENT_reserved 1 #define MYSQL_CLIENT_reserved 1
#define MYSQL_CLIENT_AUTHENTICATION_PLUGIN 2 #define MYSQL_CLIENT_AUTHENTICATION_PLUGIN 2
#define MYSQL_CLIENT_reserved22 3
#define MYSQL_CLIENT_REMOTEIO_PLUGIN 4
#define MYSQL_CLIENT_AUTHENTICATION_PLUGIN_INTERFACE_VERSION 0x0100 #define MYSQL_CLIENT_AUTHENTICATION_PLUGIN_INTERFACE_VERSION 0x0100
#define MYSQL_CLIENT_DB_PLUGIN_INTERFACE_VERSION 0x0100 #define MYSQL_CLIENT_DB_PLUGIN_INTERFACE_VERSION 0x0100
#define MYSQL_CLIENT_REMOTEIO_PLUGIN_INTERFACE_VERSION 0x0100
#define MYSQL_CLIENT_MAX_PLUGINS 3 #define MYSQL_CLIENT_MAX_PLUGINS 5
#define mysql_declare_client_plugin(X) \ #define mysql_declare_client_plugin(X) \
struct st_mysql_client_plugin_ ## X \ struct st_mysql_client_plugin_ ## X \
@@ -76,22 +79,6 @@ typedef struct st_mariadb_client_plugin_DB
MYSQL_CLIENT_PLUGIN_HEADER MYSQL_CLIENT_PLUGIN_HEADER
/* functions */ /* functions */
struct st_mysql_methods *methods; struct st_mysql_methods *methods;
/*
MYSQL * (*db_connect)(MYSQL *mysql,const char *host, const char *user,
const char *passwd, const char *db, uint port,
const char *unix_socket,unsigned long client_flag);
void (*db_close)(MYSQL *mysql);
int (*db_query)(MYSQL *mysql, const char *query, size_t query_len);
int (*db_read_one_row)(MYSQL *mysql, uint fields, MYSQL_ROW row,
ulong *lengths);
MYSQL_DATA *(*db_read_all_rows)(MYSQL *mysql,
MYSQL_FIELD *mysql_fields, uint fields);
void (*db_query_end)(MYSQL *mysql);
int (*db_stmt_prepare)(MYSQL_STMT *stmt, const char *stmt_str, ulong length);
my_bool (*db_stmt_close)(MYSQL_STMT *stmt);
my_bool (*is_supported_buffer_type)(enum enum_field_types type);
int (*db_stmt_fetch)(MYSQL_STMT *stmt);
int (*db_stmt_execute)(MYSQL_STMT *stmt); */
} MARIADB_DB_PLUGIN; } MARIADB_DB_PLUGIN;
#define MARIADB_DB_DRIVER(a) ((a)->ext_db) #define MARIADB_DB_DRIVER(a) ((a)->ext_db)
@@ -105,6 +92,7 @@ struct st_mysql_client_plugin_AUTHENTICATION
int (*authenticate_user)(MYSQL_PLUGIN_VIO *vio, struct st_mysql *mysql); int (*authenticate_user)(MYSQL_PLUGIN_VIO *vio, struct st_mysql *mysql);
}; };
/** /**
type of the mysql_authentication_dialog_ask function type of the mysql_authentication_dialog_ask function
@@ -123,6 +111,17 @@ struct st_mysql_client_plugin_AUTHENTICATION
*/ */
typedef char *(*mysql_authentication_dialog_ask_t)(struct st_mysql *mysql, typedef char *(*mysql_authentication_dialog_ask_t)(struct st_mysql *mysql,
int type, const char *prompt, char *buf, int buf_len); int type, const char *prompt, char *buf, int buf_len);
/********************** remote IO plugin **********************/
#include <mariadb/ma_io.h>
/* Remote IO plugin */
struct st_mysql_client_plugin_REMOTEIO
{
MYSQL_CLIENT_PLUGIN_HEADER
struct st_rio_methods *methods;
};
/******** using plugins ************/ /******** using plugins ************/
/** /**

View File

@@ -263,7 +263,8 @@ is_prefix.c
libmariadb.c libmariadb.c
list.c list.c
llstr.c llstr.c
longlong2str.c longlong2str.c
ma_io.c
mf_dirname.c mf_dirname.c
mf_fn_ext.c mf_fn_ext.c
mf_format.c mf_format.c

View File

@@ -62,7 +62,25 @@ static uint plugin_version[MYSQL_CLIENT_MAX_PLUGINS]=
{ {
MYSQL_CLIENT_DB_PLUGIN_INTERFACE_VERSION, /* these two are taken by Connector/C */ MYSQL_CLIENT_DB_PLUGIN_INTERFACE_VERSION, /* these two are taken by Connector/C */
0, /* these two are taken by Connector/C */ 0, /* these two are taken by Connector/C */
MYSQL_CLIENT_AUTHENTICATION_PLUGIN_INTERFACE_VERSION MYSQL_CLIENT_AUTHENTICATION_PLUGIN_INTERFACE_VERSION,
0,
MYSQL_CLIENT_REMOTEIO_PLUGIN_INTERFACE_VERSION
};
typedef struct st_mysql_client_plugin_AUTHENTICATION auth_plugin_t;
extern auth_plugin_t old_password_client_plugin;
extern auth_plugin_t native_password_client_plugin;
/* built in plugins:
These plugins are part of Connector/C, so no need to
load them
*/
struct st_mysql_client_plugin *mysql_client_builtins[]=
{
(struct st_mysql_client_plugin *)&old_password_client_plugin,
(struct st_mysql_client_plugin *)&native_password_client_plugin,
0
}; };
/* /*
@@ -85,7 +103,7 @@ static int is_not_initialized(MYSQL *mysql, const char *name)
my_set_error(mysql, CR_AUTH_PLUGIN_CANNOT_LOAD, my_set_error(mysql, CR_AUTH_PLUGIN_CANNOT_LOAD,
SQLSTATE_UNKNOWN, ER(CR_AUTH_PLUGIN_CANNOT_LOAD), SQLSTATE_UNKNOWN, ER(CR_AUTH_PLUGIN_CANNOT_LOAD),
name, "not initialized"); name ? name : "unknown" , "not initialized");
return 1; return 1;
} }
@@ -112,13 +130,14 @@ static struct st_mysql_client_plugin *find_plugin(const char *name, int type)
for (p= plugin_list[type]; p; p= p->next) for (p= plugin_list[type]; p; p= p->next)
{ {
if (!name)
return p->plugin;
if (strcmp(p->plugin->name, name) == 0) if (strcmp(p->plugin->name, name) == 0)
return p->plugin; return p->plugin;
} }
return NULL; return NULL;
} }
/** /**
verifies the plugin and adds it to the list verifies the plugin and adds it to the list
@@ -266,6 +285,7 @@ int mysql_client_plugin_init()
for (builtin= mysql_client_builtins; *builtin; builtin++) for (builtin= mysql_client_builtins; *builtin; builtin++)
add_plugin(&mysql, *builtin, 0, 0, unused); add_plugin(&mysql, *builtin, 0, 0, unused);
pthread_mutex_unlock(&LOCK_load_client_plugin); pthread_mutex_unlock(&LOCK_load_client_plugin);
load_env_plugins(&mysql); load_env_plugins(&mysql);
@@ -456,7 +476,8 @@ mysql_client_find_plugin(MYSQL *mysql, const char *name, int type)
return p; return p;
/* not found, load it */ /* not found, load it */
return mysql_load_plugin(mysql, name, type, 0); if (name)
return mysql_load_plugin(mysql, name, type, 0);
} }

View File

@@ -41,6 +41,8 @@
#include <ctype.h> #include <ctype.h>
#include "m_ctype.h" #include "m_ctype.h"
#include <my_dir.h> #include <my_dir.h>
#include <mysql.h>
#include <mariadb/ma_io.h>
char *defaults_extra_file=0; char *defaults_extra_file=0;
@@ -225,42 +227,49 @@ static my_bool search_default_file(DYNAMIC_ARRAY *args, MEM_ROOT *alloc,
const char *ext, TYPELIB *group) const char *ext, TYPELIB *group)
{ {
char name[FN_REFLEN+10],buff[4096],*ptr,*end,*value,*tmp; char name[FN_REFLEN+10],buff[4096],*ptr,*end,*value,*tmp;
FILE *fp; MA_FILE *file;
uint line=0; uint line=0;
my_bool read_values= 0, found_group= 0, is_escaped= 0, is_quoted= 0; my_bool read_values= 0, found_group= 0, is_escaped= 0, is_quoted= 0;
if ((dir ? strlen(dir) : 0 )+strlen(config_file) >= FN_REFLEN-3) if (!strstr(config_file, "://"))
return 0; /* Ignore wrong paths */
if (dir)
{ {
strmov(name,dir); if ((dir ? strlen(dir) : 0 )+strlen(config_file) >= FN_REFLEN-3)
convert_dirname(name); return 0; /* Ignore wrong paths */
if (dir[0] == FN_HOMELIB) /* Add . to filenames in home */ if (dir)
strcat(name,".");
strxmov(strend(name),config_file,ext,NullS);
}
else
{
strmov(name,config_file);
}
fn_format(name,name,"","",4);
#if !defined(_WIN32) && !defined(OS2)
{
MY_STAT stat_info;
if (!my_stat(name,&stat_info,MYF(0)))
return 0;
if (stat_info.st_mode & S_IWOTH) /* ignore world-writeable files */
{ {
fprintf(stderr, "warning: World-writeable config file %s is ignored\n", strmov(name,dir);
name); convert_dirname(name);
return 0; if (dir[0] == FN_HOMELIB) /* Add . to filenames in home */
strcat(name,".");
strxmov(strend(name),config_file,ext,NullS);
}
else
{
strmov(name,config_file);
}
fn_format(name,name,"","",4);
#if !defined(_WIN32) && !defined(OS2)
{
MY_STAT stat_info;
if (!my_stat(name,&stat_info,MYF(0)))
return 0;
if (stat_info.st_mode & S_IWOTH) /* ignore world-writeable files */
{
fprintf(stderr, "warning: World-writeable config file %s is ignored\n",
name);
return 0;
}
} }
}
#endif #endif
if (!(fp = my_fopen(fn_format(name,name,"","",4),O_RDONLY,MYF(0)))) if (!(file = ma_open(fn_format(name,name,"","",4),"r", NULL)))
return 0; /* Ignore wrong files */ return 0;
}
else {
if (!(file = ma_open(config_file, "r", NULL)))
return 0;
}
while (fgets(buff,sizeof(buff)-1,fp)) while (ma_gets(buff,sizeof(buff)-1,file))
{ {
line++; line++;
/* Ignore comment and empty lines */ /* Ignore comment and empty lines */
@@ -372,11 +381,11 @@ static my_bool search_default_file(DYNAMIC_ARRAY *args, MEM_ROOT *alloc,
*ptr=0; *ptr=0;
} }
} }
my_fclose(fp,MYF(0)); ma_close(file);
return(0); return(0);
err: err:
my_fclose(fp,MYF(0)); ma_close(file);
return 1; return 1;
} }

View File

@@ -32,6 +32,7 @@
#include "mysql.h" #include "mysql.h"
#include "mysql_version.h" #include "mysql_version.h"
#include "mysqld_error.h" #include "mysqld_error.h"
#include <mariadb/ma_io.h>
#include "errmsg.h" #include "errmsg.h"
#include <sys/stat.h> #include <sys/stat.h>
#include <signal.h> #include <signal.h>

View File

@@ -21,7 +21,7 @@ do {\
typedef char constraint[(A) ? 1 : -1];\ typedef char constraint[(A) ? 1 : -1];\
} while (0); } while (0);
static auth_plugin_t native_password_client_plugin= auth_plugin_t native_password_client_plugin=
{ {
MYSQL_CLIENT_AUTHENTICATION_PLUGIN, MYSQL_CLIENT_AUTHENTICATION_PLUGIN,
MYSQL_CLIENT_AUTHENTICATION_PLUGIN_INTERFACE_VERSION, MYSQL_CLIENT_AUTHENTICATION_PLUGIN_INTERFACE_VERSION,
@@ -34,7 +34,7 @@ static auth_plugin_t native_password_client_plugin=
native_password_auth_client native_password_auth_client
}; };
static auth_plugin_t old_password_client_plugin= auth_plugin_t old_password_client_plugin=
{ {
MYSQL_CLIENT_AUTHENTICATION_PLUGIN, MYSQL_CLIENT_AUTHENTICATION_PLUGIN,
MYSQL_CLIENT_AUTHENTICATION_PLUGIN_INTERFACE_VERSION, MYSQL_CLIENT_AUTHENTICATION_PLUGIN_INTERFACE_VERSION,
@@ -46,21 +46,6 @@ static auth_plugin_t old_password_client_plugin=
NULL, NULL,
old_password_auth_client old_password_auth_client
}; };
typedef struct st_mariadb_client_plugin_DBAPI dbapi_plugin_t;
#ifdef HAVE_SQLITE
extern dbapi_plugin_t sqlite3_plugin;
#endif
struct st_mysql_client_plugin *mysql_client_builtins[]=
{
(struct st_mysql_client_plugin *)&old_password_client_plugin,
(struct st_mysql_client_plugin *)&native_password_client_plugin,
#ifdef HAVE_SQLITE
(struct st_mysql_client_plugin *)&sqlite3_plugin,
#endif
0
};
typedef struct { typedef struct {
int (*read_packet)(struct st_plugin_vio *vio, uchar **buf); int (*read_packet)(struct st_plugin_vio *vio, uchar **buf);

View File

@@ -46,6 +46,7 @@
#include <m_string.h> #include <m_string.h>
#include "errmsg.h" #include "errmsg.h"
#include "mysql.h" #include "mysql.h"
#include <mariadb/ma_io.h>
#include <string.h> #include <string.h>
#ifdef _WIN32 #ifdef _WIN32
#include <share.h> #include <share.h>
@@ -53,7 +54,7 @@
typedef struct st_mysql_infile_info typedef struct st_mysql_infile_info
{ {
int fd; MA_FILE *fp;
int error_no; int error_no;
char error_msg[MYSQL_ERRMSG_SIZE + 1]; char error_msg[MYSQL_ERRMSG_SIZE + 1];
const char *filename; const char *filename;
@@ -64,71 +65,37 @@ static
int mysql_local_infile_init(void **ptr, const char *filename, void *userdata) int mysql_local_infile_init(void **ptr, const char *filename, void *userdata)
{ {
MYSQL_INFILE_INFO *info; MYSQL_INFILE_INFO *info;
int CodePage= -1;
#ifdef _WIN32
MYSQL *mysql= (MYSQL *)userdata; MYSQL *mysql= (MYSQL *)userdata;
wchar_t *w_filename= NULL;
int Length;
#endif
DBUG_ENTER("mysql_local_infile_init"); DBUG_ENTER("mysql_local_infile_init");
info = (MYSQL_INFILE_INFO *)my_malloc(sizeof(MYSQL_INFILE_INFO), MYF(MY_ZEROFILL)); info = (MYSQL_INFILE_INFO *)my_malloc(sizeof(MYSQL_INFILE_INFO), MYF(MY_ZEROFILL));
if (!info) { if (!info) {
DBUG_RETURN(1); DBUG_RETURN(1);
} }
*ptr = info; *ptr = info;
info->filename = filename; info->filename = filename;
#ifdef _WIN32 info->fp= ma_open(filename, "rb", mysql);
if (mysql)
CodePage= madb_get_windows_cp(mysql->charset->csname);
#endif
if (CodePage == -1)
{
#ifdef _WIN32
info->fd= sopen(info->filename, _O_RDONLY | _O_BINARY, _SH_DENYNO , _S_IREAD | _S_IWRITE);
#else
info->fd = open(info->filename, O_RDONLY | O_BINARY, my_umask);
#endif
my_errno= errno;
}
#ifdef _WIN32
else
{
if ((Length= MultiByteToWideChar(CodePage, 0, info->filename, (int)strlen(info->filename), NULL, 0)))
{
if (!(w_filename= (wchar_t *)my_malloc((Length + 1) * sizeof(wchar_t), MYF(MY_ZEROFILL))))
{
info->error_no= CR_OUT_OF_MEMORY;
my_snprintf((char *)info->error_msg, sizeof(info->error_msg),
ER(CR_OUT_OF_MEMORY));
DBUG_RETURN(1);
}
Length= MultiByteToWideChar(CodePage, 0, info->filename, (int)strlen(info->filename), w_filename, (int)Length);
}
if (Length == 0)
{
my_free(w_filename);
info->error_no= CR_UNKNOWN_ERROR;
my_snprintf((char *)info->error_msg, sizeof(info->error_msg),
"Character conversion error: %d", GetLastError());
DBUG_RETURN(1);
}
info->fd= _wsopen(w_filename, _O_RDONLY | _O_BINARY, _SH_DENYNO , _S_IREAD | _S_IWRITE);
my_errno= errno;
my_free(w_filename);
}
#endif
if (info->fd < 0) if (!info->fp)
{ {
info->error_no = my_errno; /* error handling is done via mysql_local_infile_error function, so we
my_snprintf((char *)info->error_msg, sizeof(info->error_msg), need to copy error to info */
EE(EE_FILENOTFOUND), filename, info->error_no); if (mysql_errno(mysql) && !info->error_no)
{
info->error_no= mysql_errno(mysql);
strncpy(info->error_msg, mysql_error(mysql), MYSQL_ERRMSG_SIZE);
}
else
{
info->error_no = my_errno;
my_snprintf((char *)info->error_msg, sizeof(info->error_msg),
EE(EE_FILENOTFOUND), filename, info->error_no);
}
DBUG_RETURN(1); DBUG_RETURN(1);
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/* }}} */ /* }}} */
@@ -139,11 +106,11 @@ static
int mysql_local_infile_read(void *ptr, char * buf, unsigned int buf_len) int mysql_local_infile_read(void *ptr, char * buf, unsigned int buf_len)
{ {
MYSQL_INFILE_INFO *info = (MYSQL_INFILE_INFO *)ptr; MYSQL_INFILE_INFO *info = (MYSQL_INFILE_INFO *)ptr;
int count; size_t count;
DBUG_ENTER("mysql_local_infile_read"); DBUG_ENTER("mysql_local_infile_read");
count= read(info->fd, (void *)buf, (size_t)buf_len); count= ma_read((void *)buf, 1, (size_t)buf_len, info->fp);
if (count < 0) if (count < 0)
{ {
@@ -184,8 +151,8 @@ void mysql_local_infile_end(void *ptr)
if (info) if (info)
{ {
if (info->fd >= 0) if (info->fp)
close(info->fd); ma_close(info->fp);
my_free(ptr); my_free(ptr);
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;

19
plugins/io/CMakeLists.txt Normal file
View File

@@ -0,0 +1,19 @@
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include)
FIND_PACKAGE(CURL)
IF(CURL_FOUND)
ADD_DEFINITIONS(-DHAVE_CURL=1)
# remote file plugin
INCLUDE_DIRECTORIES(${CURL_INCLUDE_DIR})
SET(REMOTE_IO_SOURCES remote_io.c)
ADD_LIBRARY(remote_io SHARED ${REMOTE_IO_SOURCES} ${CMAKE_SOURCE_DIR}/plugins/plugin.def)
TARGET_LINK_LIBRARIES(remote_io ${CURL_LIBRARIES})
SET_TARGET_PROPERTIES(remote_io PROPERTIES PREFIX "")
INSTALL(TARGETS
remote_io
RUNTIME DESTINATION "${PLUGIN_INSTALL_DIR}"
LIBRARY DESTINATION "${PLUGIN_INSTALL_DIR}"
ARCHIVE DESTINATION "${PLUGIN_INSTALL_DIR}")
ENDIF()

433
plugins/io/remote_io.c Normal file
View File

@@ -0,0 +1,433 @@
/************************************************************************************
* Copyright (C) 2015 Monty Program AB
* Copyright (c) 2003 Simtec Electronics
*
* Re-implemented by Vincent Sanders <vince@kyllikki.org> with extensive
* reference to original curl example code
*
* Rewritten for MariaDB Connector/C by Georg Richter <georg@mariadb.com>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*************************************************************************************/
/*
This is a plugin for remote file access via libcurl.
The following URL types are supported:
http://
https://
ftp://
sftp://
ldap://
smb://
*/
#ifdef HAVE_CURL
#include <my_global.h>
#include <my_sys.h>
#include <mysql.h>
#include <mysql/client_plugin.h>
#include <string.h>
#include <memory.h>
#include <stdio.h>
#include <string.h>
#ifndef WIN32
#include <sys/time.h>
#endif
#include <stdlib.h>
#include <errno.h>
/* Internal file structure */
MA_FILE *ma_rio_open(const char *url,const char *operation);
int ma_rio_close(MA_FILE *file);
int ma_rio_feof(MA_FILE *file);
size_t ma_rio_read(void *ptr, size_t size, size_t nmemb, MA_FILE *file);
char * ma_rio_gets(char *ptr, size_t size, MA_FILE *file);
int ma_rio_init(char *, size_t, int, va_list);
int ma_rio_deinit(void);
struct st_rio_methods ma_rio_methods= {
ma_rio_open,
ma_rio_close,
ma_rio_feof,
ma_rio_read,
ma_rio_gets
};
typedef struct
{
CURL *curl;
size_t length,
offset;
uchar *buffer;
int in_progress;
} MA_REMOTE_FILE;
CURLM *multi_handle= NULL;
mysql_declare_client_plugin(REMOTEIO)
"remote_io",
"Georg Richter",
"Remote IO plugin",
{0,1,0},
ma_rio_init,
ma_rio_deinit,
&ma_rio_methods
mysql_end_client_plugin;
/* {{{ ma_rio_init - Plugin initialization */
int ma_rio_init(char *unused1, size_t unused2, int unused3, va_list unused4)
{
curl_global_init(CURL_GLOBAL_ALL);
if (!multi_handle)
multi_handle = curl_multi_init();
return 0;
}
/* }}} */
/* {{{ ma_rio_deinit - Plugin deinitialization */
int ma_rio_deinit(void)
{
if (multi_handle)
{
curl_multi_cleanup(multi_handle);
multi_handle= NULL;
}
curl_global_cleanup();
}
/* }}} */
/* {{{ curl_write_callback */
static size_t rio_write_callback(char *buffer,
size_t size,
size_t nitems,
void *ptr)
{
size_t free_bytes;
char *tmp;
MA_FILE *file= (MA_FILE *)ptr;
MA_REMOTE_FILE *curl_file = (MA_REMOTE_FILE *)file->ptr;
size *= nitems;
free_bytes= curl_file->length - curl_file->offset;
/* check if we need to allocate more memory */
if (size > free_bytes) {
tmp= (char *)realloc((gptr)curl_file->buffer, curl_file->length + (size - free_bytes));
if (!tmp)
size= free_bytes;
else {
curl_file->length+= size - free_bytes;
curl_file->buffer= tmp;
}
}
/* copy buffer into MA_FILE structure */
memcpy((char *)curl_file->buffer + curl_file->offset, buffer, size);
curl_file->offset+= size;
return size;
}
/* }}} */
/* use to attempt to fill the read buffer up to requested number of bytes */
static int fill_buffer(MA_FILE *file, size_t want)
{
fd_set fdread;
fd_set fdwrite;
fd_set fdexcep;
struct timeval timeout;
int rc;
CURLMcode mc; /* curl_multi_fdset() return code */
MA_REMOTE_FILE *rf= (MA_REMOTE_FILE *)file->ptr;
/* only attempt to fill buffer if transactions still running and buffer
doesnt exceed required size already */
if (!rf->in_progress || (rf->offset > want))
return 0;
/* try to fill buffer */
do {
int maxfd = -1;
long curl_timeo = -1;
FD_ZERO(&fdread);
FD_ZERO(&fdwrite);
FD_ZERO(&fdexcep);
/* set a suitable timeout to fail on */
timeout.tv_sec = 20; /* 20 seconds */
timeout.tv_usec = 0;
curl_multi_timeout(multi_handle, &curl_timeo);
if(curl_timeo >= 0) {
timeout.tv_sec = curl_timeo / 1000;
if(timeout.tv_sec > 1)
timeout.tv_sec = 1;
else
timeout.tv_usec = (curl_timeo % 1000) * 1000;
}
/* get file descriptors from the transfers */
mc = curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep, &maxfd);
if(mc != CURLM_OK)
{
/* todo: error handling */
break;
}
/* On success the value of maxfd is guaranteed to be >= -1. We call
select(maxfd + 1, ...); specially in case of (maxfd == -1) there are
no fds ready yet so we call select(0, ...) */
if(maxfd == -1) {
struct timeval wait = { 0, 100 * 1000 }; /* 100ms */
rc = select(0, NULL, NULL, NULL, &wait);
}
else {
rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
}
switch(rc) {
case -1:
/* select error */
break;
case 0:
default:
/* timeout or readable/writable sockets */
curl_multi_perform(multi_handle, &rf->in_progress);
break;
}
} while(rf->in_progress && (rf->offset < want));
return 1;
}
/* use to remove want bytes from the front of a files buffer */
static int use_buffer(MA_FILE *file,int want)
{
MA_REMOTE_FILE *rf= (MA_REMOTE_FILE *)file->ptr;
/* sort out buffer */
if((rf->offset - want) <=0) {
/* ditch buffer - write will recreate */
if (rf->buffer)
free(rf->buffer);
rf->buffer=NULL;
rf->offset=0;
rf->length=0;
}
else {
/* move rest down make it available for later */
memmove(rf->buffer,
&rf->buffer[want],
(rf->offset - want));
rf->offset -= want;
}
return 0;
}
MA_FILE *ma_rio_open(const char *url,const char *operation)
{
/* this code could check for URLs or types in the 'url' and
basicly use the real fopen() for standard files */
MA_FILE *file;
MA_REMOTE_FILE *rf;
(void)operation;
if (!(file = (MA_FILE *)calloc(sizeof(MA_FILE), 1)))
return NULL;
file->type= MA_FILE_REMOTE;
if (!(file->ptr= rf= (MA_REMOTE_FILE *)calloc(sizeof(MA_REMOTE_FILE), 1)))
{
free(file);
return NULL;
}
rf->curl = curl_easy_init();
curl_easy_setopt(rf->curl, CURLOPT_URL, url);
curl_easy_setopt(rf->curl, CURLOPT_WRITEDATA, file);
curl_easy_setopt(rf->curl, CURLOPT_VERBOSE, 0L);
curl_easy_setopt(rf->curl, CURLOPT_WRITEFUNCTION, rio_write_callback);
curl_multi_add_handle(multi_handle, rf->curl);
/* lets start the fetch */
curl_multi_perform(multi_handle, &rf->in_progress);
if((rf->offset == 0) && (!rf->in_progress)) {
/* if in_progress is 0 now, we should return NULL */
/* make sure the easy handle is not in the multi handle anymore */
curl_multi_remove_handle(multi_handle, rf->curl);
/* cleanup */
curl_easy_cleanup(rf->curl);
free(file);
file = NULL;
}
return file;
}
int ma_rio_close(MA_FILE *file)
{
int ret=0;/* default is good return */
MA_REMOTE_FILE *rf= (MA_REMOTE_FILE *)file->ptr;
switch(file->type) {
case MA_FILE_REMOTE:
curl_multi_remove_handle(multi_handle, rf->curl);
/* cleanup */
curl_easy_cleanup(rf->curl);
break;
default: /* unknown or supported type - oh dear */
ret=EOF;
errno=EBADF;
break;
}
if(rf->buffer)
free(rf->buffer);/* free any allocated buffer space */
free(rf);
free(file);
return ret;
}
int ma_rio_feof(MA_FILE *file)
{
int ret=0;
MA_REMOTE_FILE *rf= (MA_REMOTE_FILE *)file->ptr;
switch(file->type) {
case MA_FILE_REMOTE:
if((rf->offset == 0) && (!rf->in_progress))
ret = 1;
break;
default: /* unknown or supported type - oh dear */
ret=-1;
errno=EBADF;
break;
}
return ret;
}
size_t ma_rio_read(void *ptr, size_t size, size_t nmemb, MA_FILE *file)
{
size_t want;
MA_REMOTE_FILE *rf= (MA_REMOTE_FILE *)file->ptr;
switch(file->type) {
case MA_FILE_REMOTE:
want = nmemb * size;
fill_buffer(file,want);
/* check if theres data in the buffer - if not fill_buffer()
* either errored or EOF */
if(!rf->offset)
return 0;
/* ensure only available data is considered */
if(rf->offset < want)
want = rf->offset;
/* xfer data to caller */
memcpy(ptr, rf->buffer, want);
use_buffer(file,want);
want = want / size; /* number of items */
break;
default: /* unknown or supported type - oh dear */
want=0;
errno=EBADF;
break;
}
return want;
}
char *ma_rio_gets(char *ptr, size_t size, MA_FILE *file)
{
size_t want = size - 1;/* always need to leave room for zero termination */
size_t loop;
switch(file->type) {
case MA_FILE_REMOTE:
{
MA_REMOTE_FILE *rf= (MA_REMOTE_FILE *)file->ptr;
fill_buffer(file,want);
/* check if theres data in the buffer - if not fill either errored or
* EOF */
if(!rf->offset)
return NULL;
/* ensure only available data is considered */
if(rf->offset < want)
want = rf->offset;
/*buffer contains data */
/* look for newline or eof */
for(loop=0;loop < want;loop++) {
if(rf->buffer[loop] == '\n') {
want=loop+1;/* include newline */
break;
}
}
/* xfer data to caller */
memcpy(ptr, rf->buffer, want);
ptr[want]=0;/* allways null terminate */
use_buffer(file,want);
break;
}
default: /* unknown or supported type - oh dear */
ptr=NULL;
errno=EBADF;
break;
}
return ptr;/*success */
}
#endif

View File

@@ -26,6 +26,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include <mysql/client_plugin.h> #include <mysql/client_plugin.h>
void *remote_plugin;
/* /*
Bug#28075 "COM_DEBUG crashes mysqld" Bug#28075 "COM_DEBUG crashes mysqld"
@@ -917,7 +918,7 @@ static int test_connect_attrs(MYSQL *my)
rc= mysql_query(my, "SELECT * FROM performance_schema.session_connect_attrs LIMIT 1"); rc= mysql_query(my, "SELECT * FROM performance_schema.session_connect_attrs LIMIT 1");
if (rc != 0) if (rc != 0)
{ {
diag("Server doesn't connection attributes"); diag("Server doesn't support connection attributes");
return SKIP; return SKIP;
} }
@@ -1001,7 +1002,55 @@ static int test_conc117(MYSQL *mysql)
return OK; return OK;
} }
static int test_remote1(MYSQL *mysql)
{
int rc;
remote_plugin= (void *)mysql_client_find_plugin(mysql, "remote_io", MYSQL_CLIENT_REMOTEIO_PLUGIN);
if (!remote_plugin)
{
diag("skip - no remote io plugin available");
return SKIP;
}
rc= mysql_query(mysql, "DROP TABLE IF EXISTS t1");
check_mysql_rc(rc, mysql);
rc= mysql_query(mysql, "CREATE TABLE t1 (a text)");
check_mysql_rc(rc, mysql);
rc= mysql_query(mysql, "LOAD DATA LOCAL INFILE 'http://www.example.com' INTO TABLE t1");
if (rc && mysql_errno(mysql) == 2058)
{
diag("remote_io plugin not available");
return SKIP;
}
check_mysql_rc(rc, mysql);
return OK;
}
static int test_remote2(MYSQL *my)
{
MYSQL *mysql= mysql_init(NULL);
if (!remote_plugin)
{
diag("skip - no remote io plugin available");
return SKIP;
}
mysql_options(mysql, MYSQL_READ_DEFAULT_FILE, "http://localhost/test.cnf");
mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "test");
mysql_real_connect(mysql, hostname, username, password, schema,
0, socketname, 0), mysql_error(my);
diag("port: %d", mysql->port);
mysql_close(mysql);
return OK;
}
struct my_tests_st my_tests[] = { struct my_tests_st my_tests[] = {
{"test_remote1", test_remote1, TEST_CONNECTION_NEW, 0, NULL, NULL},
{"test_remote2", test_remote2, TEST_CONNECTION_NEW, 0, NULL, NULL},
{"test_conc117", test_conc117, TEST_CONNECTION_DEFAULT, 0, NULL, NULL}, {"test_conc117", test_conc117, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},
{"test_conc_114", test_conc_114, TEST_CONNECTION_DEFAULT, 0, NULL, NULL}, {"test_conc_114", test_conc_114, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},
{"test_connect_attrs", test_connect_attrs, TEST_CONNECTION_DEFAULT, 0, NULL, NULL}, {"test_connect_attrs", test_connect_attrs, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},