1
0
mirror of https://github.com/MariaDB/server.git synced 2025-08-07 00:04:31 +03:00

Added HandlerSocket plugin

- Fixed compiler errors
- Modified Makefiles to be part of plugin directory
- Some minor changes in database.cpp to use the new MariaDB handler interface
This commit is contained in:
Michael Widenius
2011-02-20 15:22:10 +02:00
parent 26aa83bfc0
commit 2c7d6f12ee
109 changed files with 21060 additions and 0 deletions

View File

@@ -0,0 +1,27 @@
Copyright (c) 2010 DeNA Co.,Ltd.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of DeNA Co.,Ltd. nor the names of its contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY DeNA Co.,Ltd. "AS IS" AND ANY EXPRESS OR
IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
EVENT SHALL DeNA Co.,Ltd. BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -0,0 +1,6 @@
Revision history for Perl extension HandlerSocket.
0.01 Wed Mar 31 11:50:23 2010
- original version; created by h2xs 1.23 with options
-A -n HandlerSocket

View File

@@ -0,0 +1,577 @@
// vim:ai:sw=2:ts=8
/*
* Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
* See COPYRIGHT.txt for details.
*/
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "ppport.h"
#include "hstcpcli.hpp"
#define DBG(x)
static SV *
arr_get_entry(AV *av, I32 avmax, I32 idx)
{
if (idx > avmax) {
DBG(fprintf(stderr, "arr_get_entry1 %d %d\n", avmax, idx));
return 0;
}
SV **const ev = av_fetch(av, idx, 0);
if (ev == 0) {
DBG(fprintf(stderr, "arr_get_entry2 %d %d\n", avmax, idx));
return 0;
}
return *ev;
}
static int
arr_get_intval(AV *av, I32 avmax, I32 idx, int default_val = 0)
{
SV *const e = arr_get_entry(av, avmax, idx);
if (e == 0) {
return default_val;
}
return SvIV(e);
}
static const char *
sv_get_strval(SV *sv)
{
if (sv == 0 || !SvPOK(sv)) {
DBG(fprintf(stderr, "sv_get_strval\n"));
return 0;
}
return SvPV_nolen(sv);
}
static const char *
arr_get_strval(AV *av, I32 avmax, I32 idx)
{
SV *const e = arr_get_entry(av, avmax, idx);
return sv_get_strval(e);
}
static AV *
sv_get_arrval(SV *sv)
{
if (sv == 0 || !SvROK(sv)) {
DBG(fprintf(stderr, "sv_get_arrval1\n"));
return 0;
}
SV *const svtarget = SvRV(sv);
if (svtarget == 0 || SvTYPE(svtarget) != SVt_PVAV) {
DBG(fprintf(stderr, "sv_get_arrval2\n"));
return 0;
}
return (AV *)svtarget;
}
static AV *
arr_get_arrval(AV *av, I32 avmax, I32 idx)
{
SV *const e = arr_get_entry(av, avmax, idx);
if (e == 0) {
DBG(fprintf(stderr, "arr_get_arrval1\n"));
return 0;
}
return sv_get_arrval(e);
}
static void
hv_to_strmap(HV *hv, std::map<std::string, std::string>& m_r)
{
if (hv == 0) {
return;
}
hv_iterinit(hv);
HE *hent = 0;
while ((hent = hv_iternext(hv)) != 0) {
I32 klen = 0;
char *const k = hv_iterkey(hent, &klen);
DBG(fprintf(stderr, "k=%s\n", k));
const std::string key(k, klen);
SV *const vsv = hv_iterval(hv, hent);
STRLEN vlen = 0;
char *const v = SvPV(vsv, vlen);
DBG(fprintf(stderr, "v=%s\n", v));
const std::string val(v, vlen);
m_r[key] = val;
}
}
static void
strrefarr_push_back(std::vector<dena::string_ref>& a_r, SV *sv)
{
if (sv == 0 || SvTYPE(sv) == SVt_NULL) {
DBG(fprintf(stderr, "strrefarr_push_back: null\n"));
return a_r.push_back(dena::string_ref());
}
STRLEN vlen = 0;
char *const v = SvPV(sv, vlen);
DBG(fprintf(stderr, "strrefarr_push_back: %s\n", v));
a_r.push_back(dena::string_ref(v, vlen));
}
static void
av_to_strrefarr(AV *av, std::vector<dena::string_ref>& a_r)
{
if (av == 0) {
return;
}
const I32 len = av_len(av) + 1;
for (I32 i = 0; i < len; ++i) {
SV **const ev = av_fetch(av, i, 0);
strrefarr_push_back(a_r, ev ? *ev : 0);
}
}
static dena::string_ref
sv_get_string_ref(SV *sv)
{
if (sv == 0) {
return dena::string_ref();
}
STRLEN vlen = 0;
char *const v = SvPV(sv, vlen);
return dena::string_ref(v, vlen);
}
static IV
sv_get_iv(SV *sv)
{
if (sv == 0 || !SvIOK(sv)) {
return 0;
}
return SvIV(sv);
}
static void
av_to_filters(AV *av, std::vector<dena::hstcpcli_filter>& f_r)
{
DBG(fprintf(stderr, "av_to_filters: %p\n", av));
if (av == 0) {
return;
}
const I32 len = av_len(av) + 1;
DBG(fprintf(stderr, "av_to_filters: len=%d\n", (int)len));
for (I32 i = 0; i < len; ++i) {
AV *const earr = arr_get_arrval(av, len, i);
if (earr == 0) {
continue;
}
const I32 earrlen = av_len(earr) + 1;
dena::hstcpcli_filter fe;
fe.filter_type = sv_get_string_ref(arr_get_entry(earr, earrlen, 0));
fe.op = sv_get_string_ref(arr_get_entry(earr, earrlen, 1));
fe.ff_offset = sv_get_iv(arr_get_entry(earr, earrlen, 2));
fe.val = sv_get_string_ref(arr_get_entry(earr, earrlen, 3));
f_r.push_back(fe);
DBG(fprintf(stderr, "av_to_filters: %s %s %d %s\n",
fe.filter_action.begin(), fe.filter_op.begin(), (int)fe.ff_offset,
fe.value.begin()));
}
}
static void
set_process_verbose_level(const std::map<std::string, std::string>& m)
{
std::map<std::string, std::string>::const_iterator iter = m.find("verbose");
if (iter != m.end()) {
dena::verbose_level = atoi(iter->second.c_str());
}
}
static AV *
execute_internal(SV *obj, int id, const char *op, AV *keys, int limit,
int skip, const char *modop, AV *modvals, AV *filters)
{
AV *retval = (AV *)&PL_sv_undef;
dena::hstcpcli_i *const ptr =
reinterpret_cast<dena::hstcpcli_i *>(SvIV(SvRV(obj)));
do {
std::vector<dena::string_ref> keyarr, mvarr;
std::vector<dena::hstcpcli_filter> farr;
av_to_strrefarr(keys, keyarr);
dena::string_ref modop_ref;
if (modop != 0) {
modop_ref = dena::string_ref(modop, strlen(modop));
av_to_strrefarr(modvals, mvarr);
}
if (filters != 0) {
av_to_filters(filters, farr);
}
ptr->request_buf_exec_generic(id, dena::string_ref(op, strlen(op)),
&keyarr[0], keyarr.size(), limit, skip, modop_ref, &mvarr[0],
mvarr.size(), &farr[0], farr.size());
AV *const av = newAV();
retval = av;
if (ptr->request_send() != 0) {
break;
}
size_t nflds = 0;
ptr->response_recv(nflds);
const int e = ptr->get_error_code();
DBG(fprintf(stderr, "e=%d nflds=%zu\n", e, nflds));
av_push(av, newSViv(e));
if (e != 0) {
const std::string s = ptr->get_error();
av_push(av, newSVpvn(s.data(), s.size()));
} else {
const dena::string_ref *row = 0;
while ((row = ptr->get_next_row()) != 0) {
DBG(fprintf(stderr, "row=%p\n", row));
for (size_t i = 0; i < nflds; ++i) {
const dena::string_ref& v = row[i];
DBG(fprintf(stderr, "FLD %zu v=%s vbegin=%p\n", i,
std::string(v.begin(), v.size())
.c_str(), v.begin()));
if (v.begin() != 0) {
SV *const e = newSVpvn(
v.begin(), v.size());
av_push(av, e);
} else {
av_push(av, &PL_sv_undef);
}
}
}
}
if (e >= 0) {
ptr->response_buf_remove();
}
} while (0);
return retval;
}
struct execute_arg {
int id;
const char *op;
AV *keys;
int limit;
int skip;
const char *modop;
AV *modvals;
AV *filters;
execute_arg() : id(0), op(0), keys(0), limit(0), skip(0), modop(0),
modvals(0), filters(0) { }
};
static AV *
execute_multi_internal(SV *obj, const execute_arg *args, size_t num_args)
{
dena::hstcpcli_i *const ptr =
reinterpret_cast<dena::hstcpcli_i *>(SvIV(SvRV(obj)));
/* appends multiple requests to the send buffer */
for (size_t i = 0; i < num_args; ++i) {
std::vector<dena::string_ref> keyarr, mvarr;
std::vector<dena::hstcpcli_filter> farr;
const execute_arg& arg = args[i];
av_to_strrefarr(arg.keys, keyarr);
dena::string_ref modop_ref;
if (arg.modop != 0) {
modop_ref = dena::string_ref(arg.modop, strlen(arg.modop));
av_to_strrefarr(arg.modvals, mvarr);
}
if (arg.filters != 0) {
av_to_filters(arg.filters, farr);
}
ptr->request_buf_exec_generic(arg.id,
dena::string_ref(arg.op, strlen(arg.op)), &keyarr[0], keyarr.size(),
arg.limit, arg.skip, modop_ref, &mvarr[0], mvarr.size(), &farr[0],
farr.size());
}
AV *const retval = newAV();
/* sends the requests */
if (ptr->request_send() < 0) {
/* IO error */
AV *const av_respent = newAV();
av_push(retval, newRV_noinc((SV *)av_respent));
av_push(av_respent, newSViv(ptr->get_error_code()));
const std::string& s = ptr->get_error();
av_push(av_respent, newSVpvn(s.data(), s.size()));
return retval; /* retval : [ [ err_code, err_message ] ] */
}
/* receives responses */
for (size_t i = 0; i < num_args; ++i) {
AV *const av_respent = newAV();
av_push(retval, newRV_noinc((SV *)av_respent));
size_t nflds = 0;
const int e = ptr->response_recv(nflds);
av_push(av_respent, newSViv(e));
if (e != 0) {
const std::string& s = ptr->get_error();
av_push(av_respent, newSVpvn(s.data(), s.size()));
} else {
const dena::string_ref *row = 0;
while ((row = ptr->get_next_row()) != 0) {
for (size_t i = 0; i < nflds; ++i) {
const dena::string_ref& v = row[i];
DBG(fprintf(stderr, "%zu %s\n", i,
std::string(v.begin(), v.size()).c_str()));
if (v.begin() != 0) {
av_push(av_respent, newSVpvn(v.begin(), v.size()));
} else {
/* null */
av_push(av_respent, &PL_sv_undef);
}
}
}
}
if (e >= 0) {
ptr->response_buf_remove();
}
if (e < 0) {
return retval;
}
}
return retval;
}
MODULE = Net::HandlerSocket PACKAGE = Net::HandlerSocket
SV *
new(klass, args)
char *klass
HV *args
CODE:
RETVAL = &PL_sv_undef;
dena::config conf;
hv_to_strmap(args, conf);
set_process_verbose_level(conf);
dena::socket_args sargs;
sargs.set(conf);
dena::hstcpcli_ptr p = dena::hstcpcli_i::create(sargs);
SV *const objref = newSViv(0);
SV *const obj = newSVrv(objref, klass);
dena::hstcpcli_i *const ptr = p.get();
sv_setiv(obj, reinterpret_cast<IV>(ptr));
p.release();
SvREADONLY_on(obj);
RETVAL = objref;
OUTPUT:
RETVAL
void
DESTROY(obj)
SV *obj
CODE:
dena::hstcpcli_i *const ptr =
reinterpret_cast<dena::hstcpcli_i *>(SvIV(SvRV(obj)));
delete ptr;
void
close(obj)
SV *obj
CODE:
dena::hstcpcli_i *const ptr =
reinterpret_cast<dena::hstcpcli_i *>(SvIV(SvRV(obj)));
ptr->close();
int
reconnect(obj)
SV *obj
CODE:
RETVAL = 0;
dena::hstcpcli_i *const ptr =
reinterpret_cast<dena::hstcpcli_i *>(SvIV(SvRV(obj)));
RETVAL = ptr->reconnect();
OUTPUT:
RETVAL
int
stable_point(obj)
SV *obj
CODE:
RETVAL = 0;
dena::hstcpcli_i *const ptr =
reinterpret_cast<dena::hstcpcli_i *>(SvIV(SvRV(obj)));
const bool rv = ptr->stable_point();
RETVAL = static_cast<int>(rv);
OUTPUT:
RETVAL
int
get_error_code(obj)
SV *obj
CODE:
RETVAL = 0;
dena::hstcpcli_i *const ptr =
reinterpret_cast<dena::hstcpcli_i *>(SvIV(SvRV(obj)));
RETVAL = ptr->get_error_code();
OUTPUT:
RETVAL
SV *
get_error(obj)
SV *obj
CODE:
RETVAL = &PL_sv_undef;
dena::hstcpcli_i *const ptr =
reinterpret_cast<dena::hstcpcli_i *>(SvIV(SvRV(obj)));
const std::string s = ptr->get_error();
RETVAL = newSVpvn(s.data(), s.size());
OUTPUT:
RETVAL
int
open_index(obj, id, db, table, index, fields, ffields = 0)
SV *obj
int id
const char *db
const char *table
const char *index
const char *fields
SV *ffields
CODE:
const char *const ffields_str = sv_get_strval(ffields);
RETVAL = 0;
dena::hstcpcli_i *const ptr =
reinterpret_cast<dena::hstcpcli_i *>(SvIV(SvRV(obj)));
do {
ptr->request_buf_open_index(id, db, table, index, fields, ffields_str);
if (ptr->request_send() != 0) {
break;
}
size_t nflds = 0;
ptr->response_recv(nflds);
const int e = ptr->get_error_code();
DBG(fprintf(stderr, "errcode=%d\n", ptr->get_error_code()));
if (e >= 0) {
ptr->response_buf_remove();
}
DBG(fprintf(stderr, "errcode=%d\n", ptr->get_error_code()));
} while (0);
RETVAL = ptr->get_error_code();
OUTPUT:
RETVAL
AV *
execute_single(obj, id, op, keys, limit, skip, mop = 0, mvs = 0, fils = 0)
SV *obj
int id
const char *op
AV *keys
int limit
int skip
SV *mop
SV *mvs
SV *fils
CODE:
const char *const mop_str = sv_get_strval(mop);
AV *const mvs_av = sv_get_arrval(mvs);
AV *const fils_av = sv_get_arrval(fils);
RETVAL = execute_internal(obj, id, op, keys, limit, skip, mop_str, mvs_av,
fils_av);
sv_2mortal((SV *)RETVAL);
OUTPUT:
RETVAL
AV *
execute_multi(obj, cmds)
SV *obj
AV *cmds
CODE:
DBG(fprintf(stderr, "execute_multi0\n"));
const I32 cmdsmax = av_len(cmds);
execute_arg args[cmdsmax + 1]; /* GNU */
for (I32 i = 0; i <= cmdsmax; ++i) {
AV *const avtarget = arr_get_arrval(cmds, cmdsmax, i);
if (avtarget == 0) {
DBG(fprintf(stderr, "execute_multi1 %d\n", i));
continue;
}
const I32 argmax = av_len(avtarget);
if (argmax < 2) {
DBG(fprintf(stderr, "execute_multi2 %d\n", i));
continue;
}
execute_arg& ag = args[i];
ag.id = arr_get_intval(avtarget, argmax, 0);
ag.op = arr_get_strval(avtarget, argmax, 1);
ag.keys = arr_get_arrval(avtarget, argmax, 2);
ag.limit = arr_get_intval(avtarget, argmax, 3);
ag.skip = arr_get_intval(avtarget, argmax, 4);
ag.modop = arr_get_strval(avtarget, argmax, 5);
ag.modvals = arr_get_arrval(avtarget, argmax, 6);
ag.filters = arr_get_arrval(avtarget, argmax, 7);
DBG(fprintf(stderr, "execute_multi3 %d: %d %s %p %d %d %s %p %p\n",
i, ag.id, ag.op, ag.keys, ag.limit, ag.skip, ag.modop, ag.modvals,
ag.filters));
}
RETVAL = execute_multi_internal(obj, args, cmdsmax + 1);
sv_2mortal((SV *)RETVAL);
OUTPUT:
RETVAL
AV *
execute_find(obj, id, op, keys, limit, skip, mop = 0, mvs = 0, fils = 0)
SV *obj
int id
const char *op
AV *keys
int limit
int skip
SV *mop
SV *mvs
SV *fils
CODE:
const char *const mop_str = sv_get_strval(mop);
AV *const mvs_av = sv_get_arrval(mvs);
AV *const fils_av = sv_get_arrval(fils);
RETVAL = execute_internal(obj, id, op, keys, limit, skip, mop_str, mvs_av,
fils_av);
sv_2mortal((SV *)RETVAL);
OUTPUT:
RETVAL
AV *
execute_update(obj, id, op, keys, limit, skip, modvals, fils = 0)
SV *obj
int id
const char *op
AV *keys
int limit
int skip
AV *modvals
SV *fils
CODE:
AV *const fils_av = sv_get_arrval(fils);
RETVAL = execute_internal(obj, id, op, keys, limit, skip, "U",
modvals, fils_av);
sv_2mortal((SV *)RETVAL);
OUTPUT:
RETVAL
AV *
execute_delete(obj, id, op, keys, limit, skip, fils = 0)
SV *obj
int id
const char *op
AV *keys
int limit
int skip
SV *fils
CODE:
AV *const fils_av = sv_get_arrval(fils);
RETVAL = execute_internal(obj, id, op, keys, limit, skip, "D", 0, fils_av);
sv_2mortal((SV *)RETVAL);
OUTPUT:
RETVAL
AV *
execute_insert(obj, id, fvals)
SV *obj
int id
AV *fvals
CODE:
RETVAL = execute_internal(obj, id, "+", fvals, 0, 0, 0, 0, 0);
sv_2mortal((SV *)RETVAL);
OUTPUT:
RETVAL

