1
0
mirror of https://github.com/MariaDB/server.git synced 2025-11-28 17:36:30 +03:00
Files
mariadb/storage/sphinx/ha_sphinx.cc
Nikita Malyavin 21809f9a45 MDEV-17556 Assertion `bitmap_is_set_all(&table->s->all_set)' failed
The assertion failed in handler::ha_reset upon SELECT under
READ UNCOMMITTED from table with index on virtual column.

This was the debug-only failure, though the problem is much wider:
* MY_BITMAP is a structure containing my_bitmap_map, the latter is a raw
 bitmap.
* read_set, write_set and vcol_set of TABLE are the pointers to MY_BITMAP
* The rest of MY_BITMAPs are stored in TABLE and TABLE_SHARE
* The pointers to the stored MY_BITMAPs, like orig_read_set etc, and
 sometimes all_set and tmp_set, are assigned to the pointers.
* Sometimes tmp_use_all_columns is used to substitute the raw bitmap
 directly with all_set.bitmap
* Sometimes even bitmaps are directly modified, like in
TABLE::update_virtual_field(): bitmap_clear_all(&tmp_set) is called.

The last three bullets in the list, when used together (which is mostly
always) make the program flow cumbersome and impossible to follow,
notwithstanding the errors they cause, like this MDEV-17556, where tmp_set
pointer was assigned to read_set, write_set and vcol_set, then its bitmap
was substituted with all_set.bitmap by dbug_tmp_use_all_columns() call,
and then bitmap_clear_all(&tmp_set) was applied to all this.

To untangle this knot, the rule should be applied:
* Never substitute bitmaps! This patch is about this.
 orig_*, all_set bitmaps are never substituted already.

This patch changes the following function prototypes:
* tmp_use_all_columns, dbug_tmp_use_all_columns
 to accept MY_BITMAP** and to return MY_BITMAP * instead of my_bitmap_map*
* tmp_restore_column_map, dbug_tmp_restore_column_maps to accept
 MY_BITMAP* instead of my_bitmap_map*

These functions now will substitute read_set/write_set/vcol_set directly,
and won't touch underlying bitmaps.
2021-01-27 00:50:55 +10:00

3702 lines
92 KiB
C++

//
// $Id: ha_sphinx.cc 4842 2014-11-12 21:03:06Z deogar $
//
//
// Copyright (c) 2001-2014, Andrew Aksyonoff
// Copyright (c) 2008-2014, Sphinx Technologies Inc
// All rights reserved
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License. You should have
// received a copy of the GPL license along with this program; if you
// did not, you can find it at http://www.gnu.org/
//
#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation // gcc: Class implementation
#endif
#if defined(_MSC_VER) && _MSC_VER>=1400
#define _CRT_SECURE_NO_DEPRECATE 1
#define _CRT_NONSTDC_NO_DEPRECATE 1
#endif
#include <my_global.h>
#include <mysql_version.h>
#if MYSQL_VERSION_ID>=50515
#include "sql_class.h"
#include "sql_array.h"
#elif MYSQL_VERSION_ID>50100
#include "mysql_priv.h"
#include <mysql/plugin.h>
#else
#include "../mysql_priv.h"
#endif
#include <mysys_err.h>
#include <my_sys.h>
#include <mysql.h> // include client for INSERT table (sort of redoing federated..)
#ifndef __WIN__
// UNIX-specific
#include <my_net.h>
#include <netdb.h>
#include <sys/un.h>
#define RECV_FLAGS MSG_WAITALL
#define sphSockClose(_sock) ::close(_sock)
#else
// Windows-specific
#include <io.h>
#define snprintf _snprintf
#define RECV_FLAGS 0
#define sphSockClose(_sock) ::closesocket(_sock)
#endif
#include <ctype.h>
#include "ha_sphinx.h"
#ifndef MSG_WAITALL
#define MSG_WAITALL 0
#endif
#if defined(_MSC_VER) && _MSC_VER>=1400
#pragma warning(push,4)
#endif
/////////////////////////////////////////////////////////////////////////////
/// there might be issues with min() on different platforms (eg. Gentoo, they say)
#define Min(a,b) ((a)<(b)?(a):(b))
/// unaligned RAM accesses are forbidden on SPARC
#if defined(sparc) || defined(__sparc__)
#define UNALIGNED_RAM_ACCESS 0
#else
#define UNALIGNED_RAM_ACCESS 1
#endif
#if UNALIGNED_RAM_ACCESS
/// pass-through wrapper
/// pass-through wrapper; platforms with unaligned access need no byte copying
template < typename T > inline T sphUnalignedRead ( const T & tRef )
{
	T tCopy = tRef;
	return tCopy;
}
/// pass-through wrapper
/// pass-through wrapper; a plain assignment through the typed pointer suffices
template < typename T > void sphUnalignedWrite ( void * pPtr, const T & tVal )
{
	T * pDst = (T*) pPtr;
	*pDst = tVal;
}
#else
/// unaligned read wrapper for some architectures (eg. SPARC)
template < typename T >
inline T sphUnalignedRead ( const T & tRef )
{
// copy the value out byte by byte so no wide load hits a misaligned address
T uTmp;
byte * pSrc = (byte *) &tRef;
byte * pDst = (byte *) &uTmp;
for ( int i=0; i<(int)sizeof(T); i++ )
*pDst++ = *pSrc++;
return uTmp;
}
/// unaligned write wrapper for some architectures (eg. SPARC)
template < typename T >
void sphUnalignedWrite ( void * pPtr, const T & tVal )
{
// store the value byte by byte so no wide store hits a misaligned address
byte * pDst = (byte *) pPtr;
byte * pSrc = (byte *) &tVal;
for ( int i=0; i<(int)sizeof(T); i++ )
*pDst++ = *pSrc++;
}
#endif
#if MYSQL_VERSION_ID>=50515
#define sphinx_hash_init my_hash_init
#define sphinx_hash_free my_hash_free
#define sphinx_hash_search my_hash_search
#define sphinx_hash_delete my_hash_delete
#else
#define sphinx_hash_init hash_init
#define sphinx_hash_free hash_free
#define sphinx_hash_search hash_search
#define sphinx_hash_delete hash_delete
#endif
/////////////////////////////////////////////////////////////////////////////
// FIXME! make this all dynamic
#define SPHINXSE_MAX_FILTERS 32
#define SPHINXAPI_DEFAULT_HOST "127.0.0.1"
#define SPHINXAPI_DEFAULT_PORT 9312
#define SPHINXAPI_DEFAULT_INDEX "*"
#define SPHINXQL_DEFAULT_PORT 9306
#define SPHINXSE_SYSTEM_COLUMNS 3
#define SPHINXSE_MAX_ALLOC (16*1024*1024)
#define SPHINXSE_MAX_KEYWORDSTATS 4096
#define SPHINXSE_VERSION "2.2.6-release"
// FIXME? the following is cut-n-paste from sphinx.h and searchd.cpp
// cut-n-paste is somewhat simpler than adding dependencies however..
enum
{
SPHINX_SEARCHD_PROTO = 1,
SEARCHD_COMMAND_SEARCH = 0,
VER_COMMAND_SEARCH = 0x119,
};
/// search query sorting orders
enum ESphSortOrder
{
SPH_SORT_RELEVANCE = 0, ///< sort by document relevance desc, then by date
SPH_SORT_ATTR_DESC = 1, ///< sort by document date desc, then by relevance desc
SPH_SORT_ATTR_ASC = 2, ///< sort by document date asc, then by relevance desc
SPH_SORT_TIME_SEGMENTS = 3, ///< sort by time segments (hour/day/week/etc) desc, then by relevance desc
SPH_SORT_EXTENDED = 4, ///< sort by SQL-like expression (eg. "@relevance DESC, price ASC, @id DESC")
SPH_SORT_EXPR = 5, ///< sort by expression
SPH_SORT_TOTAL
};
/// search query matching mode
enum ESphMatchMode
{
SPH_MATCH_ALL = 0, ///< match all query words
SPH_MATCH_ANY, ///< match any query word
SPH_MATCH_PHRASE, ///< match this exact phrase
SPH_MATCH_BOOLEAN, ///< match this boolean query
SPH_MATCH_EXTENDED, ///< match this extended query
SPH_MATCH_FULLSCAN, ///< match all document IDs w/o fulltext query, apply filters
SPH_MATCH_EXTENDED2, ///< extended engine V2
SPH_MATCH_TOTAL
};
/// search query relevance ranking mode
enum ESphRankMode
{
SPH_RANK_PROXIMITY_BM25 = 0, ///< default mode, phrase proximity major factor and BM25 minor one
SPH_RANK_BM25 = 1, ///< statistical mode, BM25 ranking only (faster but worse quality)
SPH_RANK_NONE = 2, ///< no ranking, all matches get a weight of 1
SPH_RANK_WORDCOUNT = 3, ///< simple word-count weighting, rank is a weighted sum of per-field keyword occurence counts
SPH_RANK_PROXIMITY = 4, ///< phrase proximity
SPH_RANK_MATCHANY = 5, ///< emulate old match-any weighting
SPH_RANK_FIELDMASK = 6, ///< sets bits where there were matches
SPH_RANK_SPH04 = 7, ///< codename SPH04, phrase proximity + bm25 + head/exact boost
SPH_RANK_EXPR = 8, ///< expression based ranker
SPH_RANK_TOTAL,
SPH_RANK_DEFAULT = SPH_RANK_PROXIMITY_BM25
};
/// search query grouping mode
enum ESphGroupBy
{
SPH_GROUPBY_DAY = 0, ///< group by day
SPH_GROUPBY_WEEK = 1, ///< group by week
SPH_GROUPBY_MONTH = 2, ///< group by month
SPH_GROUPBY_YEAR = 3, ///< group by year
SPH_GROUPBY_ATTR = 4, ///< group by attribute value
SPH_GROUPBY_ATTRPAIR = 5, ///< group by sequential attrs pair (rendered redundant by 64bit attrs support; removed)
SPH_GROUPBY_MULTIPLE = 6 ///< group by on multiple attribute values
};
/// known attribute types
enum
{
SPH_ATTR_NONE = 0, ///< not an attribute at all
SPH_ATTR_INTEGER = 1, ///< this attr is just an integer
SPH_ATTR_TIMESTAMP = 2, ///< this attr is a timestamp
SPH_ATTR_ORDINAL = 3, ///< this attr is an ordinal string number (integer at search time, specially handled at indexing time)
SPH_ATTR_BOOL = 4, ///< this attr is a boolean bit field
SPH_ATTR_FLOAT = 5,
SPH_ATTR_BIGINT = 6,
SPH_ATTR_STRING = 7, ///< string (binary; in-memory)
SPH_ATTR_UINT32SET = 0x40000001UL, ///< this attr is multiple int32 values (0 or more)
SPH_ATTR_UINT64SET = 0x40000002UL ///< this attr is multiple int64 values (0 or more)
};
/// known answers
enum
{
SEARCHD_OK = 0, ///< general success, command-specific reply follows
SEARCHD_ERROR = 1, ///< general failure, error message follows
SEARCHD_RETRY = 2, ///< temporary failure, error message follows, client should retry later
SEARCHD_WARNING = 3 ///< general success, warning message and command-specific reply follow
};
//////////////////////////////////////////////////////////////////////////////
#define SPHINX_DEBUG_OUTPUT 0
#define SPHINX_DEBUG_CALLS 0
#include <stdarg.h>
#if SPHINX_DEBUG_OUTPUT
inline void SPH_DEBUG ( const char * format, ... )
{
va_list ap;
va_start ( ap, format );
fprintf ( stderr, "SphinxSE: " );
vfprintf ( stderr, format, ap );
fprintf ( stderr, "\n" );
va_end ( ap );
}
#else
inline void SPH_DEBUG ( const char *, ... ) {}
#endif
#if SPHINX_DEBUG_CALLS
#define SPH_ENTER_FUNC() { SPH_DEBUG ( "enter %s", __FUNCTION__ ); }
#define SPH_ENTER_METHOD() { SPH_DEBUG ( "enter %s(this=%08x)", __FUNCTION__, this ); }
#define SPH_RET(_arg) { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return _arg; }
#define SPH_VOID_RET() { SPH_DEBUG ( "leave %s", __FUNCTION__ ); return; }
#else
#define SPH_ENTER_FUNC()
#define SPH_ENTER_METHOD()
#define SPH_RET(_arg) { return(_arg); }
#define SPH_VOID_RET() { return; }
#endif
#define SafeDelete(_arg) { delete ( _arg ); (_arg) = NULL; }
#define SafeDeleteArray(_arg) { if ( _arg ) { delete [] ( _arg ); (_arg) = NULL; } }
//////////////////////////////////////////////////////////////////////////////
/// per-table structure that will be shared among all open Sphinx SE handlers
struct CSphSEShare
{
	pthread_mutex_t m_tMutex;				///< per-share mutex (initialized in ctor, destroyed in dtor)
	THR_LOCK m_tLock;						///< MySQL table-lock bookkeeping for this share
	char * m_sTable;						///< table name; heap copy (owned), also serves as the open-tables hash key
	char * m_sScheme;						///< our connection string
	char * m_sHost;							///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
	char * m_sSocket;						///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
	char * m_sIndex;						///< points into m_sScheme buffer, DO NOT FREE EXPLICITLY
	ushort m_iPort;							///< searchd port (0 when connecting over a unix socket)
	bool m_bSphinxQL;						///< is this read-only SphinxAPI table, or write-only SphinxQL table?
	uint m_iTableNameLen;					///< cached strlen(m_sTable), returned by the hash key callback
	uint m_iUseCount;						///< reference count; share is destroyed when it drops to zero (see free_share)
#if MYSQL_VERSION_ID<50610
	CHARSET_INFO * m_pTableQueryCharset;	///< charset of the query column (set from table->field[2] in get_share)
#else
	const CHARSET_INFO * m_pTableQueryCharset;	///< charset of the query column (set from table->field[2] in get_share)
#endif

	int m_iTableFields;						///< entries count in the two arrays below
	char ** m_sTableField;					///< per-column name copies (owned array of owned strings)
	enum_field_types * m_eTableFieldType;	///< per-column MySQL field types (owned array)

	/// create an empty share holding a single reference
	CSphSEShare ()
		: m_sTable ( NULL )
		, m_sScheme ( NULL )
		, m_sHost ( NULL )
		, m_sSocket ( NULL )
		, m_sIndex ( NULL )
		, m_iPort ( 0 )
		, m_bSphinxQL ( false )
		, m_iTableNameLen ( 0 )
		, m_iUseCount ( 1 )
		, m_pTableQueryCharset ( NULL )
		, m_iTableFields ( 0 )
		, m_sTableField ( NULL )
		, m_eTableFieldType ( NULL )
	{
		thr_lock_init ( &m_tLock );
		pthread_mutex_init ( &m_tMutex, MY_MUTEX_INIT_FAST );
	}

	~CSphSEShare ()
	{
		pthread_mutex_destroy ( &m_tMutex );
		thr_lock_delete ( &m_tLock );
		SafeDeleteArray ( m_sTable );
		SafeDeleteArray ( m_sScheme );
		ResetTable ();
	}

	/// free the per-column name/type arrays (connection data is left alone)
	void ResetTable ()
	{
		for ( int i=0; i<m_iTableFields; i++ )
			SafeDeleteArray ( m_sTableField[i] );
		SafeDeleteArray ( m_sTableField );
		SafeDeleteArray ( m_eTableFieldType );
		// zero the count too; otherwise a repeated call would index the
		// now-NULL m_sTableField array using the stale count
		m_iTableFields = 0;
	}
};
/// schema attribute
struct CSphSEAttr
{
char * m_sName; ///< attribute name (received from Sphinx)
uint32 m_uType; ///< attribute type (received from Sphinx)
int m_iField; ///< field index in current table (-1 if none)
/// create an unbound attribute (no name, SPH_ATTR_NONE, no table field)
CSphSEAttr()
: m_sName ( NULL )
, m_uType ( SPH_ATTR_NONE )
, m_iField ( -1 )
{}
/// release the owned name copy
~CSphSEAttr ()
{
SafeDeleteArray ( m_sName );
}
};
/// word stats
struct CSphSEWordStats
{
char * m_sWord; ///< keyword text (heap copy, owned)
int m_iDocs; ///< matching documents count
int m_iHits; ///< total keyword occurrences count
CSphSEWordStats ()
: m_sWord ( NULL )
, m_iDocs ( 0 )
, m_iHits ( 0 )
{}
/// release the owned keyword copy
~CSphSEWordStats ()
{
SafeDeleteArray ( m_sWord );
}
};
/// request stats
struct CSphSEStats
{
public:
int m_iMatchesTotal; ///< total matches counter
int m_iMatchesFound; ///< total found counter
int m_iQueryMsec; ///< query time, milliseconds
int m_iWords; ///< entries count in m_dWords
CSphSEWordStats * m_dWords; ///< per-word stats array (owned; freed by Reset() and the dtor)
bool m_bLastError; ///< true if m_sLastMessage is an error, false if a warning
char m_sLastMessage[1024]; ///< last error/warning message text ('\0'-terminated)
/// m_dWords must be NULLed before Reset(), which SafeDeleteArray()s it
CSphSEStats()
: m_dWords ( NULL )
{
Reset ();
}
/// clear all counters, the message, and free the per-word stats
void Reset ()
{
m_iMatchesTotal = 0;
m_iMatchesFound = 0;
m_iQueryMsec = 0;
m_iWords = 0;
m_bLastError = false;
m_sLastMessage[0] = '\0';
SafeDeleteArray ( m_dWords );
}
~CSphSEStats()
{
SafeDeleteArray ( m_dWords );
}
};
/// thread local storage
struct CSphSEThreadTable
{
static const int MAX_QUERY_LEN = 262144; // 256k should be enough, right?
bool m_bStats; ///< whether m_tStats currently holds reportable data (checked by SHOW STATUS)
CSphSEStats m_tStats; ///< stats of the last query on this table
bool m_bQuery; ///< whether m_sQuery holds a pending query
char m_sQuery[MAX_QUERY_LEN]; ///< fixed-size query text buffer
#if MYSQL_VERSION_ID<50610
CHARSET_INFO * m_pQueryCharset; ///< charset for converting stats text (see sphinx_show_status)
#else
const CHARSET_INFO * m_pQueryCharset; ///< charset for converting stats text (see sphinx_show_status)
#endif
bool m_bReplace; ///< are we doing an INSERT or REPLACE
bool m_bCondId; ///< got a value from condition pushdown
longlong m_iCondId; ///< value acquired from id=value condition pushdown
bool m_bCondDone; ///< index_read() is now over
const ha_sphinx * m_pHandler; ///< handler this entry was created for
CSphSEThreadTable * m_pTableNext; ///< next entry in the per-connection list (walked by ~CSphTLS)
CSphSEThreadTable ( const ha_sphinx * pHandler )
: m_bStats ( false )
, m_bQuery ( false )
, m_pQueryCharset ( NULL )
, m_bReplace ( false )
, m_bCondId ( false )
, m_iCondId ( 0 )
, m_bCondDone ( false )
, m_pHandler ( pHandler )
, m_pTableNext ( NULL )
{}
};
/// per-connection TLS container: a singly-linked list of per-table entries
struct CSphTLS
{
CSphSEThreadTable * m_pHeadTable; ///< list head; owns the whole chain
/// start the list with one entry for the given handler
explicit CSphTLS ( const ha_sphinx * pHandler )
{
m_pHeadTable = new CSphSEThreadTable ( pHandler );
}
/// delete every entry in the chain
~CSphTLS()
{
CSphSEThreadTable * pCur = m_pHeadTable;
while ( pCur )
{
CSphSEThreadTable * pNext = pCur->m_pTableNext;
SafeDelete ( pCur );
pCur = pNext;
}
}
};
/// filter types
enum ESphFilter
{
SPH_FILTER_VALUES = 0, ///< filter by integer values set
SPH_FILTER_RANGE = 1, ///< filter by integer range
SPH_FILTER_FLOATRANGE = 2 ///< filter by float range
};
/// search query filter
struct CSphSEFilter
{
public:
ESphFilter m_eType; ///< which of the value/range/float-range members apply
char * m_sAttrName; ///< filtered attribute name
longlong m_uMinValue; ///< integer range lower bound (SPH_FILTER_RANGE)
longlong m_uMaxValue; ///< integer range upper bound; defaults to UINT_MAX
float m_fMinValue; ///< float range lower bound (SPH_FILTER_FLOATRANGE)
float m_fMaxValue; ///< float range upper bound
int m_iValues; ///< entries count in m_pValues (SPH_FILTER_VALUES)
longlong * m_pValues; ///< values set (owned, freed in dtor)
int m_bExclude; ///< non-zero to negate the filter
public:
CSphSEFilter ()
: m_eType ( SPH_FILTER_VALUES )
, m_sAttrName ( NULL )
, m_uMinValue ( 0 )
, m_uMaxValue ( UINT_MAX )
, m_fMinValue ( 0.0f )
, m_fMaxValue ( 0.0f )
, m_iValues ( 0 )
, m_pValues ( NULL )
, m_bExclude ( 0 )
{
}
~CSphSEFilter ()
{
SafeDeleteArray ( m_pValues );
}
};
/// float vs dword conversion
/// reinterpret a float's bits as a dword via union punning
inline uint32 sphF2DW ( float f )
{
	union { float m_fValue; uint32 m_uValue; } tPun;
	tPun.m_fValue = f;
	return tPun.m_uValue;
}
/// dword vs float conversion
/// reinterpret a dword's bits as a float via union punning
inline float sphDW2F ( uint32 d )
{
	union { float m_fValue; uint32 m_uValue; } tPun;
	tPun.m_uValue = d;
	return tPun.m_fValue;
}
/// client-side search query
// holds one parsed SphinxAPI search request plus the buffer used to serialize it
struct CSphSEQuery
{
public:
const char * m_sHost;
int m_iPort;
private:
char * m_sQueryBuffer; ///< private, mutable copy of the query text (owned); most m_s* members point into it
const char * m_sIndex;
int m_iOffset;
int m_iLimit;
bool m_bQuery;
const char * m_sQuery;
uint32 * m_pWeights; ///< owned, freed in dtor
int m_iWeights;
ESphMatchMode m_eMode;
ESphRankMode m_eRanker;
char * m_sRankExpr;
ESphSortOrder m_eSort;
const char * m_sSortBy;
int m_iMaxMatches;
int m_iMaxQueryTime;
uint32 m_iMinID;
uint32 m_iMaxID;
int m_iFilters; ///< used entries count in m_dFilters
CSphSEFilter m_dFilters[SPHINXSE_MAX_FILTERS];
ESphGroupBy m_eGroupFunc;
const char * m_sGroupBy;
const char * m_sGroupSortBy;
int m_iCutoff;
int m_iRetryCount;
int m_iRetryDelay;
const char * m_sGroupDistinct; ///< points to query buffer; do NOT delete
int m_iIndexWeights;
char * m_sIndexWeight[SPHINXSE_MAX_FILTERS]; ///< points to query buffer; do NOT delete
int m_iIndexWeight[SPHINXSE_MAX_FILTERS];
int m_iFieldWeights;
char * m_sFieldWeight[SPHINXSE_MAX_FILTERS]; ///< points to query buffer; do NOT delete
int m_iFieldWeight[SPHINXSE_MAX_FILTERS];
bool m_bGeoAnchor;
const char * m_sGeoLatAttr;
const char * m_sGeoLongAttr;
float m_fGeoLatitude;
float m_fGeoLongitude;
char * m_sComment;
char * m_sSelect;
/// one per-attribute override entry (ids and matching replacement values)
struct Override_t
{
union Value_t
{
uint32 m_uValue;
longlong m_iValue64;
float m_fValue;
};
char * m_sName; ///< points to query buffer
int m_iType;
Dynamic_array<ulonglong> m_dIds;
Dynamic_array<Value_t> m_dValues;
};
Dynamic_array<Override_t *> m_dOverrides; ///< entries are heap-allocated, deleted in dtor
public:
char m_sParseError[256];
public:
CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex );
~CSphSEQuery ();
bool Parse ();
int BuildRequest ( char ** ppBuffer );
protected:
char * m_pBuf; ///< serialized request buffer (owned)
char * m_pCur; ///< write cursor into m_pBuf
int m_iBufLeft; ///< bytes left in m_pBuf
bool m_bBufOverrun; ///< set when a write did not fit
template < typename T > int ParseArray ( T ** ppValues, const char * sValue );
bool ParseField ( char * sField );
void SendBytes ( const void * pBytes, int iBytes );
// NOTE: ntohs/ntohl below perform host-to-network conversion too, since the byte swap is its own inverse
void SendWord ( short int v ) { v = ntohs(v); SendBytes ( &v, sizeof(v) ); }
void SendInt ( int v ) { v = ntohl(v); SendBytes ( &v, sizeof(v) ); }
void SendDword ( uint v ) { v = ntohl(v) ;SendBytes ( &v, sizeof(v) ); }
void SendUint64 ( ulonglong v ) { SendDword ( (uint)(v>>32) ); SendDword ( (uint)(v&0xFFFFFFFFUL) ); }
void SendString ( const char * v ) { int iLen = strlen(v); SendDword(iLen); SendBytes ( v, iLen ); } // NOTE(review): strlen truncated to int; fine for query-sized strings
void SendFloat ( float v ) { SendDword ( sphF2DW(v) ); }
};
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
template int CSphSEQuery::ParseArray<uint32> ( uint32 **, const char * );
template int CSphSEQuery::ParseArray<longlong> ( longlong **, const char * );
#endif
//////////////////////////////////////////////////////////////////////////////
#if MYSQL_VERSION_ID>50100
#if MYSQL_VERSION_ID<50114
#error Sphinx SE requires MySQL 5.1.14 or higher if compiling for 5.1.x series!
#endif
static handler * sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root );
static int sphinx_init_func ( void * p );
static int sphinx_close_connection ( handlerton * hton, THD * thd );
static int sphinx_panic ( handlerton * hton, enum ha_panic_function flag );
static bool sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print, enum ha_stat_type stat_type );
#else
static bool sphinx_init_func_for_handlerton ();
static int sphinx_close_connection ( THD * thd );
bool sphinx_show_status ( THD * thd );
#endif // >50100
//////////////////////////////////////////////////////////////////////////////
static const char sphinx_hton_name[] = "SPHINX";
static const char sphinx_hton_comment[] = "Sphinx storage engine " SPHINXSE_VERSION;
#if MYSQL_VERSION_ID<50100
handlerton sphinx_hton =
{
#ifdef MYSQL_HANDLERTON_INTERFACE_VERSION
MYSQL_HANDLERTON_INTERFACE_VERSION,
#endif
sphinx_hton_name,
SHOW_OPTION_YES,
sphinx_hton_comment,
DB_TYPE_SPHINX_DB,
sphinx_init_func_for_handlerton,
0, // slot
0, // savepoint size
sphinx_close_connection, // close_connection
NULL, // savepoint
NULL, // rollback to savepoint
NULL, // release savepoint
NULL, // commit
NULL, // rollback
NULL, // prepare
NULL, // recover
NULL, // commit_by_xid
NULL, // rollback_by_xid
NULL, // create_cursor_read_view
NULL, // set_cursor_read_view
NULL, // close_cursor_read_view
HTON_CAN_RECREATE
};
#else
static handlerton * sphinx_hton_ptr = NULL;
#endif
//////////////////////////////////////////////////////////////////////////////
// variables for Sphinx shared methods
pthread_mutex_t sphinx_mutex; // mutex to init the hash
static int sphinx_init = 0; // flag whether the hash was initialized
static HASH sphinx_open_tables; // hash used to track open tables
//////////////////////////////////////////////////////////////////////////////
// INITIALIZATION AND SHUTDOWN
//////////////////////////////////////////////////////////////////////////////
// hashing function
#if MYSQL_VERSION_ID>=50120
typedef size_t GetKeyLength_t;
#else
typedef uint GetKeyLength_t;
#endif
/// hash callback: a share's key is its table name (length via m_iTableNameLen)
static byte * sphinx_get_key ( const byte * pSharePtr, GetKeyLength_t * pLength, my_bool )
{
	const CSphSEShare * pShare = (const CSphSEShare *) pSharePtr;
	*pLength = (size_t) pShare->m_iTableNameLen;
	return (byte*) pShare->m_sTable;
}
#if MYSQL_VERSION_ID<50100
static int sphinx_init_func ( void * ) // to avoid unused arg warning
#else
static int sphinx_init_func ( void * p )
#endif
{
// one-time engine initialization: global mutex, open-tables hash,
// and (on 5.1+) handlerton callback registration; returns 0 on success
// NOTE(review): the check-then-set on sphinx_init is not itself locked;
// appears to rely on plugin init being single-threaded -- confirm
SPH_ENTER_FUNC();
if ( !sphinx_init )
{
sphinx_init = 1;
// the void(...) wrapper just discards pthread_mutex_init's return value
void ( pthread_mutex_init ( &sphinx_mutex, MY_MUTEX_INIT_FAST ) );
sphinx_hash_init ( &sphinx_open_tables, system_charset_info, 32, 0, 0,
sphinx_get_key, 0, 0 );
#if MYSQL_VERSION_ID > 50100
// register this engine's capabilities and callbacks with the server
handlerton * hton = (handlerton*) p;
hton->state = SHOW_OPTION_YES;
hton->db_type = DB_TYPE_AUTOASSIGN;
hton->create = sphinx_create_handler;
hton->close_connection = sphinx_close_connection;
hton->show_status = sphinx_show_status;
hton->panic = sphinx_panic;
hton->flags = HTON_CAN_RECREATE;
#endif
}
SPH_RET(0);
}
#if MYSQL_VERSION_ID<50100
/// pre-5.1 handlerton init shim
/// NOTE(review): sphinx_init_func() returns 0 on success, which converts to
/// false here; the pre-5.1 init contract appears to expect that -- confirm
static bool sphinx_init_func_for_handlerton ()
{
return sphinx_init_func ( &sphinx_hton );
}
#endif
#if MYSQL_VERSION_ID>50100
/// connection teardown (5.1+): free this connection's TLS chain, if any
static int sphinx_close_connection ( handlerton * hton, THD * thd )
{
	SPH_ENTER_FUNC();

	// TLS pointer lives in the per-engine slot of the connection
	void ** ppData = thd_ha_data ( thd, hton );
	CSphTLS * pTls = (CSphTLS *) ( *ppData );
	SafeDelete ( pTls );
	*ppData = NULL;

	SPH_RET(0);
}
static int sphinx_done_func ( void * )
{
SPH_ENTER_FUNC();
int error __attribute__ ((unused)) = 0;
if ( sphinx_init )
{
sphinx_init = 0;
if ( sphinx_open_tables.records )
error = 1;
sphinx_hash_free ( &sphinx_open_tables );
pthread_mutex_destroy ( &sphinx_mutex );
}
SPH_RET(0);
}
/// panic handler: any panic is treated as a full engine shutdown
static int sphinx_panic ( handlerton * hton, enum ha_panic_function )
{
return sphinx_done_func ( hton );
}
#else
static int sphinx_close_connection ( THD * thd )
{
// deallocate common handler data
SPH_ENTER_FUNC();
// pre-5.1: per-connection TLS lives directly in thd->ha_data at this engine's slot
CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
SafeDelete ( pTls );
thd->ha_data[sphinx_hton.slot] = NULL;
SPH_RET(0);
}
#endif // >50100
//////////////////////////////////////////////////////////////////////////////
// SHOW STATUS
//////////////////////////////////////////////////////////////////////////////
#if MYSQL_VERSION_ID>50100
static bool sphinx_show_status ( handlerton * hton, THD * thd, stat_print_fn * stat_print,
enum ha_stat_type )
#else
bool sphinx_show_status ( THD * thd )
#endif
{
// reports per-connection query stats, per-word stats, and the last
// error/warning; returns FALSE on success (MySQL convention)
SPH_ENTER_FUNC();
#if MYSQL_VERSION_ID<50100
Protocol * protocol = thd->protocol;
List<Item> field_list;
#endif
char buf1[IO_SIZE];
uint buf1len;
char buf2[IO_SIZE];
uint buf2len = 0;
String words;
buf1[0] = '\0';
buf2[0] = '\0';
#if MYSQL_VERSION_ID>50100
// 5.1.x style stats
CSphTLS * pTls = (CSphTLS*) ( *thd_ha_data ( thd, hton ) );
#define LOC_STATS(_key,_keylen,_val,_vallen) \
stat_print ( thd, sphinx_hton_name, strlen(sphinx_hton_name), _key, _keylen, _val, _vallen );
#else
// 5.0.x style stats
if ( have_sphinx_db!=SHOW_OPTION_YES )
{
my_message ( ER_NOT_SUPPORTED_YET,
"failed to call SHOW SPHINX STATUS: --skip-sphinx was specified",
MYF(0) );
SPH_RET(TRUE);
}
CSphTLS * pTls = (CSphTLS*) thd->ha_data[sphinx_hton.slot];
field_list.push_back ( new Item_empty_string ( thd, "Type", 10 ) );
field_list.push_back ( new Item_empty_string ( thd, "Name", FN_REFLEN ) );
field_list.push_back ( new Item_empty_string ( thd, "Status", 10 ) );
if ( protocol->send_fields ( &field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF ) )
SPH_RET(TRUE);
#define LOC_STATS(_key,_keylen,_val,_vallen) \
protocol->prepare_for_resend (); \
protocol->store ( "SPHINX", 6, system_charset_info ); \
protocol->store ( _key, _keylen, system_charset_info ); \
protocol->store ( _val, _vallen, system_charset_info ); \
if ( protocol->write() ) \
SPH_RET(TRUE);
#endif
// show query stats
if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
{
const CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
buf1len = my_snprintf ( buf1, sizeof(buf1),
"total: %d, total found: %d, time: %d, words: %d",
pStats->m_iMatchesTotal, pStats->m_iMatchesFound, pStats->m_iQueryMsec, pStats->m_iWords );
LOC_STATS ( "stats", 5, buf1, buf1len );
if ( pStats->m_iWords )
{
// NOTE(review): buf2 is passed as both the destination and the first "%s"
// source to append in place; with a standard snprintf this overlap is
// undefined behavior -- relies on my_snprintf tolerating it, confirm
for ( int i=0; i<pStats->m_iWords; i++ )
{
CSphSEWordStats & tWord = pStats->m_dWords[i];
buf2len = my_snprintf ( buf2, sizeof(buf2), "%s%s:%d:%d ",
buf2, tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
}
// convert it if we can
const char * sWord = buf2;
int iWord = buf2len;
String sBuf3;
if ( pTls->m_pHeadTable->m_pQueryCharset )
{
uint iErrors;
sBuf3.copy ( buf2, buf2len, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
sWord = sBuf3.c_ptr();
iWord = sBuf3.length();
}
LOC_STATS ( "words", 5, sWord, iWord );
}
}
// show last error or warning (either in addition to stats, or on their own)
if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_tStats.m_sLastMessage[0] )
{
const char * sMessageType = pTls->m_pHeadTable->m_tStats.m_bLastError ? "error" : "warning";
LOC_STATS (
sMessageType, strlen ( sMessageType ),
pTls->m_pHeadTable->m_tStats.m_sLastMessage, strlen ( pTls->m_pHeadTable->m_tStats.m_sLastMessage ) );
} else
{
// well, nothing to show just yet
#if MYSQL_VERSION_ID < 50100
LOC_STATS ( "stats", 5, "no query has been executed yet", sizeof("no query has been executed yet")-1 );
#endif
}
#if MYSQL_VERSION_ID < 50100
send_eof(thd);
#endif
SPH_RET(FALSE);
}
//////////////////////////////////////////////////////////////////////////////
// HELPERS
//////////////////////////////////////////////////////////////////////////////
/// make a NUL-terminated heap copy of (a prefix of) a string
/// @param sSrc source string; NULL yields NULL
/// @param iLen bytes to copy; -1 (the default) means "the whole string"
/// @return new[]-allocated copy, owned by the caller
static char * sphDup ( const char * sSrc, int iLen=-1 )
{
	if ( !sSrc )
		return NULL;

	if ( iLen<0 )
		iLen = (int) strlen ( sSrc );

	char * sCopy = new char [ iLen+1 ];
	memcpy ( sCopy, sSrc, iLen );
	sCopy[iLen] = '\0';
	return sCopy;
}
/// log an engine-internal error to stderr, prefixed with a local timestamp
static void sphLogError ( const char * sFmt, ... )
{
// emit timestamp
#ifdef __WIN__
SYSTEMTIME t;
GetLocalTime ( &t );
fprintf ( stderr, "%02d%02d%02d %2d:%02d:%02d SphinxSE: internal error: ",
(int)t.wYear % 100, (int)t.wMonth, (int)t.wDay,
(int)t.wHour, (int)t.wMinute, (int)t.wSecond );
#else
// Unix version
time_t tStamp;
time ( &tStamp );
struct tm * pParsed;
#ifdef HAVE_LOCALTIME_R
// reentrant variant when available; plain localtime() uses shared static state
struct tm tParsed;
localtime_r ( &tStamp, &tParsed );
pParsed = &tParsed;
#else
pParsed = localtime ( &tStamp );
#endif // HAVE_LOCALTIME_R
fprintf ( stderr, "%02d%02d%02d %2d:%02d:%02d SphinxSE: internal error: ",
pParsed->tm_year % 100, pParsed->tm_mon + 1, pParsed->tm_mday,
pParsed->tm_hour, pParsed->tm_min, pParsed->tm_sec);
#endif // __WIN__
// emit message
va_list ap;
va_start ( ap, sFmt );
vfprintf ( stderr, sFmt, ap );
va_end ( ap );
// emit newline
fprintf ( stderr, "\n" );
}
// the following scheme variants are recognized
//
// sphinx://host[:port]/index
// sphinxql://host[:port]/index
// unix://unix/domain/socket[:index]
/// parse the table's CONNECTION string and (when share is non-NULL) fill the
/// share with connection details plus a snapshot of column names/types;
/// returns false and raises my_error() on malformed strings
static bool ParseUrl ( CSphSEShare * share, TABLE * table, bool bCreate )
{
SPH_ENTER_FUNC();
if ( share )
{
// check incoming stuff
if ( !table )
{
sphLogError ( "table==NULL in ParseUrl()" );
return false;
}
if ( !table->s )
{
sphLogError ( "(table->s)==NULL in ParseUrl()" );
return false;
}
// free old stuff
share->ResetTable ();
// fill new stuff
share->m_iTableFields = table->s->fields;
if ( share->m_iTableFields )
{
share->m_sTableField = new char * [ share->m_iTableFields ];
share->m_eTableFieldType = new enum_field_types [ share->m_iTableFields ];
for ( int i=0; i<share->m_iTableFields; i++ )
{
share->m_sTableField[i] = sphDup ( table->field[i]->field_name.str );
share->m_eTableFieldType[i] = table->field[i]->type();
}
}
}
// defaults
bool bOk = true;
bool bQL = false;
char * sScheme = NULL;
char * sHost = (char*) SPHINXAPI_DEFAULT_HOST;
char * sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
int iPort = SPHINXAPI_DEFAULT_PORT;
// parse connection string, if any
// NOTE: this "while" is a one-shot breakable block; every path below breaks out
// NOTE(review): table/table->s are dereferenced here even when share==NULL (the
// NULL checks above only run when share is set); callers must pass a valid table -- confirm
while ( table->s->connect_string.length!=0 )
{
// sScheme owns a mutable copy; sHost/sIndex end up pointing into it
sScheme = sphDup ( table->s->connect_string.str, table->s->connect_string.length );
sHost = strstr ( sScheme, "://" );
if ( !sHost )
{
bOk = false;
break;
}
sHost[0] = '\0';
sHost += 3;
/////////////////////////////
// sphinxapi via unix socket
/////////////////////////////
if ( !strcmp ( sScheme, "unix" ) )
{
sHost--; // reuse last slash
iPort = 0;
// optional ":index" suffix after the socket path
if (!( sIndex = strrchr ( sHost, ':' ) ))
sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
else
{
*sIndex++ = '\0';
if ( !*sIndex )
sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
}
bOk = true;
break;
}
/////////////////////
// sphinxapi via tcp
/////////////////////
if ( !strcmp ( sScheme, "sphinx" ) )
{
// optional ":port" then optional "/index"; defaults fill the gaps
char * sPort = strchr ( sHost, ':' );
if ( sPort )
{
*sPort++ = '\0';
if ( *sPort )
{
sIndex = strchr ( sPort, '/' );
if ( sIndex )
*sIndex++ = '\0';
else
sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
iPort = atoi(sPort);
if ( !iPort )
iPort = SPHINXAPI_DEFAULT_PORT;
}
} else
{
sIndex = strchr ( sHost, '/' );
if ( sIndex )
*sIndex++ = '\0';
else
sIndex = (char*) SPHINXAPI_DEFAULT_INDEX;
}
bOk = true;
break;
}
////////////
// sphinxql
////////////
if ( !strcmp ( sScheme, "sphinxql" ) )
{
bQL = true;
iPort = SPHINXQL_DEFAULT_PORT;
// handle port
char * sPort = strchr ( sHost, ':' );
sIndex = sHost; // starting point for index name search
if ( sPort )
{
*sPort++ = '\0';
sIndex = sPort;
iPort = atoi(sPort);
if ( !iPort )
{
bOk = false; // invalid port; can report ER_FOREIGN_DATA_STRING_INVALID
break;
}
}
// find index
sIndex = strchr ( sIndex, '/' );
if ( sIndex )
*sIndex++ = '\0';
// final checks
// host and index names are required
bOk = ( sHost && *sHost && sIndex && *sIndex );
break;
}
// unknown case
bOk = false;
break;
}
if ( !bOk )
{
my_error ( bCreate ? ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE : ER_FOREIGN_DATA_STRING_INVALID,
MYF(0), table->s->connect_string.str);
} else
{
if ( share )
{
// transfer ownership of sScheme to the share; sHost/sIndex point into it
SafeDeleteArray ( share->m_sScheme );
share->m_sScheme = sScheme;
share->m_sHost = sHost;
share->m_sIndex = sIndex;
share->m_iPort = (ushort)iPort;
share->m_bSphinxQL = bQL;
}
}
// NOTE(review): sScheme is freed only on the (failure && !share) path; on
// other paths that do not store it into the share it appears to leak -- confirm
if ( !bOk && !share )
SafeDeleteArray ( sScheme );
SPH_RET(bOk);
}
// Example of simple lock controls. The "share" it creates is structure we will
// pass to each sphinx handler. Do you have to have one of these? Well, you have
// pieces that are used for locking, and they are needed to function.
/// look up (or create) the per-table share under the global mutex;
/// bumps the refcount on an existing share; returns NULL on failure
static CSphSEShare * get_share ( const char * table_name, TABLE * table )
{
SPH_ENTER_FUNC();
pthread_mutex_lock ( &sphinx_mutex );
CSphSEShare * pShare = NULL;
// one-shot breakable block; every path below breaks out
for ( ;; )
{
// check if we already have this share
#if MYSQL_VERSION_ID>=50120
pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const uchar *) table_name, strlen(table_name) );
#else
#ifdef __WIN__
pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, (const byte *) table_name, strlen(table_name) );
#else
pShare = (CSphSEShare*) sphinx_hash_search ( &sphinx_open_tables, table_name, strlen(table_name) );
#endif // win
#endif // pre-5.1.20
if ( pShare )
{
pShare->m_iUseCount++;
break;
}
// try to allocate new share
pShare = new CSphSEShare ();
// NOTE(review): operator new normally throws rather than returning NULL,
// so this check looks defensive/vestigial -- confirm build flags
if ( !pShare )
break;
// try to setup it
if ( !ParseUrl ( pShare, table, false ) )
{
SafeDelete ( pShare );
break;
}
// SphinxAPI tables: remember the charset of the query column (field #2)
if ( !pShare->m_bSphinxQL )
pShare->m_pTableQueryCharset = table->field[2]->charset();
// try to hash it
pShare->m_iTableNameLen = strlen(table_name);
pShare->m_sTable = sphDup ( table_name );
// NOTE(review): insert calls my_hash_insert directly while lookups use the
// sphinx_hash_* wrappers; pre-5.5 builds may spell this hash_insert -- confirm
if ( my_hash_insert ( &sphinx_open_tables, (const byte *)pShare ) )
{
SafeDelete ( pShare );
break;
}
// all seems fine
break;
}
pthread_mutex_unlock ( &sphinx_mutex );
SPH_RET(pShare);
}
// Free lock controls. We call this whenever we close a table. If the table had
// the last reference to the share then we free memory associated with it.
/// drop one reference to the share; on the last reference, unhash and destroy it
static int free_share ( CSphSEShare * pShare )
{
	SPH_ENTER_FUNC();
	pthread_mutex_lock ( &sphinx_mutex );

	pShare->m_iUseCount--;
	if ( pShare->m_iUseCount==0 )
	{
		sphinx_hash_delete ( &sphinx_open_tables, (byte *)pShare );
		SafeDelete ( pShare );
	}

	pthread_mutex_unlock ( &sphinx_mutex );
	SPH_RET(0);
}
#if MYSQL_VERSION_ID>50100
/// handler factory; also stashes the handlerton pointer for later use
static handler * sphinx_create_handler ( handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root )
{
sphinx_hton_ptr = hton;
return new ( mem_root ) ha_sphinx ( hton, table );
}
#endif
//////////////////////////////////////////////////////////////////////////////
// CLIENT-SIDE REQUEST STUFF
//////////////////////////////////////////////////////////////////////////////
// Construct a query object with engine defaults; the actual option values
// are filled in later by Parse()/ParseField().
CSphSEQuery::CSphSEQuery ( const char * sQuery, int iLength, const char * sIndex )
	: m_sHost ( "" )
	, m_iPort ( 0 )
	, m_sIndex ( sIndex ? sIndex : "*" )
	, m_iOffset ( 0 )
	, m_iLimit ( 20 )
	, m_bQuery ( false )
	, m_sQuery ( "" )
	, m_pWeights ( NULL )
	, m_iWeights ( 0 )
	, m_eMode ( SPH_MATCH_ALL )
	, m_eRanker ( SPH_RANK_PROXIMITY_BM25 )
	, m_sRankExpr ( NULL )
	, m_eSort ( SPH_SORT_RELEVANCE )
	, m_sSortBy ( "" )
	, m_iMaxMatches ( 1000 )
	, m_iMaxQueryTime ( 0 )
	, m_iMinID ( 0 )
	, m_iMaxID ( 0 )
	, m_iFilters ( 0 )
	, m_eGroupFunc ( SPH_GROUPBY_DAY )
	, m_sGroupBy ( "" )
	, m_sGroupSortBy ( "@group desc" )
	, m_iCutoff ( 0 )
	, m_iRetryCount ( 0 )
	, m_iRetryDelay ( 0 )
	, m_sGroupDistinct ( "" )
	, m_iIndexWeights ( 0 )
	, m_iFieldWeights ( 0 )
	, m_bGeoAnchor ( false )
	, m_sGeoLatAttr ( "" )
	, m_sGeoLongAttr ( "" )
	, m_fGeoLatitude ( 0.0f )
	, m_fGeoLongitude ( 0.0f )
	, m_sComment ( (char*) "" )
	, m_sSelect ( (char*) "*" )
	, m_pBuf ( NULL )
	, m_pCur ( NULL )
	, m_iBufLeft ( 0 )
	, m_bBufOverrun ( false )
{
	// take a private, mutable copy of the query; a trailing ';' is appended
	// so Parse() can treat every clause as semicolon-terminated
	m_sQueryBuffer = new char [ iLength+2 ];
	memcpy ( m_sQueryBuffer, sQuery, iLength );
	m_sQueryBuffer[iLength] = ';';
	m_sQueryBuffer[iLength+1] = '\0';
}
// Release all buffers owned by the query object.
CSphSEQuery::~CSphSEQuery ()
{
	SPH_ENTER_METHOD();

	SafeDeleteArray ( m_sQueryBuffer );
	SafeDeleteArray ( m_pWeights );
	SafeDeleteArray ( m_pBuf );

	// overrides are individually heap-allocated; free each entry
	size_t uOverrides = m_dOverrides.elements();
	for ( size_t uOverride=0; uOverride<uOverrides; uOverride++ )
		SafeDelete ( m_dOverrides.at(uOverride) );

	SPH_VOID_RET();
}
// Parse a list of (optionally negated) integers out of sValue into a newly
// allocated array stored via ppValues (caller takes ownership).
// Any non-digit characters act as separators; a '-' seen between values
// negates the following digit run. Returns the number of values parsed
// (0 means nothing was allocated).
template < typename T >
int CSphSEQuery::ParseArray ( T ** ppValues, const char * sValue )
{
	SPH_ENTER_METHOD();

	assert ( ppValues );
	assert ( !(*ppValues) );

	const char * pValue;
	bool bPrevDigit = false;
	int iValues = 0;

	// count the values: one value per maximal run of digits
	for ( pValue=sValue; *pValue; pValue++ )
	{
		bool bDigit = (*pValue)>='0' && (*pValue)<='9';
		if ( bDigit && !bPrevDigit )
			iValues++;
		bPrevDigit = bDigit;
	}
	if ( !iValues )
		SPH_RET(0);

	// extract the values
	T * pValues = new T [ iValues ];
	*ppValues = pValues;

	int iIndex = 0, iSign = 1;
	T uValue = 0;

	bPrevDigit = false;
	for ( pValue=sValue ;; pValue++ )
	{
		bool bDigit = (*pValue)>='0' && (*pValue)<='9';

		if ( bDigit )
		{
			// start or continue accumulating a digit run
			if ( !bPrevDigit )
				uValue = 0;
			uValue = uValue*10 + ( (*pValue)-'0' );
		} else if ( bPrevDigit )
		{
			// a digit run just ended (possibly at the NUL); flush the value
			assert ( iIndex<iValues );
			pValues [ iIndex++ ] = uValue * iSign;
			iSign = 1;
		} else if ( *pValue=='-' )
			iSign = -1;
		bPrevDigit = bDigit;

		// the loop includes the terminating NUL so the last value is flushed
		if ( !*pValue )
			break;
	}

	SPH_RET ( iValues );
}
// Trim leading and trailing whitespace in place; returns a pointer to the
// first non-space character. The buffer is modified (a NUL is written at
// the new end), not reallocated.
static char * chop ( char * s )
{
	// cast to unsigned char: passing a plain (possibly negative) char to
	// isspace() is undefined behavior per the C standard (CERT STR37-C)
	while ( *s && isspace ( (unsigned char)*s ) )
		s++;

	char * p = s + strlen(s);
	while ( p>s && isspace ( (unsigned char)p[-1] ) )
		p--;
	*p = '\0';

	return s;
}
// True for characters allowed in a plain attribute name:
// ASCII letters, digits, and underscore.
static bool myisattr ( char c )
{
	if ( c=='_' )
		return true;
	if ( c>='0' && c<='9' )
		return true;
	return ( c>='a' && c<='z' ) || ( c>='A' && c<='Z' );
}
// True for the "magic" attribute prefix character '@' (as in @group, @count).
static bool myismagic ( char c )
{
	return c=='@';
}
// True for the extra characters that may appear in a JSON attribute path:
// dot and square brackets.
static bool myisjson ( char c )
{
	switch ( c )
	{
		case '.':
		case '[':
		case ']':
			return true;
		default:
			return false;
	}
}
// Parse one semicolon-separated clause of the SphinxSE query string.
// A clause is either the free-text query itself, or a "name=value" option
// (mode, ranker, sort, groupby, filters, weights, geoanchor, override, ...).
// The input buffer is modified in place (separators are NUL-ed out, and
// option values point into it). On error, fills m_sParseError and
// returns false.
bool CSphSEQuery::ParseField ( char * sField )
{
	SPH_ENTER_METHOD();

	// look for option name/value separator; a missing, leading, or escaped
	// '=' means this clause is the search query itself
	char * sValue = strchr ( sField, '=' );
	if ( !sValue || sValue==sField || sValue[-1]=='\\' )
	{
		// by default let's assume it's just query
		if ( sField[0] )
		{
			if ( m_bQuery )
			{
				snprintf ( m_sParseError, sizeof(m_sParseError), "search query already specified; '%s' is redundant", sField );
				SPH_RET(false);
			} else
			{
				m_sQuery = sField;
				m_bQuery = true;

				// unescape only 1st one (collapse backslash pairs in place)
				char *s = sField, *d = sField;
				int iSlashes = 0;
				while ( *s )
				{
					iSlashes = ( *s=='\\' ) ? iSlashes+1 : 0;
					if ( ( iSlashes%2 )==0 ) *d++ = *s;
					s++;
				}
				*d = '\0';
			}
		}
		SPH_RET(true);
	}

	// split into name and value at the '='
	*sValue++ = '\0';
	sValue = chop ( sValue );
	int iValue = atoi ( sValue );

	// handle options
	char * sName = chop ( sField );

	if ( !strcmp ( sName, "query" ) ) m_sQuery = sValue;
	else if ( !strcmp ( sName, "host" ) ) m_sHost = sValue;
	else if ( !strcmp ( sName, "port" ) ) m_iPort = iValue;
	else if ( !strcmp ( sName, "index" ) ) m_sIndex = sValue;
	else if ( !strcmp ( sName, "offset" ) ) m_iOffset = iValue;
	else if ( !strcmp ( sName, "limit" ) ) m_iLimit = iValue;
	else if ( !strcmp ( sName, "weights" ) ) m_iWeights = ParseArray<uint32> ( &m_pWeights, sValue );
	else if ( !strcmp ( sName, "minid" ) ) m_iMinID = iValue;
	else if ( !strcmp ( sName, "maxid" ) ) m_iMaxID = iValue;
	else if ( !strcmp ( sName, "maxmatches" ) ) m_iMaxMatches = iValue;
	else if ( !strcmp ( sName, "maxquerytime" ) ) m_iMaxQueryTime = iValue;
	else if ( !strcmp ( sName, "groupsort" ) ) m_sGroupSortBy = sValue;
	else if ( !strcmp ( sName, "distinct" ) ) m_sGroupDistinct = sValue;
	else if ( !strcmp ( sName, "cutoff" ) ) m_iCutoff = iValue;
	else if ( !strcmp ( sName, "comment" ) ) m_sComment = sValue;
	else if ( !strcmp ( sName, "select" ) ) m_sSelect = sValue;
	else if ( !strcmp ( sName, "mode" ) )
	{
		// matching mode keyword -> ESphMatchMode
		m_eMode = SPH_MATCH_ALL;
		if ( !strcmp ( sValue, "any" ) ) m_eMode = SPH_MATCH_ANY;
		else if ( !strcmp ( sValue, "phrase" ) ) m_eMode = SPH_MATCH_PHRASE;
		else if ( !strcmp ( sValue, "boolean" ) ) m_eMode = SPH_MATCH_BOOLEAN;
		else if ( !strcmp ( sValue, "ext" ) ) m_eMode = SPH_MATCH_EXTENDED;
		else if ( !strcmp ( sValue, "extended" ) ) m_eMode = SPH_MATCH_EXTENDED;
		else if ( !strcmp ( sValue, "ext2" ) ) m_eMode = SPH_MATCH_EXTENDED2;
		else if ( !strcmp ( sValue, "extended2" ) ) m_eMode = SPH_MATCH_EXTENDED2;
		else if ( !strcmp ( sValue, "all" ) ) m_eMode = SPH_MATCH_ALL;
		else if ( !strcmp ( sValue, "fullscan" ) ) m_eMode = SPH_MATCH_FULLSCAN;
		else
		{
			snprintf ( m_sParseError, sizeof(m_sParseError), "unknown matching mode '%s'", sValue );
			SPH_RET(false);
		}
	} else if ( !strcmp ( sName, "ranker" ) )
	{
		// ranking mode keyword -> ESphRankMode; "expr:<expr>" carries a
		// ranking expression as its payload
		m_eRanker = SPH_RANK_PROXIMITY_BM25;
		if ( !strcmp ( sValue, "proximity_bm25" ) ) m_eRanker = SPH_RANK_PROXIMITY_BM25;
		else if ( !strcmp ( sValue, "bm25" ) ) m_eRanker = SPH_RANK_BM25;
		else if ( !strcmp ( sValue, "none" ) ) m_eRanker = SPH_RANK_NONE;
		else if ( !strcmp ( sValue, "wordcount" ) ) m_eRanker = SPH_RANK_WORDCOUNT;
		else if ( !strcmp ( sValue, "proximity" ) ) m_eRanker = SPH_RANK_PROXIMITY;
		else if ( !strcmp ( sValue, "matchany" ) ) m_eRanker = SPH_RANK_MATCHANY;
		else if ( !strcmp ( sValue, "fieldmask" ) ) m_eRanker = SPH_RANK_FIELDMASK;
		else if ( !strcmp ( sValue, "sph04" ) ) m_eRanker = SPH_RANK_SPH04;
		else if ( !strncmp ( sValue, "expr:", 5 ) )
		{
			m_eRanker = SPH_RANK_EXPR;
			m_sRankExpr = sValue+5;
		} else
		{
			snprintf ( m_sParseError, sizeof(m_sParseError), "unknown ranking mode '%s'", sValue );
			SPH_RET(false);
		}
	} else if ( !strcmp ( sName, "sort" ) )
	{
		// sort mode is matched by prefix; the remainder (after the colon)
		// becomes the sort-by clause
		static const struct
		{
			const char * m_sName;
			ESphSortOrder m_eSort;
		} dSortModes[] =
		{
			{ "relevance", SPH_SORT_RELEVANCE },
			{ "attr_desc:", SPH_SORT_ATTR_DESC },
			{ "attr_asc:", SPH_SORT_ATTR_ASC },
			{ "time_segments:", SPH_SORT_TIME_SEGMENTS },
			{ "extended:", SPH_SORT_EXTENDED },
			{ "expr:", SPH_SORT_EXPR }
		};

		int i;
		const int nModes = sizeof(dSortModes)/sizeof(dSortModes[0]);
		for ( i=0; i<nModes; i++ )
			if ( !strncmp ( sValue, dSortModes[i].m_sName, strlen ( dSortModes[i].m_sName ) ) )
		{
			m_eSort = dSortModes[i].m_eSort;
			m_sSortBy = sValue + strlen ( dSortModes[i].m_sName );
			break;
		}
		if ( i==nModes )
		{
			snprintf ( m_sParseError, sizeof(m_sParseError), "unknown sorting mode '%s'", sValue );
			SPH_RET(false);
		}
	} else if ( !strcmp ( sName, "groupby" ) )
	{
		// groupby function is matched by prefix; the remainder becomes
		// the group-by attribute
		static const struct
		{
			const char * m_sName;
			ESphGroupBy m_eFunc;
		} dGroupModes[] =
		{
			{ "day:", SPH_GROUPBY_DAY },
			{ "week:", SPH_GROUPBY_WEEK },
			{ "month:", SPH_GROUPBY_MONTH },
			{ "year:", SPH_GROUPBY_YEAR },
			{ "attr:", SPH_GROUPBY_ATTR },
			{ "multi:", SPH_GROUPBY_MULTIPLE }
		};

		int i;
		const int nModes = sizeof(dGroupModes)/sizeof(dGroupModes[0]);
		for ( i=0; i<nModes; i++ )
			if ( !strncmp ( sValue, dGroupModes[i].m_sName, strlen ( dGroupModes[i].m_sName ) ) )
		{
			m_eGroupFunc = dGroupModes[i].m_eFunc;
			m_sGroupBy = sValue + strlen ( dGroupModes[i].m_sName );
			break;
		}
		if ( i==nModes )
		{
			snprintf ( m_sParseError, sizeof(m_sParseError), "unknown groupby mode '%s'", sValue );
			SPH_RET(false);
		}
	} else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
		( !strcmp ( sName, "range" ) || !strcmp ( sName, "!range" ) || !strcmp ( sName, "floatrange" ) || !strcmp ( sName, "!floatrange" ) ) )
	{
		// [!][float]range=<attr>,<min>,<max>; malformed input is silently
		// ignored (the single-iteration loop just breaks without adding)
		for ( ;; )
		{
			char * p = sName;
			CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
			tFilter.m_bExclude = ( *p=='!' ); if ( tFilter.m_bExclude ) p++;
			tFilter.m_eType = ( *p=='f' ) ? SPH_FILTER_FLOATRANGE : SPH_FILTER_RANGE;

			if (!( p = strchr ( sValue, ',' ) ))
				break;
			*p++ = '\0';
			tFilter.m_sAttrName = chop ( sValue );
			sValue = p;

			if (!( p = strchr ( sValue, ',' ) ))
				break;
			*p++ = '\0';

			if ( tFilter.m_eType==SPH_FILTER_RANGE )
			{
				tFilter.m_uMinValue = strtoll ( sValue, NULL, 10 );
				tFilter.m_uMaxValue = strtoll ( p, NULL, 10 );
			} else
			{
				tFilter.m_fMinValue = (float)atof(sValue);
				tFilter.m_fMaxValue = (float)atof(p);
			}

			// all ok
			m_iFilters++;
			break;
		}
	} else if ( m_iFilters<SPHINXSE_MAX_FILTERS &&
		( !strcmp ( sName, "filter" ) || !strcmp ( sName, "!filter" ) ) )
	{
		// [!]filter=<attr>,<value>[,<value>...]; malformed input is
		// silently ignored, as above
		for ( ;; )
		{
			CSphSEFilter & tFilter = m_dFilters [ m_iFilters ];
			tFilter.m_eType = SPH_FILTER_VALUES;
			tFilter.m_bExclude = ( strcmp ( sName, "!filter" )==0 );

			// get the attr name
			while ( (*sValue) && !( myisattr(*sValue) || myismagic(*sValue) ) )
				sValue++;
			if ( !*sValue )
				break;

			tFilter.m_sAttrName = sValue;
			while ( (*sValue) && ( myisattr(*sValue) || myismagic(*sValue) || myisjson(*sValue) ) )
				sValue++;
			if ( !*sValue )
				break;
			*sValue++ = '\0';

			// get the values
			tFilter.m_iValues = ParseArray<longlong> ( &tFilter.m_pValues, sValue );
			if ( !tFilter.m_iValues )
			{
				assert ( !tFilter.m_pValues );
				break;
			}

			// all ok
			m_iFilters++;
			break;
		}
	} else if ( !strcmp ( sName, "indexweights" ) || !strcmp ( sName, "fieldweights" ) )
	{
		// comma-separated name,weight pairs: name1,w1,name2,w2,...
		bool bIndex = !strcmp ( sName, "indexweights" );
		int * pCount = bIndex ? &m_iIndexWeights : &m_iFieldWeights;
		char ** pNames = bIndex ? &m_sIndexWeight[0] : &m_sFieldWeight[0];
		int * pWeights = bIndex ? &m_iIndexWeight[0] : &m_iFieldWeight[0];

		*pCount = 0;
		char * p = sValue;
		while ( *p && *pCount<SPHINXSE_MAX_FILTERS )
		{
			// extract attr name
			if ( !myisattr(*p) )
			{
				snprintf ( m_sParseError, sizeof(m_sParseError), "%s: index name expected near '%s'", sName, p );
				SPH_RET(false);
			}

			pNames[*pCount] = p;
			while ( myisattr(*p) ) p++;

			if ( *p!=',' )
			{
				snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
				SPH_RET(false);
			}
			*p++ = '\0';

			// extract attr value
			char * sVal = p;
			while ( isdigit(*p) ) p++;
			if ( p==sVal )
			{
				snprintf ( m_sParseError, sizeof(m_sParseError), "%s: integer weight expected near '%s'", sName, sVal );
				SPH_RET(false);
			}
			pWeights[*pCount] = atoi(sVal);
			(*pCount)++;

			if ( !*p )
				break;
			if ( *p!=',' )
			{
				snprintf ( m_sParseError, sizeof(m_sParseError), "%s: comma expected near '%s'", sName, p );
				SPH_RET(false);
			}
			p++;
		}
	} else if ( !strcmp ( sName, "geoanchor" ) )
	{
		// geoanchor=<latattr>,<longattr>,<lat>,<long>
		m_bGeoAnchor = false;
		for ( ;; )
		{
			char * sLat = sValue;
			char * p = sValue;

			if (!( p = strchr ( p, ',' ) )) break;
			*p++ = '\0';
			char * sLong = p;

			if (!( p = strchr ( p, ',' ) )) break;
			*p++ = '\0';
			char * sLatVal = p;

			if (!( p = strchr ( p, ',' ) )) break;
			*p++ = '\0';
			char * sLongVal = p;

			m_sGeoLatAttr = chop(sLat);
			m_sGeoLongAttr = chop(sLong);
			m_fGeoLatitude = (float)atof ( sLatVal );
			m_fGeoLongitude = (float)atof ( sLongVal );
			m_bGeoAnchor = true;
			break;
		}
		if ( !m_bGeoAnchor )
		{
			snprintf ( m_sParseError, sizeof(m_sParseError), "geoanchor: parse error, not enough comma-separated arguments" );
			SPH_RET(false);
		}
	} else if ( !strcmp ( sName, "override" ) ) // name,type,id:value,id:value,...
	{
		sName = NULL;
		int iType = 0;
		CSphSEQuery::Override_t * pOverride = NULL;

		// get name and type
		char * sRest = sValue;
		for ( ;; )
		{
			sName = sRest;
			if ( !*sName )
				break;
			if (!( sRest = strchr ( sRest, ',' ) ))
				break;
			*sRest++ = '\0';
			char * sType = sRest;
			if (!( sRest = strchr ( sRest, ',' ) ))
				break;

			static const struct
			{
				const char * m_sName;
				int m_iType;
			}
			dAttrTypes[] =
			{
				{ "int", SPH_ATTR_INTEGER },
				{ "timestamp", SPH_ATTR_TIMESTAMP },
				{ "bool", SPH_ATTR_BOOL },
				{ "float", SPH_ATTR_FLOAT },
				{ "bigint", SPH_ATTR_BIGINT }
			};
			// match the type token (text between the two commas)
			for ( uint i=0; i<sizeof(dAttrTypes)/sizeof(*dAttrTypes); i++ )
				if ( !strncmp ( sType, dAttrTypes[i].m_sName, sRest - sType ) )
			{
				iType = dAttrTypes[i].m_iType;
				break;
			}
			break;
		}

		// fail
		if ( !sName || !*sName || !iType )
		{
			snprintf ( m_sParseError, sizeof(m_sParseError), "override: malformed query" );
			SPH_RET(false);
		}

		// grab id:value pairs; the Override_t entry is allocated lazily on
		// the first valid pair
		sRest++;
		while ( sRest )
		{
			char * sId = sRest;
			if (!( sRest = strchr ( sRest, ':' ) )) break;
			*sRest++ = '\0';
			if (!( sRest - sId )) break;

			sValue = sRest;
			if ( ( sRest = strchr ( sRest, ',' ) )!=NULL )
				*sRest++ = '\0';
			if ( !*sValue )
				break;

			if ( !pOverride )
			{
				pOverride = new CSphSEQuery::Override_t;
				pOverride->m_sName = chop(sName);
				pOverride->m_iType = iType;
				m_dOverrides.append ( pOverride );
			}

			ulonglong uId = strtoull ( sId, NULL, 10 );
			CSphSEQuery::Override_t::Value_t tValue;
			if ( iType==SPH_ATTR_FLOAT )
				tValue.m_fValue = (float)atof(sValue);
			else if ( iType==SPH_ATTR_BIGINT )
				tValue.m_iValue64 = strtoll ( sValue, NULL, 10 );
			else
				tValue.m_uValue = (uint32)strtoul ( sValue, NULL, 10 );

			pOverride->m_dIds.append ( uId );
			pOverride->m_dValues.append ( tValue );
		}

		if ( !pOverride )
		{
			snprintf ( m_sParseError, sizeof(m_sParseError), "override: id:value mapping expected" );
			SPH_RET(false);
		}
		SPH_RET(true);
	} else
	{
		snprintf ( m_sParseError, sizeof(m_sParseError), "unknown parameter '%s'", sName );
		SPH_RET(false);
	}

	// !COMMIT handle syntax errors
	SPH_RET(true);
}
// Split the private query buffer on unescaped semicolons and feed each
// clause to ParseField(). Returns false on the first parse error.
bool CSphSEQuery::Parse ()
{
	SPH_ENTER_METHOD();
	SPH_DEBUG ( "query [[ %s ]]", m_sQueryBuffer );

	m_bQuery = false;
	char * pCur = m_sQueryBuffer;
	char * pNext = pCur;

	while ( ( pNext = strchr ( pNext, ';' ) )!=NULL )
	{
		// handle escaped semicolons; the artificial trailing ';' appended
		// by the constructor (followed by '\0') is never treated as escaped
		if ( pNext>m_sQueryBuffer && pNext[-1]=='\\' && pNext[1]!='\0' )
		{
			pNext++;
			continue;
		}

		// handle semicolon-separated clauses
		*pNext++ = '\0';
		if ( !ParseField ( pCur ) )
			SPH_RET(false);
		pCur = pNext;
	}

	SPH_DEBUG ( "q [[ %s ]]", m_sQuery );
	SPH_RET(true);
}
// Append raw bytes to the request buffer. On insufficient space, nothing is
// written; the overrun flag is latched and BuildRequest() reports the error.
void CSphSEQuery::SendBytes ( const void * pBytes, int iBytes )
{
	SPH_ENTER_METHOD();
	if ( m_iBufLeft>=iBytes )
	{
		memcpy ( m_pCur, pBytes, iBytes );
		m_pCur += iBytes;
		m_iBufLeft -= iBytes;
	} else
	{
		m_bBufOverrun = true;
	}
	SPH_VOID_RET();
}
// Serialize the query into a searchd API request packet. First computes the
// exact packet size, allocates the buffer (returned via ppBuffer, owned by
// this object), then emits the fields in wire order. The layout emitted
// below must match the size computed above byte-for-byte; the final check
// catches any mismatch. Returns the request size, or -1 on error.
int CSphSEQuery::BuildRequest ( char ** ppBuffer )
{
	SPH_ENTER_METHOD();

	// calc request length
	int iReqSize = 128 + 4*m_iWeights
		+ strlen ( m_sSortBy )
		+ strlen ( m_sQuery )
		+ strlen ( m_sIndex )
		+ strlen ( m_sGroupBy )
		+ strlen ( m_sGroupSortBy )
		+ strlen ( m_sGroupDistinct )
		+ strlen ( m_sComment )
		+ strlen ( m_sSelect );
	if ( m_eRanker==SPH_RANK_EXPR )
		iReqSize += 4 + strlen(m_sRankExpr);
	for ( int i=0; i<m_iFilters; i++ )
	{
		const CSphSEFilter & tFilter = m_dFilters[i];
		iReqSize += 12 + strlen ( tFilter.m_sAttrName ); // string attr-name; int type; int exclude-flag
		switch ( tFilter.m_eType )
		{
			case SPH_FILTER_VALUES: iReqSize += 4 + 8*tFilter.m_iValues; break;
			case SPH_FILTER_RANGE: iReqSize += 16; break;
			case SPH_FILTER_FLOATRANGE: iReqSize += 8; break;
		}
	}
	if ( m_bGeoAnchor ) // 1.14+
		iReqSize += 16 + strlen ( m_sGeoLatAttr ) + strlen ( m_sGeoLongAttr );
	for ( int i=0; i<m_iIndexWeights; i++ ) // 1.15+
		iReqSize += 8 + strlen(m_sIndexWeight[i] );
	for ( int i=0; i<m_iFieldWeights; i++ ) // 1.18+
		iReqSize += 8 + strlen(m_sFieldWeight[i] );
	// overrides
	iReqSize += 4;
	for ( size_t i=0; i<m_dOverrides.elements(); i++ )
	{
		CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
		const uint32 uSize = pOverride->m_iType==SPH_ATTR_BIGINT ? 16 : 12; // id64 + value
		iReqSize += strlen ( pOverride->m_sName ) + 12 + uSize*pOverride->m_dIds.elements();
	}
	// select
	iReqSize += 4;

	// allocate the buffer; Send*() helpers write through m_pCur/m_iBufLeft
	m_iBufLeft = 0;
	SafeDeleteArray ( m_pBuf );

	m_pBuf = new char [ iReqSize ];
	if ( !m_pBuf )
		SPH_RET(-1);

	m_pCur = m_pBuf;
	m_iBufLeft = iReqSize;
	m_bBufOverrun = false;
	(*ppBuffer) = m_pBuf;

	// build request
	SendWord ( SEARCHD_COMMAND_SEARCH ); // command id
	SendWord ( VER_COMMAND_SEARCH ); // command version
	SendInt ( iReqSize-8 ); // packet body length

	SendInt ( 0 ); // its a client
	SendInt ( 1 ); // number of queries
	SendInt ( m_iOffset );
	SendInt ( m_iLimit );
	SendInt ( m_eMode );
	SendInt ( m_eRanker ); // 1.16+
	if ( m_eRanker==SPH_RANK_EXPR )
		SendString ( m_sRankExpr );
	SendInt ( m_eSort );
	SendString ( m_sSortBy ); // sort attr
	SendString ( m_sQuery ); // query
	SendInt ( m_iWeights );
	for ( int j=0; j<m_iWeights; j++ )
		SendInt ( m_pWeights[j] ); // weights
	SendString ( m_sIndex ); // indexes
	SendInt ( 1 ); // id64 range follows
	SendUint64 ( m_iMinID ); // id/ts ranges
	SendUint64 ( m_iMaxID );

	SendInt ( m_iFilters );
	for ( int j=0; j<m_iFilters; j++ )
	{
		const CSphSEFilter & tFilter = m_dFilters[j];
		SendString ( tFilter.m_sAttrName );
		SendInt ( tFilter.m_eType );

		switch ( tFilter.m_eType )
		{
			case SPH_FILTER_VALUES:
				SendInt ( tFilter.m_iValues );
				for ( int k=0; k<tFilter.m_iValues; k++ )
					SendUint64 ( tFilter.m_pValues[k] );
				break;

			case SPH_FILTER_RANGE:
				SendUint64 ( tFilter.m_uMinValue );
				SendUint64 ( tFilter.m_uMaxValue );
				break;

			case SPH_FILTER_FLOATRANGE:
				SendFloat ( tFilter.m_fMinValue );
				SendFloat ( tFilter.m_fMaxValue );
				break;
		}

		SendInt ( tFilter.m_bExclude );
	}

	SendInt ( m_eGroupFunc );
	SendString ( m_sGroupBy );
	SendInt ( m_iMaxMatches );
	SendString ( m_sGroupSortBy );
	SendInt ( m_iCutoff ); // 1.9+
	SendInt ( m_iRetryCount ); // 1.10+
	SendInt ( m_iRetryDelay );
	SendString ( m_sGroupDistinct ); // 1.11+

	SendInt ( m_bGeoAnchor ); // 1.14+
	if ( m_bGeoAnchor )
	{
		SendString ( m_sGeoLatAttr );
		SendString ( m_sGeoLongAttr );
		SendFloat ( m_fGeoLatitude );
		SendFloat ( m_fGeoLongitude );
	}

	SendInt ( m_iIndexWeights ); // 1.15+
	for ( int i=0; i<m_iIndexWeights; i++ )
	{
		SendString ( m_sIndexWeight[i] );
		SendInt ( m_iIndexWeight[i] );
	}

	SendInt ( m_iMaxQueryTime ); // 1.17+

	SendInt ( m_iFieldWeights ); // 1.18+
	for ( int i=0; i<m_iFieldWeights; i++ )
	{
		SendString ( m_sFieldWeight[i] );
		SendInt ( m_iFieldWeight[i] );
	}

	SendString ( m_sComment );

	// overrides
	SendInt ( m_dOverrides.elements() );
	for ( size_t i=0; i<m_dOverrides.elements(); i++ )
	{
		CSphSEQuery::Override_t * pOverride = m_dOverrides.at(i);
		SendString ( pOverride->m_sName );
		SendDword ( pOverride->m_iType );
		SendInt ( pOverride->m_dIds.elements() );
		for ( size_t j=0; j<pOverride->m_dIds.elements(); j++ )
		{
			SendUint64 ( pOverride->m_dIds.at(j) );
			if ( pOverride->m_iType==SPH_ATTR_FLOAT )
				SendFloat ( pOverride->m_dValues.at(j).m_fValue );
			else if ( pOverride->m_iType==SPH_ATTR_BIGINT )
				SendUint64 ( pOverride->m_dValues.at(j).m_iValue64 );
			else
				SendDword ( pOverride->m_dValues.at(j).m_uValue );
		}
	}

	// select
	SendString ( m_sSelect );

	// detect buffer overruns and underruns, and report internal error
	if ( m_bBufOverrun || m_iBufLeft!=0 || m_pCur-m_pBuf!=iReqSize )
		SPH_RET(-1);

	// all fine
	SPH_RET ( iReqSize );
}
//////////////////////////////////////////////////////////////////////////////
// SPHINX HANDLER
//////////////////////////////////////////////////////////////////////////////
// Handler constructor: forward to the base handler (with the version-specific
// signature) and zero out all per-request state; real setup happens in open().
#if MYSQL_VERSION_ID<50100
ha_sphinx::ha_sphinx ( TABLE_ARG * table )
	: handler ( &sphinx_hton, table )
#else
ha_sphinx::ha_sphinx ( handlerton * hton, TABLE_ARG * table )
	: handler ( hton, table )
#endif
	, m_pShare ( NULL )
	, m_iMatchesTotal ( 0 )
	, m_iCurrentPos ( 0 )
	, m_pCurrentKey ( NULL )
	, m_iCurrentKeyLen ( 0 )
	, m_pResponse ( NULL )
	, m_pResponseEnd ( NULL )
	, m_pCur ( NULL )
	, m_bUnpackError ( false )
	, m_iFields ( 0 )
	, m_dFields ( NULL )
	, m_iAttrs ( 0 )
	, m_dAttrs ( NULL )
	, m_bId64 ( 0 )
	, m_dUnboundFields ( NULL )
{
	SPH_ENTER_METHOD();
	SPH_VOID_RET();
}
// Free per-handler buffers; m_dFields is an array of individually owned
// C strings, so each element is released before the array itself.
ha_sphinx::~ha_sphinx()
{
	SafeDeleteArray ( m_dAttrs );
	SafeDeleteArray ( m_dUnboundFields );
	if ( !m_dFields )
		return;
	for ( uint32 uField=0; uField<m_iFields; uField++ )
		SafeDeleteArray ( m_dFields[uField] );
	delete [] m_dFields;
}
// Used for opening tables. The name will be the name of the file.
// A table is opened when it needs to be opened. For instance
// when a request comes in for a select on the table (tables are not
// open and closed for each request, they are cached).
//
// Called from handler.cc by handler::ha_open(). The server opens all tables by
// calling ha_open() which then calls the handler specific open().
// Open the table: attach to (or create) the refcounted share, register the
// THR_LOCK data, and clear this engine's per-connection TLS slot.
// Returns 0 on success, 1 when the share cannot be obtained.
int ha_sphinx::open ( const char * name, int, uint )
{
	SPH_ENTER_METHOD();
	m_pShare = get_share ( name, table );
	if ( !m_pShare )
		SPH_RET(1);

	thr_lock_data_init ( &m_pShare->m_tLock, &m_tLock, NULL );

#if MYSQL_VERSION_ID>50100
	*thd_ha_data ( table->in_use, ht ) = NULL;
#else
	table->in_use->ha_data [ sphinx_hton.slot ] = NULL;
#endif

	SPH_RET(0);
}
// Open a blocking socket to searchd. A non-zero uPort means TCP (sHost is an
// IP or a hostname to resolve); uPort==0 means a UNIX-domain socket whose
// path is sHost (not available on Windows). Returns the connected socket fd,
// or -1 after reporting the error via my_error().
int ha_sphinx::Connect ( const char * sHost, ushort uPort )
{
	struct sockaddr_in sin;
#ifndef __WIN__
	struct sockaddr_un saun;
#endif

	int iDomain = 0;
	int iSockaddrSize = 0;
	struct sockaddr * pSockaddr = NULL;

	in_addr_t ip_addr;

	if ( uPort )
	{
		// TCP connection
		iDomain = AF_INET;
		iSockaddrSize = sizeof(sin);
		pSockaddr = (struct sockaddr *) &sin;

		memset ( &sin, 0, sizeof(sin) );
		sin.sin_family = AF_INET;
		sin.sin_port = htons(uPort);

		// prepare host address: try dotted-quad first, then resolve
		if ( (int)( ip_addr = inet_addr(sHost) )!=(int)INADDR_NONE )
		{
			memcpy ( &sin.sin_addr, &ip_addr, sizeof(ip_addr) );
		} else
		{
			int tmp_errno;
			bool bError = false;

#if MYSQL_VERSION_ID>=50515
			struct addrinfo *hp = NULL;
			tmp_errno = getaddrinfo ( sHost, NULL, NULL, &hp );
			if ( tmp_errno || !hp || !hp->ai_addr )
			{
				bError = true;
				if ( hp )
					freeaddrinfo ( hp );
			}
#else
			struct hostent tmp_hostent, *hp;
			char buff2 [ GETHOSTBYNAME_BUFF_SIZE ];
			hp = my_gethostbyname_r ( sHost, &tmp_hostent, buff2, sizeof(buff2), &tmp_errno );
			if ( !hp )
			{
				my_gethostbyname_r_free();
				bError = true;
			}
#endif

			if ( bError )
			{
				char sError[256];
				my_snprintf ( sError, sizeof(sError), "failed to resolve searchd host (name=%s)", sHost );

				my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
				SPH_RET(-1);
			}

#if MYSQL_VERSION_ID>=50515
			struct sockaddr_in *in = (sockaddr_in *)hp->ai_addr;
			memcpy ( &sin.sin_addr, &in->sin_addr, Min ( sizeof(sin.sin_addr), sizeof(in->sin_addr) ) );
			freeaddrinfo ( hp );
#else
			memcpy ( &sin.sin_addr, hp->h_addr, Min ( sizeof(sin.sin_addr), (size_t)hp->h_length ) );
			my_gethostbyname_r_free();
#endif
		}
	} else
	{
		// UNIX-domain socket; sHost carries the socket path
#ifndef __WIN__
		iDomain = AF_UNIX;
		iSockaddrSize = sizeof(saun);
		pSockaddr = (struct sockaddr *) &saun;

		memset ( &saun, 0, sizeof(saun) );
		saun.sun_family = AF_UNIX;
		strncpy ( saun.sun_path, sHost, sizeof(saun.sun_path)-1 );
#else
		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "UNIX sockets are not supported on Windows" );
		SPH_RET(-1);
#endif
	}

	char sError[512];
	int iSocket = (int) socket ( iDomain, SOCK_STREAM, 0 );

	if ( iSocket<0 )
	{
		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "failed to create client socket" );
		SPH_RET(-1);
	}

	if ( connect ( iSocket, pSockaddr, iSockaddrSize )<0 )
	{
		sphSockClose ( iSocket );
		my_snprintf ( sError, sizeof(sError), "failed to connect to searchd (host=%s, errno=%d, port=%d)",
			sHost, errno, (int)uPort );
		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
		SPH_RET(-1);
	}

	return iSocket;
}
// Connect to searchd using the binary API protocol and perform the version
// handshake (read the server's version dword, send ours). Query-level
// host/port override the values from the table share. Returns the socket fd,
// or -1 after reporting the error.
int ha_sphinx::ConnectAPI ( const char * sQueryHost, int iQueryPort )
{
	SPH_ENTER_METHOD();

	const char * sHost = ( sQueryHost && *sQueryHost ) ? sQueryHost : m_pShare->m_sHost;
	ushort uPort = iQueryPort ? (ushort)iQueryPort : m_pShare->m_iPort;

	int iSocket = Connect ( sHost, uPort );
	if ( iSocket<0 )
		SPH_RET ( iSocket );

	char sError[512];

	int version;
	if ( ::recv ( iSocket, (char *)&version, sizeof(version), 0 )!=sizeof(version) )
	{
		sphSockClose ( iSocket );
		my_snprintf ( sError, sizeof(sError), "failed to receive searchd version (host=%s, port=%d)",
			sHost, (int)uPort );
		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
		SPH_RET(-1);
	}

	uint uClientVersion = htonl ( SPHINX_SEARCHD_PROTO );
	if ( ::send ( iSocket, (char*)&uClientVersion, sizeof(uClientVersion), 0 )!=sizeof(uClientVersion) )
	{
		sphSockClose ( iSocket );
		my_snprintf ( sError, sizeof(sError), "failed to send client version (host=%s, port=%d)",
			sHost, (int)uPort );
		my_error ( ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), sError );
		SPH_RET(-1);
	}

	SPH_RET ( iSocket );
}
// Closes a table. We call the free_share() function to free any resources
// that we have allocated in the "shared" structure.
//
// Called from sql_base.cc, sql_select.cc, and table.cc.
// In sql_select.cc it is only used to close up temporary tables or during
// the process where a temporary table is converted over to being a
// myisam table.
// For sql_base.cc look at close_data_tables().
int ha_sphinx::close()
{
	SPH_ENTER_METHOD();
	// drop our reference on the shared descriptor (freed on last close)
	SPH_RET ( free_share ( m_pShare ) );
}
// Record the MySQL client error in per-thread stats (when available), close
// the connection, and report the error to the client. Always returns -1 so
// callers can "SPH_RET ( HandleMysqlError (...) )".
int ha_sphinx::HandleMysqlError ( MYSQL * pConn, int iErrCode )
{
	// copy the message out first: mysql_error() returns a pointer into the
	// connection handle, which mysql_close() below frees
	char sMessage[1024];
	strncpy ( sMessage, mysql_error ( pConn ), sizeof(sMessage)-1 );
	sMessage [ sizeof(sMessage)-1 ] = '\0';

	CSphSEThreadTable * pTable = GetTls ();
	if ( pTable )
	{
		strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof pTable->m_tStats.m_sLastMessage - 1 );
		pTable->m_tStats.m_sLastMessage[sizeof pTable->m_tStats.m_sLastMessage - 1] = '\0';
		pTable->m_tStats.m_bLastError = true;
	}

	mysql_close ( pConn );

	// NOTE: the original dereferenced pTable unconditionally here, crashing
	// when no TLS table was registered; report from the local copy instead
	my_error ( iErrCode, MYF(0), sMessage );
	return -1;
}
// Handler "extra" hints: remember whether upcoming writes should be issued
// as REPLACE or INSERT. All other hints are ignored. Always returns 0.
int ha_sphinx::extra ( enum ha_extra_function op )
{
	CSphSEThreadTable * pTable = GetTls();
	if ( !pTable )
		return 0;

	switch ( op )
	{
		case HA_EXTRA_WRITE_CAN_REPLACE:
			pTable->m_bReplace = true;
			break;
		case HA_EXTRA_WRITE_CANNOT_REPLACE:
			pTable->m_bReplace = false;
			break;
		default:
			break;
	}
	return 0;
}
// Handle INSERT (or REPLACE, depending on the flag set via extra()) by
// building a SphinxQL statement from the row and sending it to searchd over
// the MySQL wire protocol. Only available for SphinxQL-mode tables.
int ha_sphinx::write_row ( byte * )
{
	SPH_ENTER_METHOD();
	if ( !m_pShare || !m_pShare->m_bSphinxQL )
		SPH_RET ( HA_ERR_WRONG_COMMAND );

	// SphinxQL inserts only, pretty much similar to abandoned federated
	char sQueryBuf[1024];
	char sValueBuf[1024];

	String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
	// FIX: was sizeof(sQueryBuf); harmless only while both buffers happen
	// to be the same size
	String sValue ( sValueBuf, sizeof(sValueBuf), &my_charset_bin );
	sQuery.length ( 0 );
	sValue.length ( 0 );

	CSphSEThreadTable * pTable = GetTls ();
	sQuery.append ( pTable && pTable->m_bReplace ? "REPLACE INTO " : "INSERT INTO " );
	sQuery.append ( m_pShare->m_sIndex );
	sQuery.append ( " (" );

	// column list
	for ( Field ** ppField = table->field; *ppField; ppField++ )
	{
		sQuery.append ( (*ppField)->field_name.str );
		if ( ppField[1] )
			sQuery.append ( ", " );
	}
	sQuery.append ( ") VALUES (" );

	// value list; TIMESTAMP columns are converted to UNIX timestamps,
	// everything else is sent as a quoted (escaped) string
	for ( Field ** ppField = table->field; *ppField; ppField++ )
	{
		if ( (*ppField)->is_null() )
		{
			sQuery.append ( "''" );

		} else
		{
			THD *thd= ha_thd();
			if ( (*ppField)->type()==MYSQL_TYPE_TIMESTAMP )
			{
				Item_field * pWrap = new (thd->mem_root) Item_field(thd, *ppField); // autofreed by query arena, I assume
				Item_func_unix_timestamp * pConv = new (thd->mem_root) Item_func_unix_timestamp(thd, pWrap);
				pConv->quick_fix_field();
				unsigned int uTs = (unsigned int) pConv->val_int();

				snprintf ( sValueBuf, sizeof(sValueBuf), "'%u'", uTs );
				sQuery.append ( sValueBuf );

			} else
			{
				(*ppField)->val_str ( &sValue );
				sQuery.append ( "'" );
				sValue.print ( &sQuery ); // print() escapes special characters
				sQuery.append ( "'" );
				sValue.length(0);
			}
		}

		if ( ppField[1] )
			sQuery.append ( ", " );
	}
	sQuery.append ( ")" );

	// FIXME? pretty inefficient to reconnect every time under high load,
	// but this was intentionally written for a low load scenario..
	MYSQL * pConn = mysql_init ( NULL );
	if ( !pConn )
		SPH_RET ( ER_OUT_OF_RESOURCES );

	unsigned int uTimeout = 1;
	mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
	my_bool my_true= 1;
	mysql_options(pConn, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, (char*) &my_true);

	if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
		SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );

	if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
		SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );

	// all ok!
	mysql_close ( pConn );
	SPH_RET(0);
}
// True for the integer column types SphinxSE maps to attributes:
// INT (MYSQL_TYPE_LONG) and BIGINT (MYSQL_TYPE_LONGLONG).
static inline bool IsIntegerFieldType ( enum_field_types eType )
{
	switch ( eType )
	{
		case MYSQL_TYPE_LONG:
		case MYSQL_TYPE_LONGLONG:
			return true;
		default:
			return false;
	}
}
// True when the column can carry a document ID:
// BIGINT always qualifies, plain INT only when it is UNSIGNED.
static inline bool IsIDField ( Field * pField )
{
	switch ( pField->type() )
	{
		case MYSQL_TYPE_LONGLONG:
			return true;
		case MYSQL_TYPE_LONG:
			return ((Field_num*)pField)->unsigned_flag ? true : false;
		default:
			return false;
	}
}
int ha_sphinx::delete_row ( const byte * )
{
SPH_ENTER_METHOD();
if ( !m_pShare || !m_pShare->m_bSphinxQL )
SPH_RET ( HA_ERR_WRONG_COMMAND );
char sQueryBuf[1024];
String sQuery ( sQueryBuf, sizeof(sQueryBuf), &my_charset_bin );
sQuery.length ( 0 );
sQuery.append ( "DELETE FROM " );
sQuery.append ( m_pShare->m_sIndex );
sQuery.append ( " WHERE id=" );
char sValue[32];
snprintf ( sValue, sizeof(sValue), "%lld", table->field[0]->val_int() );
sQuery.append ( sValue );
// FIXME? pretty inefficient to reconnect every time under high load,
// but this was intentionally written for a low load scenario..
MYSQL * pConn = mysql_init ( NULL );
if ( !pConn )
SPH_RET ( ER_OUT_OF_RESOURCES );
unsigned int uTimeout = 1;
mysql_options ( pConn, MYSQL_OPT_CONNECT_TIMEOUT, (const char*)&uTimeout );
my_bool my_true= 1;
mysql_options(pConn, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, (char*) &my_true);
if ( !mysql_real_connect ( pConn, m_pShare->m_sHost, "root", "", "", m_pShare->m_iPort, m_pShare->m_sSocket, 0 ) )
SPH_RET ( HandleMysqlError ( pConn, ER_CONNECT_TO_FOREIGN_DATA_SOURCE ) );
if ( mysql_real_query ( pConn, sQuery.ptr(), sQuery.length() ) )
SPH_RET ( HandleMysqlError ( pConn, ER_QUERY_ON_FOREIGN_DATA_SOURCE ) );
// all ok!
mysql_close ( pConn );
SPH_RET(0);
}
// UPDATE is not supported by SphinxSE; only SELECT, and (in SphinxQL mode)
// INSERT/REPLACE and DELETE are handled.
int ha_sphinx::update_row ( const byte *, const byte * )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
// keynr is key (index) number
// sorted is 1 if result MUST be sorted according to index
int ha_sphinx::index_init ( uint keynr, bool )
{
	SPH_ENTER_METHOD();
	active_index = keynr;

	// a new index scan starts; reset the per-thread "condition handled" flag
	CSphSEThreadTable * pTable = GetTls();
	if ( pTable )
		pTable->m_bCondDone = false;

	SPH_RET(0);
}
// Nothing to tear down per index scan.
int ha_sphinx::index_end()
{
	SPH_ENTER_METHOD();
	SPH_RET(0);
}
// Verify that iLen more bytes can be read from the searchd response.
// On underrun, clamps the cursor to the end and latches the unpack-error
// flag so subsequent reads keep failing safely.
bool ha_sphinx::CheckResponcePtr ( int iLen )
{
	if ( m_pCur+iLen<=m_pResponseEnd )
		return true;

	m_pCur = m_pResponseEnd;
	m_bUnpackError = true;
	return false;
}
// Read one network-order uint32 from the response and advance the cursor.
// Returns 0 on buffer underrun (the error flag is set by CheckResponcePtr).
uint32 ha_sphinx::UnpackDword ()
{
	if ( !CheckResponcePtr ( sizeof(uint32) ) ) // NOLINT
	{
		return 0;
	}

	uint32 uRes = ntohl ( sphUnalignedRead ( *(uint32*)m_pCur ) );
	m_pCur += sizeof(uint32); // NOLINT
	return uRes;
}
// Read a length-prefixed string from the response into a freshly allocated,
// NUL-terminated buffer. The caller owns the result. Returns NULL for an
// empty string and on buffer underrun.
char * ha_sphinx::UnpackString ()
{
	uint32 iLen = UnpackDword ();
	if ( !iLen )
		return NULL;
	if ( !CheckResponcePtr ( iLen ) )
	{
		return NULL;
	}

	char * sRes = new char [ 1+iLen ];
	memcpy ( sRes, m_pCur, iLen );
	sRes[iLen] = '\0';
	m_pCur += iLen;
	return sRes;
}
bool ha_sphinx::UnpackSchema ()
{
SPH_ENTER_METHOD();
// cleanup
if ( m_dFields )
for ( int i=0; i<(int)m_iFields; i++ )
SafeDeleteArray ( m_dFields[i] );
SafeDeleteArray ( m_dFields );
// unpack network packet
uint32 uStatus = UnpackDword ();
char * sMessage = NULL;
if ( uStatus!=SEARCHD_OK )
{
sMessage = UnpackString ();
CSphSEThreadTable * pTable = GetTls ();
if ( pTable )
{
strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof pTable->m_tStats.m_sLastMessage - 1 );
pTable->m_tStats.m_sLastMessage[sizeof pTable->m_tStats.m_sLastMessage - 1] = '\0';
pTable->m_tStats.m_bLastError = ( uStatus==SEARCHD_ERROR );
}
if ( uStatus==SEARCHD_ERROR )
{
char sError[1024];
my_snprintf ( sError, sizeof(sError), "searchd error: %s", sMessage );
my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
SafeDeleteArray ( sMessage );
SPH_RET ( false );
}
}
m_iFields = UnpackDword ();
m_dFields = new char * [ m_iFields ];
if ( !m_dFields )
{
my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (fields alloc error)" );
SPH_RET(false);
}
for ( uint32 i=0; i<m_iFields; i++ )
m_dFields[i] = UnpackString ();
SafeDeleteArray ( m_dAttrs );
m_iAttrs = UnpackDword ();
m_dAttrs = new CSphSEAttr [ m_iAttrs ];
if ( !m_dAttrs )
{
for ( int i=0; i<(int)m_iFields; i++ )
SafeDeleteArray ( m_dFields[i] );
SafeDeleteArray ( m_dFields );
my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (attrs alloc error)" );
SPH_RET(false);
}
for ( uint32 i=0; i<m_iAttrs; i++ )
{
m_dAttrs[i].m_sName = UnpackString ();
m_dAttrs[i].m_uType = UnpackDword ();
if ( m_bUnpackError ) // m_sName may be null
break;
m_dAttrs[i].m_iField = -1;
for ( int j=SPHINXSE_SYSTEM_COLUMNS; j<m_pShare->m_iTableFields; j++ )
{
const char * sTableField = m_pShare->m_sTableField[j];
const char * sAttrField = m_dAttrs[i].m_sName;
if ( m_dAttrs[i].m_sName[0]=='@' )
{
const char * sAtPrefix = "_sph_";
if ( strncmp ( sTableField, sAtPrefix, strlen(sAtPrefix) ) )
continue;
sTableField += strlen(sAtPrefix);
sAttrField++;
}
if ( !strcasecmp ( sAttrField, sTableField ) )
{
// we're almost good, but
// let's enforce that timestamp columns can only receive timestamp attributes
if ( m_pShare->m_eTableFieldType[j]!=MYSQL_TYPE_TIMESTAMP || m_dAttrs[i].m_uType==SPH_ATTR_TIMESTAMP )
m_dAttrs[i].m_iField = j;
break;
}
}
}
m_iMatchesTotal = UnpackDword ();
m_bId64 = UnpackDword ();
if ( m_bId64 && m_pShare->m_eTableFieldType[0]!=MYSQL_TYPE_LONGLONG )
{
my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: 1st column must be bigint to accept 64-bit DOCIDs" );
SPH_RET(false);
}
// network packet unpacked; build unbound fields map
SafeDeleteArray ( m_dUnboundFields );
m_dUnboundFields = new int [ m_pShare->m_iTableFields ];
for ( int i=0; i<m_pShare->m_iTableFields; i++ )
{
if ( i<SPHINXSE_SYSTEM_COLUMNS )
m_dUnboundFields[i] = SPH_ATTR_NONE;
else if ( m_pShare->m_eTableFieldType[i]==MYSQL_TYPE_TIMESTAMP )
m_dUnboundFields[i] = SPH_ATTR_TIMESTAMP;
else
m_dUnboundFields[i] = SPH_ATTR_INTEGER;
}
for ( uint32 i=0; i<m_iAttrs; i++ )
if ( m_dAttrs[i].m_iField>=0 )
m_dUnboundFields [ m_dAttrs[i].m_iField ] = SPH_ATTR_NONE;
if ( m_bUnpackError )
my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackSchema() failed (unpack error)" );
SPH_RET ( !m_bUnpackError );
}
// Skip past the match list of the buffered searchd response to reach the
// trailing statistics block (total/found/time/per-word stats), fill pStats,
// then rewind the read cursor so get_rec() can unpack the matches afterwards.
// Returns false on unpack errors or an implausible keyword count.
bool ha_sphinx::UnpackStats ( CSphSEStats * pStats )
{
	assert ( pStats );

	// remember the cursor; matches are re-read later by get_rec()
	char * pCurSave = m_pCur;

	// walk over all matches without storing them; both loops are
	// bounds-checked so a truncated response cannot push m_pCur past the
	// end of the buffer
	for ( uint m=0; m<m_iMatchesTotal && m_pCur<m_pResponseEnd-sizeof(uint32); m++ ) // NOLINT
	{
		m_pCur += m_bId64 ? 12 : 8; // skip id+weight
		for ( uint32 i=0; i<m_iAttrs && m_pCur<m_pResponseEnd-sizeof(uint32); i++ ) // NOLINT
		{
			if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
			{
				// skip MVA list
				uint32 uCount = UnpackDword ();
				m_pCur += uCount*4;
			} else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING )
			{
				// skip length-prefixed string attribute
				uint32 iLen = UnpackDword();
				m_pCur += iLen;
			} else // skip normal value
				m_pCur += m_dAttrs[i].m_uType==SPH_ATTR_BIGINT ? 8 : 4;
		}
	}

	// the stats block proper
	pStats->m_iMatchesTotal = UnpackDword ();
	pStats->m_iMatchesFound = UnpackDword ();
	pStats->m_iQueryMsec = UnpackDword ();
	pStats->m_iWords = UnpackDword ();

	if ( m_bUnpackError )
		return false;

	// sanity-check keyword count before allocating the stats array
	if ( pStats->m_iWords<0 || pStats->m_iWords>=SPHINXSE_MAX_KEYWORDSTATS )
		return false;

	SafeDeleteArray ( pStats->m_dWords );
	pStats->m_dWords = new CSphSEWordStats [ pStats->m_iWords ];
	if ( !pStats->m_dWords )
		return false;

	// per-keyword docs/hits stats
	for ( int i=0; i<pStats->m_iWords; i++ )
	{
		CSphSEWordStats & tWord = pStats->m_dWords[i];
		tWord.m_sWord = UnpackString ();
		tWord.m_iDocs = UnpackDword ();
		tWord.m_iHits = UnpackDword ();
	}

	if ( m_bUnpackError )
		return false;

	// rewind to the first match for get_rec()
	m_pCur = pCurSave;
	return true;
}
/// condition pushdown implementation, to properly intercept WHERE clauses on my columns
///
/// Intercepts exactly one pattern per table flavor:
/// - SphinxAPI tables: `query_column = "some text"` on a SELECT; the query
///   string is copied into the per-thread table state for index_read().
/// - SphinxQL tables: `id = <int>` (used by DELETE); the id is stashed for
///   later use.
/// Returning NULL tells the server the condition was consumed; returning
/// `cond` unchanged leaves evaluation to the server.
#if MYSQL_VERSION_ID<50610
const COND * ha_sphinx::cond_push ( const COND * cond )
#else
const Item * ha_sphinx::cond_push ( const Item *cond )
#endif
{
	// catch the simplest case: query_column="some text"
	// (the for(;;) is a breakable scope: any `break` falls through to
	// "don't change anything" below)
	for ( ;; )
	{
		if ( cond->type()!=Item::FUNC_ITEM )
			break;

		Item_func * condf = (Item_func *)cond;
		if ( condf->functype()!=Item_func::EQ_FUNC || condf->argument_count()!=2 )
			break;

		// get my tls
		CSphSEThreadTable * pTable = GetTls ();
		if ( !pTable )
			break;

		Item ** args = condf->arguments();
		if ( !m_pShare->m_bSphinxQL )
		{
			// on non-QL tables, intercept query=value condition for SELECT
			if (!( args[0]->type()==Item::FIELD_ITEM && args[1]->type()==Item::STRING_ITEM ))
				break;

			Item_field * pField = (Item_field *) args[0];
			if ( pField->field->field_index!=2 ) // FIXME! magic key index
				break;

			// copy the query, and let know that we intercepted this condition
			String *pString= args[1]->val_str(NULL);
			pTable->m_bQuery = true;
			strncpy ( pTable->m_sQuery, pString->c_ptr(), sizeof(pTable->m_sQuery) );
			pTable->m_sQuery[sizeof(pTable->m_sQuery)-1] = '\0';
			pTable->m_pQueryCharset = pString->charset();

		} else
		{
			if (!( args[0]->type()==Item::FIELD_ITEM && args[1]->type()==Item::INT_ITEM ))
				break;

			// on QL tables, intercept id=value condition for DELETE
			Item_field * pField = (Item_field *) args[0];
			if ( pField->field->field_index!=0 ) // FIXME! magic key index
				break;

			Item_int * pVal = (Item_int *) args[1];
			pTable->m_iCondId = pVal->val_int();
			pTable->m_bCondId = true;
		}

		// we intercepted this condition
		return NULL;
	}

	// don't change anything
	return cond;
}
/// condition popup
void ha_sphinx::cond_pop ()
{
CSphSEThreadTable * pTable = GetTls ();
if ( pTable )
pTable->m_bQuery = false;
}
/// get TLS (maybe allocate it, too)
///
/// Returns the per-connection, per-handler CSphSEThreadTable, creating the
/// connection-level CSphTLS and/or the per-handler entry on first use.
/// Returns NULL only if allocation fails; callers must check.
CSphSEThreadTable * ha_sphinx::GetTls()
{
	SPH_ENTER_METHOD()
	// where do we store that pointer in today's version?
	CSphTLS ** ppTls;
#if MYSQL_VERSION_ID>50100
	ppTls = (CSphTLS**) thd_ha_data ( table->in_use, ht );
#else
	ppTls = (CSphTLS**) &current_thd->ha_data[sphinx_hton.slot];
#endif // >50100

	// lazily create the connection-level TLS
	// (both branches of the old if/else fetched m_pHeadTable; collapsed)
	if ( !*ppTls )
		*ppTls = new CSphTLS ( this );

	// look up the entry bound to this handler
	CSphSEThreadTable * pTable = (*ppTls)->m_pHeadTable;
	while ( pTable && pTable->m_pHandler!=this )
		pTable = pTable->m_pTableNext;

	// none yet; prepend a fresh one
	if ( !pTable )
	{
		pTable = new CSphSEThreadTable ( this );
		pTable->m_pTableNext = (*ppTls)->m_pHeadTable;
		(*ppTls)->m_pHeadTable = pTable;
	}

	// errors will be handled by caller
	return pTable;
}
// Positions an index cursor to the index specified in the handle. Fetches the
// row if available. If the key value is null, begin at the first key of the
// index.
//
// For SphinxSE this is where the actual search runs: the key (or the query
// captured earlier by cond_push()) is parsed, sent to searchd over a socket,
// and the entire response is buffered; get_rec() then serves rows from it.
// SphinxQL tables short-circuit and just echo the key/pushed-down id once.
int ha_sphinx::index_read ( byte * buf, const byte * key, uint key_len, enum ha_rkey_function )
{
	SPH_ENTER_METHOD();
	char sError[256];

	// set new data for thd->ha_data, it is used in show_status
	CSphSEThreadTable * pTable = GetTls();
	if ( !pTable )
	{
		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: TLS malloc() failed" );
		SPH_RET ( HA_ERR_END_OF_FILE );
	}
	pTable->m_tStats.Reset ();

	// sphinxql table, just return the key once
	if ( m_pShare->m_bSphinxQL )
	{
		// over and out
		if ( pTable->m_bCondDone )
			SPH_RET ( HA_ERR_END_OF_FILE );

		// return a value from pushdown, if any
		if ( pTable->m_bCondId )
		{
			table->field[0]->store ( pTable->m_iCondId, 1 );
			pTable->m_bCondDone = true;
			SPH_RET(0);
		}

		// return a value from key
		longlong iRef = 0;
		if ( key_len==4 )
			iRef = uint4korr ( key );
		else if ( key_len==8 )
			iRef = uint8korr ( key );
		else
		{
			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unexpected key length" );
			SPH_RET ( HA_ERR_END_OF_FILE );
		}

		table->field[0]->store ( iRef, 1 );
		pTable->m_bCondDone = true;
		SPH_RET(0);
	}

	// parse query
	if ( pTable->m_bQuery )
	{
		// we have a query from condition pushdown
		m_pCurrentKey = (const byte *) pTable->m_sQuery;
		m_iCurrentKeyLen = strlen(pTable->m_sQuery);
	} else
	{
		// just use the key (might be truncated)
		m_pCurrentKey = key+HA_KEY_BLOB_LENGTH;
		m_iCurrentKeyLen = uint2korr(key); // or maybe key_len?
		pTable->m_pQueryCharset = m_pShare ? m_pShare->m_pTableQueryCharset : NULL;
	}

	CSphSEQuery q ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, m_pShare->m_sIndex );
	if ( !q.Parse () )
	{
		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), q.m_sParseError );
		SPH_RET ( HA_ERR_END_OF_FILE );
	}

	// do connect
	int iSocket = ConnectAPI ( q.m_sHost, q.m_iPort );
	if ( iSocket<0 )
		SPH_RET ( HA_ERR_END_OF_FILE );

	// my buffer
	char * pBuffer; // will be free by CSphSEQuery dtor; do NOT free manually
	int iReqLen = q.BuildRequest ( &pBuffer );
	if ( iReqLen<=0 )
	{
		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: q.BuildRequest() failed" );
		SPH_RET ( HA_ERR_END_OF_FILE );
	}

	// send request
	// NOTE(review): the send() result is not checked; a failed send will
	// surface as a header-read error just below
	::send ( iSocket, pBuffer, iReqLen, 0 );

	// receive reply; a short read of the 8-byte header is treated as fatal
	char sHeader[8];
	int iGot = ::recv ( iSocket, sHeader, sizeof(sHeader), RECV_FLAGS );
	if ( iGot!=sizeof(sHeader) )
	{
		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "failed to receive response header (searchd went away?)" );
		SPH_RET ( HA_ERR_END_OF_FILE );
	}

	// header is big-endian: status(2) version(2) body-length(4)
	short int uRespStatus = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[0] ) ) );
	short int uRespVersion = ntohs ( sphUnalignedRead ( *(short int*)( &sHeader[2] ) ) );
	uint uRespLength = ntohl ( sphUnalignedRead ( *(uint *)( &sHeader[4] ) ) );
	SPH_DEBUG ( "got response header (status=%d version=%d length=%d)",
		uRespStatus, uRespVersion, uRespLength );

	// allocate the response buffer; oversized lengths leave m_pResponse NULL
	SafeDeleteArray ( m_pResponse );
	if ( uRespLength<=SPHINXSE_MAX_ALLOC )
		m_pResponse = new char [ uRespLength+1 ];

	if ( !m_pResponse )
	{
		my_snprintf ( sError, sizeof(sError), "bad searchd response length (length=%u)", uRespLength );
		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
		SPH_RET ( HA_ERR_END_OF_FILE );
	}

	// read the whole body, looping over partial reads
	int iRecvLength = 0;
	while ( iRecvLength<(int)uRespLength )
	{
		int iRecv = ::recv ( iSocket, m_pResponse+iRecvLength, uRespLength-iRecvLength, RECV_FLAGS );
		if ( iRecv<0 )
			break;
		iRecvLength += iRecv;
	}

	::closesocket ( iSocket );
	iSocket = -1;

	if ( iRecvLength!=(int)uRespLength )
	{
		my_snprintf ( sError, sizeof(sError), "net read error (expected=%d, got=%d)", uRespLength, iRecvLength );
		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
		SPH_RET ( HA_ERR_END_OF_FILE );
	}

	// we'll have a message, at least
	pTable->m_bStats = true;

	// parse reply: reset the unpack cursor over the buffered response
	m_iCurrentPos = 0;
	m_pCur = m_pResponse;
	m_pResponseEnd = m_pResponse + uRespLength;
	m_bUnpackError = false;

	if ( uRespStatus!=SEARCHD_OK )
	{
		char * sMessage = UnpackString ();
		if ( !sMessage )
		{
			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "no valid response from searchd (status=%d, resplen=%d)",
				uRespStatus, uRespLength );
			SPH_RET ( HA_ERR_END_OF_FILE );
		}

		strncpy ( pTable->m_tStats.m_sLastMessage, sMessage, sizeof pTable->m_tStats.m_sLastMessage - 1 );
		pTable->m_tStats.m_sLastMessage[sizeof pTable->m_tStats.m_sLastMessage - 1] = '\0';
		SafeDeleteArray ( sMessage );

		// warnings fall through and still produce rows; real errors bail
		if ( uRespStatus!=SEARCHD_WARNING )
		{
			my_snprintf ( sError, sizeof(sError), "searchd error: %s", pTable->m_tStats.m_sLastMessage );
			my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), sError );
			pTable->m_tStats.m_bLastError = true;
			SPH_RET ( HA_ERR_END_OF_FILE );
		}
	}

	if ( !UnpackSchema () )
		SPH_RET ( HA_ERR_END_OF_FILE );

	if ( !UnpackStats ( &pTable->m_tStats ) )
	{
		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: UnpackStats() failed" );
		SPH_RET ( HA_ERR_END_OF_FILE );
	}

	// hand the first match back to the server
	SPH_RET ( get_rec ( buf, key, key_len ) );
}
// Positions an index cursor to the index specified in key. Fetches the
// row if any. This is only used to read whole keys.
//
// Not supported by SphinxSE; all reads go through index_read().
int ha_sphinx::index_read_idx ( byte *, uint, const byte *, uint, enum ha_rkey_function )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
// Used to read forward through the index.
//
// Serves the next match from the response buffered by index_read(),
// re-using the key captured there.
int ha_sphinx::index_next ( byte * buf )
{
	SPH_ENTER_METHOD();
	SPH_RET ( get_rec ( buf, m_pCurrentKey, m_iCurrentKeyLen ) );
}
// Read the next row with the same key; simply serves the next buffered match.
int ha_sphinx::index_next_same ( byte * buf, const byte * key, uint keylen )
{
	SPH_ENTER_METHOD();
	SPH_RET ( get_rec ( buf, key, keylen ) );
}
// Unpack the next match from the buffered searchd response into the current
// table row (docid, weight, query echo, then bound attributes), advancing
// m_iCurrentPos. Returns HA_ERR_END_OF_FILE once all matches are served or
// on unpack errors. The key/keylen arguments are unused here.
int ha_sphinx::get_rec ( byte * buf, const byte *, uint )
{
	SPH_ENTER_METHOD();

	if ( m_iCurrentPos>=m_iMatchesTotal )
	{
		SafeDeleteArray ( m_pResponse );
		SPH_RET ( HA_ERR_END_OF_FILE );
	}

#if MYSQL_VERSION_ID>50100
	// temporarily mark all columns writable; restored before returning
	// (debug builds assert on writes to columns outside the write_set)
	MY_BITMAP * org_bitmap = dbug_tmp_use_all_columns ( table, &table->write_set );
#endif
	Field ** field = table->field;

	// unpack and return the match
	longlong uMatchID = UnpackDword ();
	if ( m_bId64 )
		uMatchID = ( uMatchID<<32 ) + UnpackDword();
	uint32 uMatchWeight = UnpackDword ();

	// system columns: docid, weight, query echo
	field[0]->store ( uMatchID, 1 );
	field[1]->store ( uMatchWeight, 1 );
	field[2]->store ( (const char*)m_pCurrentKey, m_iCurrentKeyLen, &my_charset_bin );

	for ( uint32 i=0; i<m_iAttrs; i++ )
	{
		longlong iValue64 = 0;
		uint32 uValue = UnpackDword ();
		if ( m_dAttrs[i].m_uType==SPH_ATTR_BIGINT )
			iValue64 = ( (longlong)uValue<<32 ) | UnpackDword();
		if ( m_dAttrs[i].m_iField<0 )
		{
			// attribute is not bound to any table column;
			// skip MVA or String
			if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET || m_dAttrs[i].m_uType==SPH_ATTR_UINT64SET )
			{
				for ( ; uValue>0 && !m_bUnpackError; uValue-- )
					UnpackDword();
			} else if ( m_dAttrs[i].m_uType==SPH_ATTR_STRING && CheckResponcePtr ( uValue ) )
			{
				m_pCur += uValue;
			}
			continue;
		}

		Field * af = field [ m_dAttrs[i].m_iField ];
		switch ( m_dAttrs[i].m_uType )
		{
			case SPH_ATTR_INTEGER:
			case SPH_ATTR_ORDINAL:
			case SPH_ATTR_BOOL:
				af->store ( uValue, 1 );
				break;

			case SPH_ATTR_FLOAT:
				af->store ( sphDW2F(uValue) );
				break;

			case SPH_ATTR_TIMESTAMP:
				if ( af->type()==MYSQL_TYPE_TIMESTAMP )
					longstore ( af->ptr, uValue ); // because store() does not accept timestamps
				else
					af->store ( uValue, 1 );
				break;

			case SPH_ATTR_BIGINT:
				af->store ( iValue64, 0 );
				break;

			case SPH_ATTR_STRING:
				// uValue is the string length here
				if ( !uValue )
					af->store ( "", 0, &my_charset_bin );
				else if ( CheckResponcePtr ( uValue ) )
				{
					af->store ( m_pCur, uValue, &my_charset_bin );
					m_pCur += uValue;
				}
				break;

			case SPH_ATTR_UINT64SET:
			case SPH_ATTR_UINT32SET :
				// uValue is the number of 32-bit entries in the MVA list
				if ( uValue<=0 )
				{
					// shortcut, empty MVA set
					af->store ( "", 0, &my_charset_bin );

				} else
				{
					// convert MVA set to comma-separated string
					char sBuf[1024]; // FIXME! magic size
					char * pCur = sBuf;

					if ( m_dAttrs[i].m_uType==SPH_ATTR_UINT32SET )
					{
						for ( ; uValue>0 && !m_bUnpackError; uValue-- )
						{
							uint32 uEntry = UnpackDword ();
							if ( pCur < sBuf+sizeof(sBuf)-16 ) // 10 chars per 32bit value plus some safety bytes
							{
								snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, "%u", uEntry );
								while ( *pCur ) pCur++;
								if ( uValue>1 )
									*pCur++ = ','; // non-trailing commas
							}
						}
					} else
					{
						// 64-bit entries come as hi/lo dword pairs
						for ( ; uValue>0 && !m_bUnpackError; uValue-=2 )
						{
							uint32 uEntryLo = UnpackDword ();
							uint32 uEntryHi = UnpackDword();
							if ( pCur < sBuf+sizeof(sBuf)-24 ) // 20 chars per 64bit value plus some safety bytes
							{
								snprintf ( pCur, sBuf+sizeof(sBuf)-pCur, "%u%u", uEntryHi, uEntryLo );
								while ( *pCur ) pCur++;
								if ( uValue>2 )
									*pCur++ = ','; // non-trailing commas
							}
						}
					}

					af->store ( sBuf, uint(pCur-sBuf), &my_charset_bin );
				}
				break;

			default:
				my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: unhandled attr type" );
				SafeDeleteArray ( m_pResponse );
				SPH_RET ( HA_ERR_END_OF_FILE );
		}
	}

	if ( m_bUnpackError )
	{
		my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0), "INTERNAL ERROR: response unpacker failed" );
		SafeDeleteArray ( m_pResponse );
		SPH_RET ( HA_ERR_END_OF_FILE );
	}

	// zero out unmapped fields
	for ( int i=SPHINXSE_SYSTEM_COLUMNS; i<(int)table->s->fields; i++ )
		if ( m_dUnboundFields[i]!=SPH_ATTR_NONE )
			switch ( m_dUnboundFields[i] )
		{
			case SPH_ATTR_INTEGER: table->field[i]->store ( 0, 1 ); break;
			case SPH_ATTR_TIMESTAMP: longstore ( table->field[i]->ptr, 0 ); break;
			default:
				my_error ( ER_QUERY_ON_FOREIGN_DATA_SOURCE, MYF(0),
					"INTERNAL ERROR: unhandled unbound field type %d", m_dUnboundFields[i] );
				SafeDeleteArray ( m_pResponse );
				SPH_RET ( HA_ERR_END_OF_FILE );
		}

	// clear the null-flags area: every column in this row is non-NULL
	memset ( buf, 0, table->s->null_bytes );
	m_iCurrentPos++;

#if MYSQL_VERSION_ID > 50100
	dbug_tmp_restore_column_map ( &table->write_set, org_bitmap );
#endif

	SPH_RET(0);
}
// Used to read backwards through the index.
//
// Not supported: SphinxSE result sets can only be walked forward.
int ha_sphinx::index_prev ( byte * )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
// index_first() asks for the first key in the index.
//
// Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
// and sql_select.cc.
//
// SphinxSE reports EOF here: a full-index scan without a search query has
// nothing to return.
int ha_sphinx::index_first ( byte * )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_END_OF_FILE );
}
// index_last() asks for the last key in the index.
//
// Called from opt_range.cc, opt_sum.cc, sql_handler.cc,
// and sql_select.cc.
//
// Not supported by SphinxSE.
int ha_sphinx::index_last ( byte * )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
// Begin a table scan. A no-op: SphinxSE serves rows only via the index path.
int ha_sphinx::rnd_init ( bool )
{
	SPH_ENTER_METHOD();
	SPH_RET(0);
}
// End a table scan. Nothing to release; rnd_init() acquired nothing.
int ha_sphinx::rnd_end()
{
	SPH_ENTER_METHOD();
	SPH_RET(0);
}
// Fetch the next row of a table scan; always EOF, since full scans are
// not supported (see rnd_init()).
int ha_sphinx::rnd_next ( byte * )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_END_OF_FILE );
}
// Record the position of the current row; a no-op since rnd_pos() is
// unsupported and no row reference is ever stored.
void ha_sphinx::position ( const byte * )
{
	SPH_ENTER_METHOD();
	SPH_VOID_RET();
}
// This is like rnd_next, but you are given a position to use
// to determine the row. The position will be of the type that you stored in
// ref. You can use ha_get_ptr(pos,ref_length) to retrieve whatever key
// or position you saved when position() was called.
// Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.
//
// Not supported by SphinxSE (position() stores nothing).
int ha_sphinx::rnd_pos ( byte *, byte * )
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
// Report table statistics to the optimizer. The numbers are fixed fakes
// (20 rows, 1 row per key value) chosen to steer the optimizer toward
// index reads; the return type changed from void to int in MySQL 5.0.30.
#if MYSQL_VERSION_ID>=50030
int ha_sphinx::info ( uint )
#else
void ha_sphinx::info ( uint )
#endif
{
	SPH_ENTER_METHOD();

	if ( table->s->keys>0 )
		table->key_info[0].rec_per_key[0] = 1;

#if MYSQL_VERSION_ID>50100
	stats.records = 20;
#else
	records = 20;
#endif

#if MYSQL_VERSION_ID>=50030
	SPH_RET(0);
#else
	SPH_VOID_RET();
#endif
}
int ha_sphinx::reset ()
{
SPH_ENTER_METHOD();
CSphSEThreadTable * pTable = GetTls ();
if ( pTable )
pTable->m_bQuery = false;
SPH_RET(0);
}
// TRUNCATE/DELETE-all support; not available, the index lives in searchd.
int ha_sphinx::delete_all_rows()
{
	SPH_ENTER_METHOD();
	SPH_RET ( HA_ERR_WRONG_COMMAND );
}
// First you should go read the section "locking functions for mysql" in
// lock.cc to understand this.
// This create a lock on the table. If you are implementing a storage engine
// that can handle transacations look at ha_berkely.cc to see how you will
// want to go about doing this. Otherwise you should consider calling flock()
// here.
//
// Called from lock.cc by lock_external() and unlock_external(). Also called
// from sql_table.cc by copy_data_between_tables().
//
// SphinxSE holds no local data, so no external locking is needed.
int ha_sphinx::external_lock ( THD *, int )
{
	SPH_ENTER_METHOD();
	SPH_RET(0);
}
// Register this handler's lock in the server's lock array. The requested
// lock type is recorded only when it is a real request (not TL_IGNORE) and
// no type has been chosen yet (current type is TL_UNLOCK).
THR_LOCK_DATA ** ha_sphinx::store_lock ( THD *, THR_LOCK_DATA ** to,
	enum thr_lock_type lock_type )
{
	SPH_ENTER_METHOD();
	if ( lock_type!=TL_IGNORE )
	{
		if ( m_tLock.type==TL_UNLOCK )
			m_tLock.type = lock_type;
	}
	*to = &m_tLock;
	to++;
	SPH_RET(to);
}
// DROP TABLE hook; nothing to remove locally, the real index is in searchd.
int ha_sphinx::delete_table ( const char * )
{
	SPH_ENTER_METHOD();
	SPH_RET(0);
}
// Renames a table from one name to another from alter table call.
//
// If you do not implement this, the default rename_table() is called from
// handler.cc and it will delete all files with the file extensions returned
// by bas_ext().
//
// Called from sql_table.cc by mysql_rename_table().
//
// No-op for SphinxSE: there are no engine-owned files to rename.
int ha_sphinx::rename_table ( const char *, const char * )
{
	SPH_ENTER_METHOD();
	SPH_RET(0);
}
// Given a starting key, and an ending key estimate the number of rows that
// will exist between the two. end_key may be empty which in case determine
// if start_key matches any rows.
//
// Called from opt_range.cc by check_quick_keys().
ha_rows ha_sphinx::records_in_range ( uint, key_range *, key_range * )
{
	SPH_ENTER_METHOD();
	SPH_RET(3); // low number to force index usage
}
#if MYSQL_VERSION_ID < 50610
#define user_defined_key_parts key_parts
#endif
// create() is called to create a database. The variable name will have the name
// of the table. When create() is called you do not need to worry about opening
// the table. Also, the FRM file will have already been created so adjusting
// create_info will not do you any good. You can overwrite the frm file at this
// point if you wish to change the table definition, but there are no methods
// currently provided for doing that.
//
// Called from handle.cc by ha_create_table().
//
// SphinxSE only validates the declared table layout here (no files are
// created): SphinxAPI tables need the (docid, weight, query) system columns
// with an index on the query column; SphinxQL tables need an indexed 'id'
// column. Each `for(;;)` below is a breakable validation scope — any failed
// check sets sError and breaks out.
int ha_sphinx::create ( const char * name, TABLE * table_arg, HA_CREATE_INFO * )
{
	SPH_ENTER_METHOD();
	char sError[256];

	CSphSEShare tInfo;
	if ( !ParseUrl ( &tInfo, table_arg, true ) )
		SPH_RET(-1);

	// check SphinxAPI table
	for ( ; !tInfo.m_bSphinxQL; )
	{
		// check system fields (count and types)
		if ( table_arg->s->fields<SPHINXSE_SYSTEM_COLUMNS )
		{
			my_snprintf ( sError, sizeof(sError), "%s: there MUST be at least %d columns",
				name, SPHINXSE_SYSTEM_COLUMNS );
			break;
		}

		if ( !IsIDField ( table_arg->field[0] ) )
		{
			my_snprintf ( sError, sizeof(sError), "%s: 1st column (docid) MUST be unsigned integer or bigint", name );
			break;
		}

		if ( !IsIntegerFieldType ( table_arg->field[1]->type() ) )
		{
			my_snprintf ( sError, sizeof(sError), "%s: 2nd column (weight) MUST be integer or bigint", name );
			break;
		}

		enum_field_types f2 = table_arg->field[2]->type();
		if ( f2!=MYSQL_TYPE_VARCHAR
			&& f2!=MYSQL_TYPE_BLOB && f2!=MYSQL_TYPE_MEDIUM_BLOB && f2!=MYSQL_TYPE_LONG_BLOB && f2!=MYSQL_TYPE_TINY_BLOB )
		{
			my_snprintf ( sError, sizeof(sError), "%s: 3rd column (search query) MUST be varchar or text", name );
			break;
		}

		// check attributes
		int i;
		for ( i=3; i<(int)table_arg->s->fields; i++ )
		{
			enum_field_types eType = table_arg->field[i]->type();
			if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
			{
				my_snprintf ( sError, sizeof(sError), "%s: %dth column (attribute %s) MUST be integer, bigint, timestamp, varchar, or float",
					name, i+1, table_arg->field[i]->field_name.str );
				break;
			}
		}

		// inner loop broke early => an attribute failed validation
		if ( i!=(int)table_arg->s->fields )
			break;

		// check index
		if (
			table_arg->s->keys!=1 ||
			table_arg->key_info[0].user_defined_key_parts!=1 ||
			strcasecmp ( table_arg->key_info[0].key_part[0].field->field_name.str, table_arg->field[2]->field_name.str ) )
		{
			my_snprintf ( sError, sizeof(sError), "%s: there must be an index on '%s' column",
				name, table_arg->field[2]->field_name.str );
			break;
		}

		// all good
		sError[0] = '\0';
		break;
	}

	// check SphinxQL table
	for ( ; tInfo.m_bSphinxQL; )
	{
		sError[0] = '\0';

		// check that 1st column is id, is of int type, and has an index
		if ( strcmp ( table_arg->field[0]->field_name.str, "id" ) )
		{
			my_snprintf ( sError, sizeof(sError), "%s: 1st column must be called 'id'", name );
			break;
		}

		if ( !IsIDField ( table_arg->field[0] ) )
		{
			my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be INT UNSIGNED or BIGINT", name );
			break;
		}

		// check index
		if (
			table_arg->s->keys!=1 ||
			table_arg->key_info[0].user_defined_key_parts!=1 ||
			strcasecmp ( table_arg->key_info[0].key_part[0].field->field_name.str, "id" ) )
		{
			my_snprintf ( sError, sizeof(sError), "%s: 'id' column must be indexed", name );
			break;
		}

		// check column types
		for ( int i=1; i<(int)table_arg->s->fields; i++ )
		{
			enum_field_types eType = table_arg->field[i]->type();
			if ( eType!=MYSQL_TYPE_TIMESTAMP && !IsIntegerFieldType(eType) && eType!=MYSQL_TYPE_VARCHAR && eType!=MYSQL_TYPE_FLOAT )
			{
				my_snprintf ( sError, sizeof(sError), "%s: column %d(%s) is of unsupported type (use int/bigint/timestamp/varchar/float)",
					name, i+1, table_arg->field[i]->field_name.str );
				break;
			}
		}
		if ( sError[0] )
			break;

		// all good
		break;
	}

	// report and bail
	if ( sError[0] )
	{
		my_printf_error(ER_CANT_CREATE_TABLE,
			"Can\'t create table %s.%s (Error: %s)",
			MYF(0),
			table_arg->s->db.str,
			table_arg->s->table_name.str, sError);
		SPH_RET(-1);
	}

	SPH_RET(0);
}
// show functions
#if MYSQL_VERSION_ID<50100
#define SHOW_VAR_FUNC_BUFF_SIZE 1024
#endif
// Shared helper for the SHOW STATUS callbacks below: fetch the stats of the
// head per-thread table, if any. On failure it pre-fills *out with an empty
// SHOW_CHAR value and returns NULL, so callers may simply return when no
// stats are available.
CSphSEStats * sphinx_get_stats ( THD * thd, SHOW_VAR * out )
{
#if MYSQL_VERSION_ID>50100
	if ( sphinx_hton_ptr )
	{
		CSphTLS * pTls = (CSphTLS *) *thd_ha_data ( thd, sphinx_hton_ptr );

		if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
			return &pTls->m_pHeadTable->m_tStats;
	}
#else
	CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
	if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
		return &pTls->m_pHeadTable->m_tStats;
#endif

	out->type = SHOW_CHAR;
	out->value = (char*) "";
	return 0;
}
// SHOW STATUS callback: total matches of the last query (or "" if no stats).
int sphinx_showfunc_total ( THD * thd, SHOW_VAR * out, char * )
{
	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
	if ( !pStats )
		return 0; // sphinx_get_stats() already set an empty value

	out->type = SHOW_INT;
	out->value = (char *) &pStats->m_iMatchesTotal;
	return 0;
}
// SHOW STATUS callback: found-matches count of the last query.
int sphinx_showfunc_total_found ( THD * thd, SHOW_VAR * out, char * )
{
	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
	if ( !pStats )
		return 0; // sphinx_get_stats() already set an empty value

	out->type = SHOW_INT;
	out->value = (char *) &pStats->m_iMatchesFound;
	return 0;
}
// SHOW STATUS callback: query time of the last query, in milliseconds.
int sphinx_showfunc_time ( THD * thd, SHOW_VAR * out, char * )
{
	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
	if ( !pStats )
		return 0; // sphinx_get_stats() already set an empty value

	out->type = SHOW_INT;
	out->value = (char *) &pStats->m_iQueryMsec;
	return 0;
}
// SHOW STATUS callback: number of keywords in the last query.
int sphinx_showfunc_word_count ( THD * thd, SHOW_VAR * out, char * )
{
	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
	if ( !pStats )
		return 0; // sphinx_get_stats() already set an empty value

	out->type = SHOW_INT;
	out->value = (char *) &pStats->m_iWords;
	return 0;
}
// SHOW STATUS callback: per-keyword stats of the last query, formatted as
// "word:docs:hits word:docs:hits ..." into the server-provided sBuffer
// (SHOW_VAR_FUNC_BUFF_SIZE bytes), optionally converted from the query
// charset to the system charset.
//
// FIXED: the formatting loop used to call
//   my_snprintf ( sBuffer, SIZE, "%s%s:%d:%d ", sBuffer, ... )
// i.e. print the buffer into itself — overlapping source and destination is
// undefined behavior (and quadratic). Now each word is appended at the
// current offset with an explicit bound.
int sphinx_showfunc_words ( THD * thd, SHOW_VAR * out, char * sBuffer )
{
#if MYSQL_VERSION_ID>50100
	if ( sphinx_hton_ptr )
	{
		CSphTLS * pTls = (CSphTLS *) *thd_ha_data ( thd, sphinx_hton_ptr );
#else
	{
		CSphTLS * pTls = (CSphTLS *) thd->ha_data[sphinx_hton.slot];
#endif
		if ( pTls && pTls->m_pHeadTable && pTls->m_pHeadTable->m_bStats )
		{
			CSphSEStats * pStats = &pTls->m_pHeadTable->m_tStats;
			if ( pStats && pStats->m_iWords )
			{
				uint uBuffLen = 0;

				out->type = SHOW_CHAR;
				out->value = sBuffer;

				// the following is partially based on code in sphinx_show_status()
				sBuffer[0] = 0;
				for ( int i=0; i<pStats->m_iWords && uBuffLen<SHOW_VAR_FUNC_BUFF_SIZE-1; i++ )
				{
					CSphSEWordStats & tWord = pStats->m_dWords[i];
					// append at the current offset; my_snprintf() returns the
					// (possibly truncated) number of bytes actually written
					uBuffLen += my_snprintf ( sBuffer+uBuffLen, SHOW_VAR_FUNC_BUFF_SIZE-uBuffLen,
						"%s:%d:%d ", tWord.m_sWord, tWord.m_iDocs, tWord.m_iHits );
				}

				if ( uBuffLen > 0 )
				{
					// trim last space
					sBuffer [ --uBuffLen ] = 0;

					if ( pTls->m_pHeadTable->m_pQueryCharset )
					{
						// String::c_ptr() will nul-terminate the buffer.
						//
						// NOTE: It's not entirely clear whether this conversion is necessary at all.
						String sConvert;
						uint iErrors;
						sConvert.copy ( sBuffer, uBuffLen, pTls->m_pHeadTable->m_pQueryCharset, system_charset_info, &iErrors );
						memcpy ( sBuffer, sConvert.c_ptr(), sConvert.length() + 1 );
					}
				}

				return 0;
			}
		}
	}

	out->type = SHOW_CHAR;
	out->value = (char*) "";
	return 0;
}
// SHOW STATUS callback: last error message from searchd, or "" when the
// last query succeeded (or no stats are available).
int sphinx_showfunc_error ( THD * thd, SHOW_VAR * out, char * )
{
	CSphSEStats * pStats = sphinx_get_stats ( thd, out );
	out->type = SHOW_CHAR;
	out->value = ( pStats && pStats->m_bLastError )
		? pStats->m_sLastMessage
		: (char*)"";
	return 0;
}
#if MYSQL_VERSION_ID>50100
// handlerton interface descriptor required by the plugin API
struct st_mysql_storage_engine sphinx_storage_engine =
{
	MYSQL_HANDLERTON_INTERFACE_VERSION
};

// status variables surfaced via SHOW STATUS LIKE 'Sphinx%'; each entry
// points at one of the sphinx_showfunc_* callbacks above
struct st_mysql_show_var sphinx_status_vars[] =
{
	{"Sphinx_total", (char *)sphinx_showfunc_total, SHOW_SIMPLE_FUNC},
	{"Sphinx_total_found", (char *)sphinx_showfunc_total_found, SHOW_SIMPLE_FUNC},
	{"Sphinx_time", (char *)sphinx_showfunc_time, SHOW_SIMPLE_FUNC},
	{"Sphinx_word_count", (char *)sphinx_showfunc_word_count, SHOW_SIMPLE_FUNC},
	{"Sphinx_words", (char *)sphinx_showfunc_words, SHOW_SIMPLE_FUNC},
	{"Sphinx_error", (char *)sphinx_showfunc_error, SHOW_SIMPLE_FUNC},
	{0, 0, (enum_mysql_show_type)0}
};

// MariaDB plugin descriptor
maria_declare_plugin(sphinx)
{
	MYSQL_STORAGE_ENGINE_PLUGIN,
	&sphinx_storage_engine,
	sphinx_hton_name,
	"Sphinx developers",
	sphinx_hton_comment,
	PLUGIN_LICENSE_GPL,
	sphinx_init_func, // Plugin Init
	sphinx_done_func, // Plugin Deinit
	0x0202, // 2.2
	sphinx_status_vars,
	NULL,
	SPHINXSE_VERSION, // string version
	MariaDB_PLUGIN_MATURITY_GAMMA
}
maria_declare_plugin_end;
#endif // >50100
//
// $Id: ha_sphinx.cc 4842 2014-11-12 21:03:06Z deogar $
//