1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-24 19:42:23 +03:00

WL#5151: Conversion between different types when replicating

Row-based replication requires the types of columns on the
master and slave to be approximately the same (some safe
conversions between strings are allowed), but does not
allow safe conversions between fields of similar types such
as TINYINT and INT.

This patch implement type conversions between similar fields
on the master and slave.

The conversions are controlled using a new variable
SLAVE_TYPE_CONVERSIONS of type SET('ALL_LOSSY','ALL_NON_LOSSY').

Non-lossy conversions are any conversions that do not run the
risk of losing any information, while lossy conversions can
potentially truncate the value. The column definitions are
checked to decide if the conversion is acceptable.

If neither conversion is enabled, it is required that the
definitions of the columns are identical on master and slave.

Conversion is done by creating an internal conversion table,
unpacking the master data into it, and then copy the data to
the real table on the slave.
This commit is contained in:
Mats Kindahl
2009-12-14 12:04:55 +01:00
parent c7a02cf807
commit c63df11f37
39 changed files with 2893 additions and 464 deletions

View File

@ -59,6 +59,8 @@ const char field_separator=',';
#define ASSERT_COLUMN_MARKED_FOR_READ DBUG_ASSERT(!table || (!table->read_set || bitmap_is_set(table->read_set, field_index)))
#define ASSERT_COLUMN_MARKED_FOR_WRITE DBUG_ASSERT(!table || (!table->write_set || bitmap_is_set(table->write_set, field_index)))
#define FLAGSTR(S,F) ((S) & (F) ? #F " " : "")
/*
Rules for merging different types of fields in UNION
@ -996,6 +998,21 @@ test_if_important_data(CHARSET_INFO *cs, const char *str, const char *strend)
}
/**
Template function to compare two objects.
*/
namespace {
template <class A_type, class B_type>
int compare(A_type a, B_type b)
{
if (a < b)
return -1;
if (b < a)
return 1;
return 0;
}
}
/**
Detect Item_result by given field type of UNION merge result.
@ -1367,22 +1384,46 @@ bool Field::send_binary(Protocol *protocol)
/**
Check to see if field size is compatible with destination.
This method is used in row-based replication to verify that the slave's
field size is less than or equal to the master's field size. The
encoded field metadata (from the master or source) is decoded and compared
to the size of this field (the slave or destination).
This method is used in row-based replication to verify that the
slave's field size is less than or equal to the master's field
size. The encoded field metadata (from the master or source) is
decoded and compared to the size of this field (the slave or
destination).
@note
The comparison is made so that if the source data (from the master)
is less than the target data (on the slave), -1 is returned in @c
<code>*order_var</code>. This implies that a conversion is
necessary, but that it is lossy and can result in truncation of the
value.
If the source data is strictly greater than the target data, 1 is
returned in <code>*order_var</code>. This implies that the source
type can is contained in the target type and that a conversion is
necessary but is non-lossy.
If no conversion is required to fit the source type in the target
type, 0 is returned in <code>*order_var</code>.
@param field_metadata Encoded size in field metadata
@param order_var Pointer to variable where the order
between the source field and this field
will be returned.
@retval 0 if this field's size is < the source field's size
@retval 1 if this field's size is >= the source field's size
@return @c true if this field's size is compatible with the
master's field size, @c false otherwise.
*/
int Field::compatible_field_size(uint field_metadata,
const Relay_log_info *rli_arg __attribute__((unused)))
bool Field::compatible_field_size(uint field_metadata,
Relay_log_info *rli_arg __attribute__((unused)),
int *order_var)
{
uint const source_size= pack_length_from_metadata(field_metadata);
uint const destination_size= row_pack_length();
return (source_size <= destination_size);
DBUG_PRINT("debug", ("real_type: %d, source_size: %u, destination_size: %u",
real_type(), source_size, destination_size));
*order_var = compare(source_size, destination_size);
return true;
}
@ -2907,33 +2948,15 @@ uint Field_new_decimal::pack_length_from_metadata(uint field_metadata)
}
/**
Check to see if field size is compatible with destination.
This method is used in row-based replication to verify that the slave's
field size is less than or equal to the master's field size. The
encoded field metadata (from the master or source) is decoded and compared
to the size of this field (the slave or destination).
@param field_metadata Encoded size in field metadata
@retval 0 if this field's size is < the source field's size
@retval 1 if this field's size is >= the source field's size
*/
int Field_new_decimal::compatible_field_size(uint field_metadata,
const Relay_log_info * __attribute__((unused)))
bool Field_new_decimal::compatible_field_size(uint field_metadata,
Relay_log_info * __attribute__((unused)),
int *order_var)
{
int compatible= 0;
uint const source_precision= (field_metadata >> 8U) & 0x00ff;
uint const source_decimal= field_metadata & 0x00ff;
uint const source_size= my_decimal_get_binary_size(source_precision,
source_decimal);
uint const destination_size= row_pack_length();
compatible= (source_size <= destination_size);
if (compatible)
compatible= (source_precision <= precision) &&
(source_decimal <= decimals());
return (compatible);
int order= compare(source_precision, precision);
*order_var= order != 0 ? order : compare(source_decimal, dec);
return true;
}
@ -6707,8 +6730,10 @@ check_field_for_37426(const void *param_arg)
}
#endif
int Field_string::compatible_field_size(uint field_metadata,
const Relay_log_info *rli_arg)
bool
Field_string::compatible_field_size(uint field_metadata,
Relay_log_info *rli_arg,
int *order_var)
{
#ifdef HAVE_REPLICATION
const Check_field_param check_param = { this };
@ -6716,7 +6741,7 @@ int Field_string::compatible_field_size(uint field_metadata,
check_field_for_37426, &check_param))
return FALSE; // Not compatible field sizes
#endif
return Field::compatible_field_size(field_metadata, rli_arg);
return Field::compatible_field_size(field_metadata, rli_arg, order_var);
}
@ -6778,6 +6803,8 @@ uchar *Field_string::pack(uchar *to, const uchar *from,
{
uint length= min(field_length,max_length);
uint local_char_length= max_length/field_charset->mbmaxlen;
DBUG_PRINT("debug", ("Packing field '%s' - length: %u ", field_name, length));
if (length > local_char_length)
local_char_length= my_charpos(field_charset, from, from+length,
local_char_length);
@ -7625,6 +7652,7 @@ Field_blob::Field_blob(uchar *ptr_arg, uchar *null_ptr_arg, uchar null_bit_arg,
cs),
packlength(blob_pack_length)
{
DBUG_ASSERT(blob_pack_length <= 4); // Only pack lengths 1-4 supported currently
flags|= BLOB_FLAG;
share->blob_fields++;
/* TODO: why do not fill table->s->blob_field array here? */
@ -8035,8 +8063,10 @@ int Field_blob::key_cmp(const uchar *a,const uchar *b)
*/
int Field_blob::do_save_field_metadata(uchar *metadata_ptr)
{
DBUG_ENTER("Field_blob::do_save_field_metadata");
*metadata_ptr= pack_length_no_ptr();
return 1;
DBUG_PRINT("debug", ("metadata: %u (pack_length_no_ptr)", *metadata_ptr));
DBUG_RETURN(1);
}
@ -8977,6 +9007,9 @@ Field_bit::Field_bit(uchar *ptr_arg, uint32 len_arg, uchar *null_ptr_arg,
bit_ptr(bit_ptr_arg), bit_ofs(bit_ofs_arg), bit_len(len_arg & 7),
bytes_in_rec(len_arg / 8)
{
DBUG_ENTER("Field_bit::Field_bit");
DBUG_PRINT("enter", ("ptr_arg: %p, null_ptr_arg: %p, len_arg: %u, bit_len: %u, bytes_in_rec: %u",
ptr_arg, null_ptr_arg, len_arg, bit_len, bytes_in_rec));
flags|= UNSIGNED_FLAG;
/*
Ensure that Field::eq() can distinguish between two different bit fields.
@ -8984,6 +9017,7 @@ Field_bit::Field_bit(uchar *ptr_arg, uint32 len_arg, uchar *null_ptr_arg,
*/
if (!null_ptr_arg)
null_bit= bit_ofs_arg;
DBUG_VOID_RETURN;
}
@ -9268,9 +9302,12 @@ uint Field_bit::get_key_image(uchar *buff, uint length, imagetype type_arg)
*/
int Field_bit::do_save_field_metadata(uchar *metadata_ptr)
{
DBUG_ENTER("Field_bit::do_save_field_metadata");
DBUG_PRINT("debug", ("bit_len: %d, bytes_in_rec: %d",
bit_len, bytes_in_rec));
*metadata_ptr= bit_len;
*(metadata_ptr + 1)= bytes_in_rec;
return 2;
DBUG_RETURN(2);
}
@ -9295,34 +9332,20 @@ uint Field_bit::pack_length_from_metadata(uint field_metadata)
}
/**
Check to see if field size is compatible with destination.
This method is used in row-based replication to verify that the slave's
field size is less than or equal to the master's field size. The
encoded field metadata (from the master or source) is decoded and compared
to the size of this field (the slave or destination).
@param field_metadata Encoded size in field metadata
@retval 0 if this field's size is < the source field's size
@retval 1 if this field's size is >= the source field's size
*/
int Field_bit::compatible_field_size(uint field_metadata,
const Relay_log_info * __attribute__((unused)))
bool
Field_bit::compatible_field_size(uint field_metadata,
Relay_log_info * __attribute__((unused)),
int *order_var)
{
int compatible= 0;
uint const source_size= pack_length_from_metadata(field_metadata);
uint const destination_size= row_pack_length();
uint const from_bit_len= field_metadata & 0x00ff;
uint const from_len= (field_metadata >> 8U) & 0x00ff;
if ((bit_len == 0) || (from_bit_len == 0))
compatible= (source_size <= destination_size);
else if (from_bit_len > bit_len)
compatible= (from_len < bytes_in_rec);
else
compatible= ((from_bit_len <= bit_len) && (from_len <= bytes_in_rec));
return (compatible);
DBUG_ENTER("Field_bit::compatible_field_size");
DBUG_ASSERT((field_metadata >> 16) == 0);
uint const from_bit_len=
8 * (field_metadata >> 8) + (field_metadata & 0xff);
uint const to_bit_len= max_display_length();
DBUG_PRINT("debug", ("from_bit_len: %u, to_bit_len: %u",
from_bit_len, to_bit_len));
*order_var= compare(from_bit_len, to_bit_len);
DBUG_RETURN(TRUE);
}
@ -9388,8 +9411,15 @@ const uchar *
Field_bit::unpack(uchar *to, const uchar *from, uint param_data,
bool low_byte_first __attribute__((unused)))
{
DBUG_ENTER("Field_bit::unpack");
DBUG_PRINT("enter", ("to: %p, from: %p, param_data: 0x%x",
to, from, param_data));
DBUG_PRINT("debug", ("bit_ptr: %p, bit_len: %u, bit_ofs: %u",
bit_ptr, bit_len, bit_ofs));
uint const from_len= (param_data >> 8U) & 0x00ff;
uint const from_bit_len= param_data & 0x00ff;
DBUG_PRINT("debug", ("from_len: %u, from_bit_len: %u",
from_len, from_bit_len));
/*
If the parameter data is zero (i.e., undefined), or if the master
and slave have the same sizes, then use the old unpack() method.
@ -9410,7 +9440,7 @@ Field_bit::unpack(uchar *to, const uchar *from, uint param_data,
from++;
}
memcpy(to, from, bytes_in_rec);
return from + bytes_in_rec;
DBUG_RETURN(from + bytes_in_rec);
}
/*
@ -9436,7 +9466,7 @@ Field_bit::unpack(uchar *to, const uchar *from, uint param_data,
bitmap_set_bit(table->write_set,field_index);
store(value, new_len, system_charset_info);
my_afree(value);
return from + len;
DBUG_RETURN(from + len);
}
@ -9564,8 +9594,11 @@ void Create_field::create_length_to_internal_length(void)
*/
void Create_field::init_for_tmp_table(enum_field_types sql_type_arg,
uint32 length_arg, uint32 decimals_arg,
bool maybe_null, bool is_unsigned)
bool maybe_null, bool is_unsigned,
uint pack_length)
{
DBUG_ENTER("Create_field::init_for_tmp_table");
field_name= "";
sql_type= sql_type_arg;
char_length= length= length_arg;;
@ -9573,10 +9606,92 @@ void Create_field::init_for_tmp_table(enum_field_types sql_type_arg,
interval= 0;
charset= &my_charset_bin;
geom_type= Field::GEOM_GEOMETRY;
pack_flag= (FIELDFLAG_NUMBER |
((decimals_arg & FIELDFLAG_MAX_DEC) << FIELDFLAG_DEC_SHIFT) |
(maybe_null ? FIELDFLAG_MAYBE_NULL : 0) |
(is_unsigned ? 0 : FIELDFLAG_DECIMAL));
DBUG_PRINT("enter", ("sql_type: %d, length: %u, pack_length: %u",
sql_type_arg, length_arg, pack_length));
/*
These pack flags are crafted to get it correctly through the
branches of make_field().
*/
switch (sql_type_arg)
{
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_VAR_STRING:
case MYSQL_TYPE_STRING:
case MYSQL_TYPE_SET:
pack_flag= 0;
break;
case MYSQL_TYPE_GEOMETRY:
pack_flag= FIELDFLAG_GEOM;
break;
case MYSQL_TYPE_ENUM:
pack_flag= FIELDFLAG_INTERVAL;
break;
case MYSQL_TYPE_DECIMAL:
case MYSQL_TYPE_NEWDECIMAL:
case MYSQL_TYPE_FLOAT:
case MYSQL_TYPE_DOUBLE:
pack_flag= FIELDFLAG_DECIMAL | FIELDFLAG_NUMBER |
(decimals_arg & FIELDFLAG_MAX_DEC) << FIELDFLAG_DEC_SHIFT;
break;
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_BLOB:
pack_flag= FIELDFLAG_BLOB;
break;
case MYSQL_TYPE_BIT:
pack_flag= FIELDFLAG_NUMBER | FIELDFLAG_TREAT_BIT_AS_CHAR;
break;
default:
pack_flag= FIELDFLAG_NUMBER;
break;
}
/*
Set the pack flag correctly for the blob-like types. This sets the
packtype to something that make_field can use. If the pack type is
not set correctly, the packlength will be reeeeally wierd (like
129 or so).
*/
switch (sql_type_arg)
{
case MYSQL_TYPE_ENUM:
case MYSQL_TYPE_SET:
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_GEOMETRY:
// If you are going to use the above types, you have to pass a
// pack_length as parameter. Assert that is really done.
DBUG_ASSERT(pack_length != ~0U);
pack_flag|= pack_length_to_packflag(pack_length);
break;
default:
/* Nothing */
break;
}
pack_flag|=
(maybe_null ? FIELDFLAG_MAYBE_NULL : 0) |
(is_unsigned ? 0 : FIELDFLAG_DECIMAL);
DBUG_PRINT("debug", ("pack_flag: %s%s%s%s%s, pack_type: %d",
FLAGSTR(pack_flag, FIELDFLAG_BINARY),
FLAGSTR(pack_flag, FIELDFLAG_NUMBER),
FLAGSTR(pack_flag, FIELDFLAG_INTERVAL),
FLAGSTR(pack_flag, FIELDFLAG_GEOM),
FLAGSTR(pack_flag, FIELDFLAG_BLOB),
f_packtype(pack_flag)));
DBUG_VOID_RETURN;
}
@ -10073,6 +10188,14 @@ Field *make_field(TABLE_SHARE *share, uchar *ptr, uint32 field_length,
default: break;
}
DBUG_PRINT("debug", ("field_type: %d, field_length: %u, interval: %p, pack_flag: %s%s%s%s%s",
field_type, field_length, interval,
FLAGSTR(pack_flag, FIELDFLAG_BINARY),
FLAGSTR(pack_flag, FIELDFLAG_INTERVAL),
FLAGSTR(pack_flag, FIELDFLAG_NUMBER),
FLAGSTR(pack_flag, FIELDFLAG_PACK),
FLAGSTR(pack_flag, FIELDFLAG_BLOB)));
if (f_is_alpha(pack_flag))
{
if (!f_is_packed(pack_flag))