View File

@@ -0,0 +1,8 @@
Changes
HandlerSocket.xs
Makefile.PL
MANIFEST
ppport.h
README
t/HandlerSocket.t
lib/HandlerSocket.pm

View File

@@ -0,0 +1,18 @@
# use 5.010000;
use ExtUtils::MakeMaker;
# See lib/ExtUtils/MakeMaker.pm for details of how to influence
# the contents of the Makefile that is written.
WriteMakefile(
NAME => 'Net::HandlerSocket',
VERSION_FROM => 'lib/Net/HandlerSocket.pm', # finds $VERSION
PREREQ_PM => {}, # e.g., Module::Name => 1.1
($] >= 5.005 ? ## Add these new keywords supported since 5.005
(ABSTRACT_FROM => 'lib/Net/HandlerSocket.pm', # retrieve abstract from module
AUTHOR => 'higuchi dot akira at dena dot jp>') : ()),
CC => 'g++ -fPIC',
LD => 'g++ -fPIC',
LIBS => ['-L../libhsclient -L../libhsclient/.libs -lhsclient'],
DEFINE => '',
INC => '-I. -I../libhsclient',
OPTIMIZE => '-g -O3 -Wall -Wno-unused',
);

View File

@@ -0,0 +1,20 @@
# use 5.010000;
use ExtUtils::MakeMaker;
# See lib/ExtUtils/MakeMaker.pm for details of how to influence
# the contents of the Makefile that is written.
WriteMakefile(
NAME => 'Net::HandlerSocket',
VERSION_FROM => 'lib/Net/HandlerSocket.pm', # finds $VERSION
PREREQ_PM => {}, # e.g., Module::Name => 1.1
($] >= 5.005 ? ## Add these new keywords supported since 5.005
(ABSTRACT_FROM => 'lib/Net/HandlerSocket.pm', # retrieve abstract from module
AUTHOR => 'higuchi dot akira at dena dot jp>') : ()),
CC => 'g++ -fPIC',
LD => 'g++ -fPIC',
LIBS => ['-lhsclient'], # e.g., '-lm'
DEFINE => '', # e.g., '-DHAVE_SOMETHING'
INC => '-I. -I/usr/include/handlersocket',
OPTIMIZE => '-g -O3 -Wall -Wno-unused',
# Un-comment this if you add C files to link with later:
# OBJECT => '$(O_FILES)', # link all the C files too
);

