ndb - wl#2972 rbr blobs: write blob data to binlog
@@ -28,3 +28,7 @@ ndb_autodiscover : Needs to be fixed w.r.t binlog
 ndb_autodiscover2 : Needs to be fixed w.r.t binlog
 system_mysql_db : Needs fixing
 system_mysql_db_fix : Needs fixing
+#ndb_alter_table_row : sometimes wrong error 1015!=1046
+ndb_gis : garbled msgs from corrupt THD* + partitioning problem
+
+# vim: set filetype=conf:
@@ -35,6 +35,11 @@
 
 #include "ha_ndbcluster_binlog.h"
 
+#ifdef ndb_dynamite
+#undef assert
+#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
+#endif
+
 // options from from mysqld.cc
 extern my_bool opt_ndb_optimized_node_selection;
 extern const char *opt_ndbcluster_connectstring;
@@ -791,10 +796,20 @@ int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
   if (ndb_blob->blobsNextBlob() != NULL)
     DBUG_RETURN(0);
   ha_ndbcluster *ha= (ha_ndbcluster *)arg;
-  DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob));
+  int ret= get_ndb_blobs_value(ha->table, ha->m_value,
+                               ha->m_blobs_buffer, ha->m_blobs_buffer_size,
+                               0);
+  DBUG_RETURN(ret);
 }
 
-int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
+/*
+  This routine is shared by injector.  There is no common blobs buffer
+  so the buffer and length are passed by reference.  Injector also
+  passes a record pointer diff.
+*/
+int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
+                        byte*& buffer, uint& buffer_size,
+                        my_ptrdiff_t ptrdiff)
 {
   DBUG_ENTER("get_ndb_blobs_value");
 
@@ -803,44 +818,51 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
   for (int loop= 0; loop <= 1; loop++)
   {
     uint32 offset= 0;
-    for (uint i= 0; i < table_share->fields; i++)
+    for (uint i= 0; i < table->s->fields; i++)
     {
       Field *field= table->field[i];
-      NdbValue value= m_value[i];
+      NdbValue value= value_array[i];
       if (value.ptr != NULL && (field->flags & BLOB_FLAG))
       {
         Field_blob *field_blob= (Field_blob *)field;
         NdbBlob *ndb_blob= value.blob;
-        Uint64 blob_len= 0;
-        if (ndb_blob->getLength(blob_len) != 0)
-          DBUG_RETURN(-1);
-        // Align to Uint64
-        uint32 blob_size= blob_len;
-        if (blob_size % 8 != 0)
-          blob_size+= 8 - blob_size % 8;
-        if (loop == 1)
-        {
-          char *buf= m_blobs_buffer + offset;
-          uint32 len= 0xffffffff; // Max uint32
-          DBUG_PRINT("value", ("read blob ptr=%lx len=%u",
-                               buf, (uint) blob_len));
-          if (ndb_blob->readData(buf, len) != 0)
-            DBUG_RETURN(-1);
-          DBUG_ASSERT(len == blob_len);
-          field_blob->set_ptr(len, buf);
-        }
-        offset+= blob_size;
+        int isNull;
+        ndb_blob->getDefined(isNull);
+        if (isNull == 0) { // XXX -1 should be allowed only for events
+          Uint64 blob_len= 0;
+          if (ndb_blob->getLength(blob_len) != 0)
+            DBUG_RETURN(-1);
+          // Align to Uint64
+          uint32 blob_size= blob_len;
+          if (blob_size % 8 != 0)
+            blob_size+= 8 - blob_size % 8;
+          if (loop == 1)
+          {
+            char *buf= buffer + offset;
+            uint32 len= 0xffffffff; // Max uint32
+            DBUG_PRINT("info", ("read blob ptr=%p len=%u",
+                                buf, (uint) blob_len));
+            if (ndb_blob->readData(buf, len) != 0)
+              DBUG_RETURN(-1);
+            DBUG_ASSERT(len == blob_len);
+            // Ugly hack assumes only ptr needs to be changed
+            field_blob->ptr += ptrdiff;
+            field_blob->set_ptr(len, buf);
+            field_blob->ptr -= ptrdiff;
+          }
+          offset+= blob_size;
+        }
       }
     }
-    if (loop == 0 && offset > m_blobs_buffer_size)
+    if (loop == 0 && offset > buffer_size)
     {
-      my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
-      m_blobs_buffer_size= 0;
-      DBUG_PRINT("value", ("allocate blobs buffer size %u", offset));
-      m_blobs_buffer= my_malloc(offset, MYF(MY_WME));
-      if (m_blobs_buffer == NULL)
+      my_free(buffer, MYF(MY_ALLOW_ZERO_PTR));
+      buffer_size= 0;
+      DBUG_PRINT("info", ("allocate blobs buffer size %u", offset));
+      buffer= my_malloc(offset, MYF(MY_WME));
+      if (buffer == NULL)
         DBUG_RETURN(-1);
-      m_blobs_buffer_size= offset;
+      buffer_size= offset;
     }
   }
   DBUG_RETURN(0);
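
Aside (illustrative, not part of the patch): the comment above says the routine is now shared between the handler and the binlog injector, the only difference being the record-pointer diff. A minimal sketch of the two call shapes, using only names declared in this diff; the wrapper itself is hypothetical:

static int read_blob_values(TABLE *table, NdbValue *value_array,
                            byte *&buffer, uint &buffer_size,
                            uint record_no)
{
  /* The handler effectively passes ptrdiff 0 (blobs are unpacked into
     record[0]); the injector passes the offset of the record image it is
     filling, so Field_blob::ptr can be shifted onto that image around
     set_ptr() and shifted back afterwards. */
  my_ptrdiff_t ptrdiff= table->record[record_no] - table->record[0];
  return get_ndb_blobs_value(table, value_array, buffer, buffer_size, ptrdiff);
}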
@@ -2713,14 +2735,22 @@ void ndb_unpack_record(TABLE *table, NdbValue *value,
       else
       {
         NdbBlob *ndb_blob= (*value).blob;
-        bool isNull= TRUE;
-#ifndef DBUG_OFF
-        int ret=
-#endif
-        ndb_blob->getNull(isNull);
-        DBUG_ASSERT(ret == 0);
-        if (isNull)
-          field->set_null(row_offset);
+        int isNull;
+        ndb_blob->getDefined(isNull);
+        if (isNull != 0)
+        {
+          uint col_no = ndb_blob->getColumn()->getColumnNo();
+          if (isNull == 1)
+          {
+            DBUG_PRINT("info",("[%u] NULL", col_no));
+            field->set_null(row_offset);
+          }
+          else
+          {
+            DBUG_PRINT("info",("[%u] UNDEFINED", col_no));
+            bitmap_clear_bit(defined, col_no);
+          }
+        }
       }
     }
   }
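
Aside (illustrative, not part of the patch): the new code treats NdbBlob::getDefined() as a three-way state. A sketch of that interpretation, assuming 0 means the blob value is defined, 1 means it is NULL, and any other value means the column is undefined in this event (the UNDEFINED branch above); the helper name and the MY_BITMAP type of the defined map are assumptions:

static void unpack_blob_null_state(NdbBlob *ndb_blob, Field *field,
                                   MY_BITMAP *defined, my_ptrdiff_t row_offset)
{
  int isNull;
  ndb_blob->getDefined(isNull);
  if (isNull == 0)
    return;                                  // value present; data read elsewhere
  uint col_no= ndb_blob->getColumn()->getColumnNo();
  if (isNull == 1)
    field->set_null(row_offset);             // NULL in this row image
  else
    bitmap_clear_bit(defined, col_no);       // undefined: not part of the event
}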
@@ -4713,6 +4743,7 @@ int ha_ndbcluster::alter_table_name(const char *to)
   NDBDICT *dict= ndb->getDictionary();
   const NDBTAB *orig_tab= (const NDBTAB *) m_table;
   DBUG_ENTER("alter_table_name");
+  DBUG_PRINT("info", ("from: %s to: %s", orig_tab->getName(), to));
 
   NdbDictionary::Table new_tab= *orig_tab;
   new_tab.setName(to);
@@ -25,6 +25,9 @@
 #pragma interface /* gcc class implementation */
 #endif
 
+/* Blob tables and events are internal to NDB and must never be accessed */
+#define IS_NDB_BLOB_PREFIX(A) is_prefix(A, "NDB$BLOB")
+
 #include <NdbApi.hpp>
 #include <ndbapi_limits.h>
 
@@ -78,6 +81,10 @@ typedef struct ndb_index_data {
 
 typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
 
+int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
+                        byte*& buffer, uint& buffer_size,
+                        my_ptrdiff_t ptrdiff);
+
 typedef enum {
   NSS_INITIAL= 0,
   NSS_DROPPED,
@@ -114,6 +121,7 @@ typedef struct st_ndbcluster_share {
 #ifdef HAVE_NDB_BINLOG
 /* NDB_SHARE.flags */
 #define NSF_HIDDEN_PK 1 /* table has hidden primary key */
+#define NSF_BLOB_FLAG 2 /* table has blob attributes */
 #define NSF_NO_BINLOG 4 /* table should not be binlogged */
 #endif
 
@@ -23,6 +23,11 @@
 #include "slave.h"
 #include "ha_ndbcluster_binlog.h"
 
+#ifdef ndb_dynamite
+#undef assert
+#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
+#endif
+
 /*
   defines for cluster replication table names
 */
@@ -237,6 +242,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
     DBUG_ASSERT(_table != 0);
     if (_table->s->primary_key == MAX_KEY)
       share->flags|= NSF_HIDDEN_PK;
+    if (_table->s->blob_fields != 0)
+      share->flags|= NSF_BLOB_FLAG;
     return;
   }
   while (1)
@@ -316,6 +323,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
     }
     if (table->s->primary_key == MAX_KEY)
       share->flags|= NSF_HIDDEN_PK;
+    if (table->s->blob_fields != 0)
+      share->flags|= NSF_BLOB_FLAG;
     break;
   }
 }
@@ -1622,6 +1631,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
                                    NDB_SHARE *share)
 {
   DBUG_ENTER("ndbcluster_create_binlog_setup");
+  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
 
   pthread_mutex_lock(&ndbcluster_mutex);
 
@@ -1713,6 +1723,10 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
                         const char *event_name, NDB_SHARE *share)
 {
   DBUG_ENTER("ndbcluster_create_event");
+  DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
+                      ndbtab->getName(), ndbtab->getObjectVersion(),
+                      event_name, share ? share->key : "(nil)"));
+  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
   if (!share)
   {
     DBUG_PRINT("info", ("share == NULL"));
@@ -1730,7 +1744,14 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
   my_event.addTableEvent(NDBEVENT::TE_ALL);
   if (share->flags & NSF_HIDDEN_PK)
   {
-    /* No primary key, susbscribe for all attributes */
+    if (share->flags & NSF_BLOB_FLAG)
+    {
+      sql_print_error("NDB Binlog: logging of table %s "
+                      "with no PK and blob attributes is not supported",
+                      share->key);
+      DBUG_RETURN(-1);
+    }
+    /* No primary key, subscribe for all attributes */
     my_event.setReport(NDBEVENT::ER_ALL);
     DBUG_PRINT("info", ("subscription all"));
   }
@@ -1749,6 +1770,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
       DBUG_PRINT("info", ("subscription all and subscribe"));
     }
   }
+  if (share->flags & NSF_BLOB_FLAG)
+    my_event.mergeEvents(true);
 
   /* add all columns to the event */
   int n_cols= ndbtab->getNoOfColumns();
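
Aside (illustrative, not part of the patch): blob data lives in separate NDB part tables (the NDB$BLOB prefix guarded elsewhere in this patch), so merged delivery has to be requested in two places touched by this diff: on the event definition here and, because the flag is "currently not inherited from event" per the later create_event_ops hunk, again on each event operation. A hedged sketch; the helper name and the NDBEVENT parameter type are assumptions:

static void enable_blob_merge(NDB_SHARE *share, NDBEVENT &my_event,
                              NdbEventOperation *op)
{
  if (share->flags & NSF_BLOB_FLAG)
  {
    my_event.mergeEvents(true);   // when the event is defined
    op->mergeEvents(true);        // and again on every subscription
  }
}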
@@ -1837,6 +1860,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
   */
 
   DBUG_ENTER("ndbcluster_create_event_ops");
+  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
 
   DBUG_ASSERT(share != 0);
 
@@ -1857,22 +1881,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
   }
 
   TABLE *table= share->table;
-  if (table)
-  {
-    /*
-      Logging of blob tables is not yet implemented, it would require:
-      1. setup of events also on the blob attribute tables
-      2. collect the pieces of the blob into one from an epoch to
-         provide a full blob to binlog
-    */
-    if (table->s->blob_fields)
-    {
-      sql_print_error("NDB Binlog: logging of blob table %s "
-                      "is not supported", share->key);
-      share->flags|= NSF_NO_BINLOG;
-      DBUG_RETURN(0);
-    }
-  }
 
   int do_schema_share= 0, do_apply_status_share= 0;
   int retries= 100;
@@ -1910,37 +1918,64 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
     DBUG_RETURN(-1);
   }
 
+  if (share->flags & NSF_BLOB_FLAG)
+    op->mergeEvents(true); // currently not inherited from event
+
+  if (share->flags & NSF_BLOB_FLAG)
+  {
+    /*
+     * Given servers S1 S2, following results in out-of-date
+     * event->m_tableImpl and column->m_blobTable.
+     *
+     * S1: create table t1(a int primary key);
+     * S2: drop table t1;
+     * S1: create table t2(a int primary key, b blob);
+     * S1: alter table t2 add x int;
+     * S1: alter table t2 drop x;
+     *
+     * TODO fix at right place before we get here
+     */
+    ndb->getDictionary()->fix_blob_events(ndbtab, event_name);
+  }
+
   int n_columns= ndbtab->getNoOfColumns();
-  int n_fields= table ? table->s->fields : 0;
+  int n_fields= table ? table->s->fields : 0; // XXX ???
   for (int j= 0; j < n_columns; j++)
   {
     const char *col_name= ndbtab->getColumn(j)->getName();
-    NdbRecAttr *attr0, *attr1;
+    NdbValue attr0, attr1;
     if (j < n_fields)
     {
       Field *f= share->table->field[j];
       if (is_ndb_compatible_type(f))
       {
         DBUG_PRINT("info", ("%s compatible", col_name));
-        attr0= op->getValue(col_name, f->ptr);
-        attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) +
+        attr0.rec= op->getValue(col_name, f->ptr);
+        attr1.rec= op->getPreValue(col_name,
+                                   (f->ptr - share->table->record[0]) +
                                    share->table->record[1]);
       }
+      else if (! (f->flags & BLOB_FLAG))
+      {
+        DBUG_PRINT("info", ("%s non compatible", col_name));
+        attr0.rec= op->getValue(col_name);
+        attr1.rec= op->getPreValue(col_name);
+      }
       else
       {
-        DBUG_PRINT("info", ("%s non compatible", col_name));
-        attr0= op->getValue(col_name);
-        attr1= op->getPreValue(col_name);
+        DBUG_PRINT("info", ("%s blob", col_name));
+        attr0.blob= op->getBlobHandle(col_name);
+        attr1.blob= op->getPreBlobHandle(col_name);
       }
     }
     else
     {
       DBUG_PRINT("info", ("%s hidden key", col_name));
-      attr0= op->getValue(col_name);
-      attr1= op->getPreValue(col_name);
+      attr0.rec= op->getValue(col_name);
+      attr1.rec= op->getPreValue(col_name);
     }
-    share->ndb_value[0][j].rec= attr0;
-    share->ndb_value[1][j].rec= attr1;
+    share->ndb_value[0][j].ptr= attr0.ptr;
+    share->ndb_value[1][j].ptr= attr1.ptr;
   }
   op->setCustomData((void *) share); // set before execute
   share->op= op; // assign op in NDB_SHARE
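
Aside (illustrative, not part of the patch): the hunk above switches attr0/attr1 from NdbRecAttr* to the NdbValue union declared in ha_ndbcluster.h, so one column slot can hold either a receiving NdbRecAttr or a blob handle and still be stored through the common .ptr member. A minimal sketch; the helper is hypothetical, the API calls are the ones used above:

static void subscribe_column(NdbEventOperation *op, const char *col_name,
                             bool is_blob, NdbValue *slot)
{
  NdbValue attr;
  if (is_blob)
    attr.blob= op->getBlobHandle(col_name);  // blob columns get an NdbBlob handle
  else
    attr.rec= op->getValue(col_name);        // other columns get an NdbRecAttr
  slot->ptr= attr.ptr;                       // store via the common pointer member
}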
@@ -2229,12 +2264,27 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
     (saves moving data about many times)
   */
 
+  /*
+    for now malloc/free blobs buffer each time
+    TODO if possible share single permanent buffer with handlers
+  */
+  byte* blobs_buffer[2] = { 0, 0 };
+  uint blobs_buffer_size[2] = { 0, 0 };
+
   switch(pOp->getEventType())
   {
   case NDBEVENT::TE_INSERT:
     row.n_inserts++;
     DBUG_PRINT("info", ("INSERT INTO %s", share->key));
     {
+      if (share->flags & NSF_BLOB_FLAG)
+      {
+        my_ptrdiff_t ptrdiff= 0;
+        int ret= get_ndb_blobs_value(table, share->ndb_value[0],
+                                     blobs_buffer[0], blobs_buffer_size[0],
+                                     ptrdiff);
+        DBUG_ASSERT(ret == 0);
+      }
       ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
       trans.write_row(::server_id, injector::transaction::table(table, true),
                       &b, n_fields, table->record[0]);
@@ -2261,6 +2311,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
         key
       */
 
+      if (share->flags & NSF_BLOB_FLAG)
+      {
+        my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
+        int ret= get_ndb_blobs_value(table, share->ndb_value[n],
+                                     blobs_buffer[n], blobs_buffer_size[n],
+                                     ptrdiff);
+        DBUG_ASSERT(ret == 0);
+      }
       ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
       print_records(table, table->record[n]);
       trans.delete_row(::server_id, injector::transaction::table(table, true),
@@ -2271,13 +2329,21 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
     row.n_updates++;
     DBUG_PRINT("info", ("UPDATE %s", share->key));
     {
+      if (share->flags & NSF_BLOB_FLAG)
+      {
+        my_ptrdiff_t ptrdiff= 0;
+        int ret= get_ndb_blobs_value(table, share->ndb_value[0],
+                                     blobs_buffer[0], blobs_buffer_size[0],
+                                     ptrdiff);
+        DBUG_ASSERT(ret == 0);
+      }
       ndb_unpack_record(table, share->ndb_value[0],
                         &b, table->record[0]);
       print_records(table, table->record[0]);
       if (table->s->primary_key != MAX_KEY)
       {
         /*
-          since table has a primary key, we can to a write
+          since table has a primary key, we can do a write
           using only after values
         */
         trans.write_row(::server_id, injector::transaction::table(table, true),
@@ -2289,6 +2355,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
           mysql server cannot handle the ndb hidden key and
           therefore needs the before image as well
         */
+        if (share->flags & NSF_BLOB_FLAG)
+        {
+          my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
+          int ret= get_ndb_blobs_value(table, share->ndb_value[1],
+                                       blobs_buffer[1], blobs_buffer_size[1],
+                                       ptrdiff);
+          DBUG_ASSERT(ret == 0);
+        }
         ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
         print_records(table, table->record[1]);
         trans.update_row(::server_id,
@@ -2305,6 +2379,12 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
     break;
   }
 
+  if (share->flags & NSF_BLOB_FLAG)
+  {
+    my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
+    my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
+  }
+
   return 0;
 }
 
@@ -2544,6 +2624,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
       Binlog_index_row row;
       while (pOp != NULL)
       {
+        // sometimes get TE_ALTER with invalid table
+        DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
+                    ! IS_NDB_BLOB_PREFIX(pOp->getTable()->getName()));
         ndb->
           setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
         ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
@@ -2684,6 +2767,7 @@ err:
   DBUG_PRINT("info",("removing all event operations"));
   while ((op= ndb->getEventOperation()))
   {
+    DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getTable()->getName()));
     DBUG_PRINT("info",("removing event operation on %s",
                        op->getEvent()->getName()));
     NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
@@ -883,6 +883,7 @@ public:
 
 private:
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+  friend class NdbDictionaryImpl;
   friend class NdbTableImpl;
 #endif
   class NdbTableImpl & m_impl;
@@ -1764,6 +1765,7 @@ public:
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
     const Table * getTable(const char * name, void **data) const;
     void set_local_table_data_size(unsigned sz);
+    void fix_blob_events(const Table* table, const char* ev_name);
 #endif
   };
 };
@@ -1327,10 +1327,10 @@ NdbBlob::prepareColumn()
   assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
   assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
   if (thePartSize > 0) {
-    const NdbDictionary::Table* bt = NULL;
-    const NdbDictionary::Column* bc = NULL;
+    const NdbTableImpl* bt = NULL;
+    const NdbColumnImpl* bc = NULL;
     if (theStripeSize == 0 ||
-        (bt = theColumn->getBlobTable()) == NULL ||
+        (bt = theColumn->m_blobTable) == NULL ||
         (bc = bt->getColumn("DATA")) == NULL ||
         bc->getType() != partType ||
         bc->getLength() != (int)thePartSize) {
@@ -1478,6 +1478,12 @@ NdbDictionary::Dictionary::getNdbError() const {
   return m_impl.getNdbError();
 }
 
+void
+NdbDictionary::Dictionary::fix_blob_events(const Table* table, const char* ev_name)
+{
+  m_impl.fix_blob_events(table, ev_name);
+}
+
 // printers
 
 NdbOut&
@@ -3398,12 +3398,14 @@ NdbDictionaryImpl::getEvent(const char * eventName)
     if (ev->m_tableId == info->m_table_impl->m_id &&
         ev->m_tableVersion == info->m_table_impl->m_version)
       break;
+    DBUG_PRINT("error",("%s: retry=%d: "
+                        "table version mismatch, event: [%u,%u] table: [%u,%u]",
+                        ev->getTableName(), retry,
+                        ev->m_tableId, ev->m_tableVersion,
+                        info->m_table_impl->m_id, info->m_table_impl->m_version));
     if (retry)
     {
       m_error.code= 241;
-      DBUG_PRINT("error",("%s: table version mismatch, event: [%u,%u] table: [%u,%u]",
-                          ev->getTableName(), ev->m_tableId, ev->m_tableVersion,
-                          info->m_table_impl->m_id, info->m_table_impl->m_version));
       delete ev;
       DBUG_RETURN(NULL);
     }
@@ -3607,7 +3609,7 @@ NdbDictionaryImpl::dropEvent(const char * eventName)
     if (m_error.code != 723 && // no such table
         m_error.code != 241)   // invalid table
       DBUG_RETURN(-1);
-    DBUG_PRINT("info", ("no table, drop by name alone"));
+    DBUG_PRINT("info", ("no table err=%d, drop by name alone", m_error.code));
     evnt = new NdbEventImpl();
     evnt->setName(eventName);
   }
@@ -3644,7 +3646,17 @@ NdbDictionaryImpl::dropBlobEvents(const NdbEventImpl& evnt)
       (void)dropEvent(bename);
     }
   } else {
-    // could loop over MAX_ATTRIBUTES_IN_TABLE ...
+    // loop over MAX_ATTRIBUTES_IN_TABLE ...
+    Uint32 i;
+    for (i = 0; i < MAX_ATTRIBUTES_IN_TABLE; i++) {
+      char bename[MAX_TAB_NAME_SIZE];
+      // XXX should get name from NdbBlob
+      sprintf(bename, "NDB$BLOBEVENT_%s_%u", evnt.getName(), i);
+      NdbEventImpl* bevnt = new NdbEventImpl();
+      bevnt->setName(bename);
+      (void)m_receiver.dropEvent(*bevnt);
+      delete bevnt;
+    }
   }
   DBUG_RETURN(0);
 }
@@ -4631,6 +4643,30 @@ NdbDictInterface::parseFileInfo(NdbFileImpl &dst,
   return 0;
 }
 
+// XXX temp
+void
+NdbDictionaryImpl::fix_blob_events(const NdbDictionary::Table* table, const char* ev_name)
+{
+  const NdbTableImpl& t = table->m_impl;
+  const NdbEventImpl* ev = getEvent(ev_name);
+  assert(ev != NULL && ev->m_tableImpl == &t);
+  Uint32 i;
+  for (i = 0; i < t.m_columns.size(); i++) {
+    assert(t.m_columns[i] != NULL);
+    const NdbColumnImpl& c = *t.m_columns[i];
+    if (! c.getBlobType() || c.getPartSize() == 0)
+      continue;
+    char bename[200];
+    NdbBlob::getBlobEventName(bename, ev, &c);
+    // following fixes dict cache blob table
+    NdbEventImpl* bev = getEvent(bename);
+    if (c.m_blobTable != bev->m_tableImpl) {
+      // XXX const violation
+      ((NdbColumnImpl*)&c)->m_blobTable = bev->m_tableImpl;
+    }
+  }
+}
+
 template class Vector<int>;
 template class Vector<Uint16>;
 template class Vector<Uint32>;
@@ -592,6 +592,9 @@ public:
 
   NdbDictInterface m_receiver;
   Ndb & m_ndb;
+
+  // XXX temp
+  void fix_blob_events(const NdbDictionary::Table* table, const char* ev_name);
 private:
   NdbIndexImpl * getIndexImpl(const char * name,
                               const BaseString& internalName);