View File

@@ -0,0 +1,30 @@
HandlerSocket version 0.01
==========================
The README is used to introduce the module and provide instructions on
how to install the module, any machine dependencies it may have (for
example C compilers and installed libraries) and any other information
that should be provided before the module is installed.
A README file is required for CPAN modules since CPAN extracts the
README file from a module distribution so that people browsing the
archive can use it get an idea of the modules uses. It is usually a
good idea to provide version information here so that people can
decide whether fixes for the module are worth downloading.
INSTALLATION
To install this module type the following:
perl Makefile.PL
make
make test
make install
DEPENDENCIES
COPYRIGHT AND LICENCE
Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
See COPYRIGHT.txt for details.

View File

@@ -0,0 +1,68 @@
package Net::HandlerSocket;
use strict;
use warnings;
require Exporter;
our @ISA = qw(Exporter);
# Items to export into callers namespace by default. Note: do not export
# names by default without a very good reason. Use EXPORT_OK instead.
# Do not simply export all your public functions/methods/constants.
# This allows declaration use Net::HandlerSocket ':all';
# If you do not need this, moving things directly into @EXPORT or @EXPORT_OK
# will save memory.
our %EXPORT_TAGS = ( 'all' => [ qw(
) ] );
our @EXPORT_OK = ( @{ $EXPORT_TAGS{'all'} } );
our @EXPORT = qw(
);
our $VERSION = '0.01';
require XSLoader;
XSLoader::load('Net::HandlerSocket', $VERSION);
# Preloaded methods go here.
1;
__END__
# Below is stub documentation for your module. You'd better edit it!
=head1 NAME
Net::HandlerSocket - Perl extension for blah blah blah
=head1 SYNOPSIS
use Net::HandlerSocket;
my $hsargs = { host => 'localhost', port => 9999 };
my $cli = new Net::HandlerSocket($hsargs);
$cli->open_index(1, 'testdb', 'testtable1', 'PRIMARY', 'foo,bar,baz');
$cli->open_index(2, 'testdb', 'testtable2', 'i2', 'hoge,fuga');
$cli->execute_find(1, '>=', [ 'aaa', 'bbb' ], 5, 100);
# select foo,bar,baz from testdb.testtable1
# where pk1 = 'aaa' and pk2 = 'bbb' order by pk1, pk2
# limit 100, 5
=head1 DESCRIPTION
Stub documentation for Net::HandlerSocket, created by h2xs.
=head1 AUTHOR
Akira HiguchiE<lt>higuchi dot akira at dena dot jpE<gt>
=head1 COPYRIGHT AND LICENSE
Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
See COPYRIGHT.txt for details.
=cut

View File

@@ -0,0 +1,362 @@
#!/usr/bin/perl
package Net::HandlerSocket::HSPool;
use strict;
use warnings;
use Net::HandlerSocket;
use Socket;
sub new {
my $self = {
config => $_[1],
reopen_interval => 60,
hostmap => { },
};
return bless $self, $_[0];
}
sub clear_pool {
my ($self) = @_;
$self->{hostmap} = { };
}
sub on_error {
my ($self, $obj) = @_;
my $error_func = $self->{config}->{error};
if (defined($error_func)) {
return &{$error_func}($obj);
}
die $obj;
}
sub on_warning {
my ($self, $obj) = @_;
my $warning_func = $self->{config}->{warning};
if (defined($warning_func)) {
return &{$warning_func}($obj);
}
}
sub get_conf {
my ($self, $dbtbl) = @_;
my $hcent = $self->{config}->{hostmap}->{$dbtbl};
if (!defined($hcent)) {
$self->on_error("get_conf: $dbtbl not found");
return undef;
}
my %cpy = %$hcent;
$cpy{port} ||= 9998;
$cpy{timeout} ||= 2;
return \%cpy;
}
sub resolve_hostname {
my ($self, $hcent, $host_ip_list) = @_;
if (defined($host_ip_list)) {
if (scalar(@$host_ip_list) > 0) {
$hcent->{host} = shift(@$host_ip_list);
return $host_ip_list;
}
return undef; # no more ip
}
my $host = $hcent->{host}; # unresolved name
$hcent->{hostname} = $host;
my $resolve_list_func = $self->{config}->{resolve_list};
if (defined($resolve_list_func)) {
$host_ip_list = &{$resolve_list_func}($host);
if (scalar(@$host_ip_list) > 0) {
$hcent->{host} = shift(@$host_ip_list);
return $host_ip_list;
}
return undef; # no more ip
}
my $resolve_func = $self->{config}->{resolve};
if (defined($resolve_func)) {
$hcent->{host} = &{$resolve_func}($host);
return [];
}
my $packed = gethostbyname($host);
if (!defined($packed)) {
return undef;
}
$hcent->{host} = inet_ntoa($packed);
return [];
}
sub get_handle_exec {
my ($self, $db, $tbl, $idx, $cols, $exec_multi, $exec_args) = @_;
my $now = time();
my $dbtbl = join('.', $db, $tbl);
my $hcent = $self->get_conf($dbtbl); # copy
if (!defined($hcent)) {
return undef;
}
my $hmkey = join(':', $hcent->{host}, $hcent->{port});
my $hment = $self->{hostmap}->{$hmkey};
# [ open_time, handle, index_map, host, next_index_id ]
my $host_ip_list;
TRY_OTHER_IP:
if (!defined($hment) ||
$hment->[0] + $self->{reopen_interval} < $now ||
!$hment->[1]->stable_point()) {
$host_ip_list = $self->resolve_hostname($hcent, $host_ip_list);
if (!defined($host_ip_list)) {
my $hostport = $hmkey . '(' . $hcent->{host} . ')';
$self->on_error("HSPool::get_handle" .
"($db, $tbl, $idx, $cols): host=$hmkey: " .
"no more active ip");
return undef;
}
my $hnd = new Net::HandlerSocket($hcent);
my %m = ();
$hment = [ $now, $hnd, \%m, $hcent->{host}, 1 ];
$self->{hostmap}->{$hmkey} = $hment;
}
my $hnd = $hment->[1];
my $idxmap = $hment->[2];
my $imkey = join(':', $idx, $cols);
my $idx_id = $idxmap->{$imkey};
if (!defined($idx_id)) {
$idx_id = $hment->[4];
my $e = $hnd->open_index($idx_id, $db, $tbl, $idx, $cols);
if ($e != 0) {
my $estr = $hnd->get_error();
my $hostport = $hmkey . '(' . $hcent->{host} . ')';
my $errmess = "HSPool::get_handle open_index" .
"($db, $tbl, $idx, $cols): host=$hostport " .
"err=$e($estr)";
$self->on_warning($errmess);
$hnd->close();
$hment = undef;
goto TRY_OTHER_IP;
}
$hment->[4]++;
$idxmap->{$imkey} = $idx_id;
}
if ($exec_multi) {
my $resarr;
for my $cmdent (@$exec_args) {
$cmdent->[0] = $idx_id;
}
if (scalar(@$exec_args) == 0) {
$resarr = [];
} else {
$resarr = $hnd->execute_multi($exec_args);
}
my $i = 0;
for my $res (@$resarr) {
if ($res->[0] != 0) {
my $cmdent = $exec_args->[$i];
my $ec = $res->[0];
my $estr = $res->[1];
my $op = $cmdent->[1];
my $kfvs = $cmdent->[2];
my $kvstr = defined($kfvs)
? join(',', @$kfvs) : '';
my $limit = $cmdent->[3] || 0;
my $skip = $cmdent->[4] || 0;
my $hostport = $hmkey . '(' . $hcent->{host}
. ')';
my $errmess = "HSPool::get_handle execm" .
"($db, $tbl, $idx, [$cols], " .
"($idx_id), $op, [$kvstr] " .
"$limit, $skip): " .
"host=$hostport err=$ec($estr)";
if ($res->[0] < 0 || $res->[0] == 2) {
$self->on_warning($errmess);
$hnd->close();
$hment = undef;
goto TRY_OTHER_IP;
} else {
$self->on_error($errmess);
}
}
shift(@$res);
++$i;
}
return $resarr;
} else {
my $res = $hnd->execute_find($idx_id, @$exec_args);
if ($res->[0] != 0) {
my ($op, $kfvals, $limit, $skip) = @$exec_args;
my $ec = $res->[0];
my $estr = $res->[1];
my $kvstr = join(',', @$kfvals);
my $hostport = $hmkey . '(' . $hcent->{host} . ')';
my $errmess = "HSPool::get_handle exec" .
"($db, $tbl, $idx, [$cols], ($idx_id), " .
"$op, [$kvstr], $limit, $skip): " .
"host=$hostport err=$ec($estr)";
if ($res->[0] < 0 || $res->[0] == 2) {
$self->on_warning($errmess);
$hnd->close();
$hment = undef;
goto TRY_OTHER_IP;
} else {
$self->on_error($errmess);
}
}
shift(@$res);
return $res;
}
}
sub index_find {
my ($self, $db, $tbl, $idx, $cols, $op, $kfvals, $limit, $skip) = @_;
# cols: comma separated list
# kfvals: arrayref
$limit ||= 0;
$skip ||= 0;
my $res = $self->get_handle_exec($db, $tbl, $idx, $cols,
0, [ $op, $kfvals, $limit, $skip ]);
return $res;
}
sub index_find_multi {
my ($self, $db, $tbl, $idx, $cols, $cmdlist) = @_;
# cols : comma separated list
# cmdlist : [ dummy, op, kfvals, limit, skip ]
# kfvals : arrayref
my $resarr = $self->get_handle_exec($db, $tbl, $idx, $cols,
1, $cmdlist);
return $resarr;
}
sub result_single_to_arrarr {
my ($numcols, $hsres, $ret) = @_;
my $hsreslen = scalar(@$hsres);
my $rlen = int($hsreslen / $numcols);
$ret = [ ] if !defined($ret);
my @r = ();
my $p = 0;
for (my $i = 0; $i < $rlen; ++$i) {
my @a = splice(@$hsres, $p, $numcols);
$p += $numcols;
push(@$ret, \@a);
}
return $ret; # arrayref of arrayrefs
}
sub result_multi_to_arrarr {
my ($numcols, $mhsres, $ret) = @_;
$ret = [ ] if !defined($ret);
for my $hsres (@$mhsres) {
my $hsreslen = scalar(@$hsres);
my $rlen = int($hsreslen / $numcols);
my $p = 0;
for (my $i = 0; $i < $rlen; ++$i) {
my @a = splice(@$hsres, $p, $numcols);
$p += $numcols;
push(@$ret, \@a);
}
}
return $ret; # arrayref of arrayrefs
}
sub result_single_to_hasharr {
my ($names, $hsres, $ret) = @_;
my $nameslen = scalar(@$names);
my $hsreslen = scalar(@$hsres);
my $rlen = int($hsreslen / $nameslen);
$ret = [ ] if !defined($ret);
my $p = 0;
for (my $i = 0; $i < $rlen; ++$i) {
my %h = ();
for (my $j = 0; $j < $nameslen; ++$j, ++$p) {
$h{$names->[$j]} = $hsres->[$p];
}
push(@$ret, \%h);
}
return $ret; # arrayref of hashrefs
}
sub result_multi_to_hasharr {
my ($names, $mhsres, $ret) = @_;
my $nameslen = scalar(@$names);
$ret = [ ] if !defined($ret);
for my $hsres (@$mhsres) {
my $hsreslen = scalar(@$hsres);
my $rlen = int($hsreslen / $nameslen);
my $p = 0;
for (my $i = 0; $i < $rlen; ++$i) {
my %h = ();
for (my $j = 0; $j < $nameslen; ++$j, ++$p) {
$h{$names->[$j]} = $hsres->[$p];
}
push(@$ret, \%h);
}
}
return $ret; # arrayref of hashrefs
}
sub result_single_to_hashhash {
my ($names, $key, $hsres, $ret) = @_;
my $nameslen = scalar(@$names);
my $hsreslen = scalar(@$hsres);
my $rlen = int($hsreslen / $nameslen);
$ret = { } if !defined($ret);
my $p = 0;
for (my $i = 0; $i < $rlen; ++$i) {
my %h = ();
for (my $j = 0; $j < $nameslen; ++$j, ++$p) {
$h{$names->[$j]} = $hsres->[$p];
}
my $k = $h{$key};
$ret->{$k} = \%h if defined($k);
}
return $ret; # hashref of hashrefs
}
sub result_multi_to_hashhash {
my ($names, $key, $mhsres, $ret) = @_;
my $nameslen = scalar(@$names);
$ret = { } if !defined($ret);
for my $hsres (@$mhsres) {
my $hsreslen = scalar(@$hsres);
my $rlen = int($hsreslen / $nameslen);
my $p = 0;
for (my $i = 0; $i < $rlen; ++$i) {
my %h = ();
for (my $j = 0; $j < $nameslen; ++$j, ++$p) {
$h{$names->[$j]} = $hsres->[$p];
}
my $k = $h{$key};
$ret->{$k} = \%h if defined($k);
}
}
return $ret; # hashref of hashrefs
}
sub select_cols_where_eq_aa {
# SELECT $cols FROM $db.$tbl WHERE $idx_key = $kv LIMIT 1
my ($self, $db, $tbl, $idx, $cols_aref, $kv_aref) = @_;
my $cols_str = join(',', @$cols_aref);
my $res = $self->index_find($db, $tbl, $idx, $cols_str, '=', $kv_aref);
return result_single_to_arrarr(scalar(@$cols_aref), $res);
}
sub select_cols_where_eq_hh {
# SELECT $cols FROM $db.$tbl WHERE $idx_key = $kv LIMIT 1
my ($self, $db, $tbl, $idx, $cols_aref, $kv_aref, $retkey) = @_;
my $cols_str = join(',', @$cols_aref);
my $res = $self->index_find($db, $tbl, $idx, $cols_str, '=', $kv_aref);
my $r = result_single_to_hashhash($cols_aref, $retkey, $res);
return $r;
}
sub select_cols_where_in_hh {
# SELECT $cols FROM $db.$tbl WHERE $idx_key in ($vals)
my ($self, $db, $tbl, $idx, $cols_aref, $vals_aref, $retkey) = @_;
my $cols_str = join(',', @$cols_aref);
my @cmdlist = ();
for my $v (@$vals_aref) {
push(@cmdlist, [ -1, '=', [ $v ] ]);
}
my $res = $self->index_find_multi($db, $tbl, $idx, $cols_str,
\@cmdlist);
return result_multi_to_hashhash($cols_aref, $retkey, $res);
}
1;

View File

@@ -0,0 +1,127 @@
#
# - HandlerSocket -
# This spec file was automatically generated by cpan2rpm [ver: 2.027]
# The following arguments were used:
# --no-sign perl-Net-HandlerSocket.tar.gz
# For more information on cpan2rpm please visit: http://perl.arix.com/
#
%define pkgname perl-Net-HandlerSocket
%define filelist %{pkgname}-%{version}-filelist
%define NVR %{pkgname}-%{version}-%{release}
%define maketest 1
name: perl-Net-HandlerSocket
summary: HandlerSocket - Perl extension for handlersocket
version: HANDLERSOCKET_VERSION
release: 1%{?dist}
packager: Akira Higuchi <higuchi dot akira at dena dot jp>
license: BSD
group: Applications/CPAN
group: System Environment/Libraries
buildroot: %{_tmppath}/%{name}-%{version}-%(id -u -n)
prefix: %(echo %{_prefix})
source: perl-Net-HandlerSocket.tar.gz
BuildRequires: libhsclient
Requires: libhsclient
Obsoletes: perl-DB-HandlerSocket
%description
Stub documentation for HandlerSocket, created by h2xs. It looks like the
author of the extension was negligent enough to leave the stub
unedited.
#
# This package was generated automatically with the cpan2rpm
# utility. To get this software or for more information
# please visit: http://perl.arix.com/
#
%prep
%setup -q -n %{pkgname}
chmod -R u+w %{_builddir}/%{pkgname}
%build
grep -rsl '^#!.*perl' . |
grep -v '.bak$' |xargs --no-run-if-empty \
%__perl -MExtUtils::MakeMaker -e 'MY->fixin(@ARGV)'
CFLAGS="$RPM_OPT_FLAGS"
%{__perl} Makefile.PL.installed `%{__perl} -MExtUtils::MakeMaker -e ' print qq|PREFIX=%{buildroot}%{_prefix}| if \$ExtUtils::MakeMaker::VERSION =~ /5\.9[1-6]|6\.0[0-5]/ '`
%{__make}
%if %maketest
%{__make} test
%endif
%install
[ "%{buildroot}" != "/" ] && rm -rf %{buildroot}
%{makeinstall} `%{__perl} -MExtUtils::MakeMaker -e ' print \$ExtUtils::MakeMaker::VERSION <= 6.05 ? qq|PREFIX=%{buildroot}%{_prefix}| : qq|DESTDIR=%{buildroot}| '`
cmd=/usr/share/spec-helper/compress_files
[ -x $cmd ] || cmd=/usr/lib/rpm/brp-compress
[ -x $cmd ] && $cmd
# SuSE Linux
if [ -e /etc/SuSE-release -o -e /etc/UnitedLinux-release ]
then
%{__mkdir_p} %{buildroot}/var/adm/perl-modules
%{__cat} `find %{buildroot} -name "perllocal.pod"` \
| %{__sed} -e s+%{buildroot}++g \
> %{buildroot}/var/adm/perl-modules/%{name}
fi
# remove special files
find %{buildroot} -name "perllocal.pod" \
-o -name ".packlist" \
-o -name "*.bs" \
|xargs -i rm -f {}
# no empty directories
find %{buildroot}%{_prefix} \
-type d -depth \
-exec rmdir {} \; 2>/dev/null
%{__perl} -MFile::Find -le '
find({ wanted => \&wanted, no_chdir => 1}, "%{buildroot}");
print "%doc Changes README";
for my $x (sort @dirs, @files) {
push @ret, $x unless indirs($x);
}
print join "\n", sort @ret;
sub wanted {
return if /auto$/;
local $_ = $File::Find::name;
my $f = $_; s|^\Q%{buildroot}\E||;
return unless length;
return $files[@files] = $_ if -f $f;
$d = $_;
/\Q$d\E/ && return for reverse sort @INC;
$d =~ /\Q$_\E/ && return
for qw|/etc %_prefix/man %_prefix/bin %_prefix/share|;
$dirs[@dirs] = $_;
}
sub indirs {
my $x = shift;
$x =~ /^\Q$_\E\// && $x ne $_ && return 1 for @dirs;
}
' > %filelist
[ -z %filelist ] && {
echo "ERROR: empty %files listing"
exit -1
}
%clean
[ "%{buildroot}" != "/" ] && rm -rf %{buildroot}
%files -f %filelist
%defattr(-,root,root)
%changelog
* Thu Apr 1 2010 a@localhost.localdomain
- Initial build.

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,15 @@
# Before `make install' is performed this script should be runnable with
# `make test'. After `make install' it should work as `perl HandlerSocket.t'
#########################
# change 'tests => 1' to 'tests => last_test_to_print';
use Test::More tests => 1;
BEGIN { use_ok('Net::HandlerSocket') };
#########################
# Insert your test code below, the Test::More module is use()ed here so read
# its man page ( perldoc Test::More ) for help writing this test script.