1
0
mirror of https://github.com/MariaDB/server.git synced 2025-07-30 16:24:05 +03:00

MDEV-16978 Application-time periods: WITHOUT OVERLAPS

* The overlaps check is implemented on a handler level per row command.
  It creates a separate cursor (actually, another handler instance) and
  caches it inside the original handler, when ha_update_row or
  ha_insert_row is issued. Cursor closes on unlocking the handler.

* Containing the same key in index means unique constraint violation
  even in usual terms. So we fetch left and right neighbours and check
  that they have same key prefix, excluding from the key only the period part.
  If it doesnt match, then there's no such neighbour, and the check passes.
  Otherwise, we check if this neighbour intersects with the considered key.

* The check does not introduce new error and fails with ER_DUPP_KEY error.
  This might break REPLACE workflow and should be fixed separately
This commit is contained in:
Nikita Malyavin
2019-11-26 19:22:04 +10:00
committed by Sergei Golubchik
parent 0515577d12
commit 259fb1cbed
20 changed files with 826 additions and 76 deletions

View File

@ -6734,6 +6734,116 @@ int handler::check_duplicate_long_entries_update(const uchar *new_rec)
}
int handler::ha_check_overlaps(const uchar *old_data, const uchar* new_data)
{
DBUG_ASSERT(new_data);
if (!table_share->period.unique_keys)
return 0;
if (table->versioned() && !table->vers_end_field()->is_max())
return 0;
bool is_update= old_data != NULL;
alloc_lookup_buffer();
auto *record_buffer= lookup_buffer + table_share->max_unique_length
+ table_share->null_fields;
auto *handler= this;
// handler->inited can be NONE on INSERT
if (handler->inited != NONE)
{
create_lookup_handler();
handler= lookup_handler;
// Needs to compare record refs later is old_row_found()
if (is_update)
position(old_data);
}
DBUG_ASSERT(!keyread_enabled());
int error= 0;
lookup_errkey= (uint)-1;
for (uint key_nr= 0; key_nr < table_share->keys && !error; key_nr++)
{
const KEY &key_info= table->key_info[key_nr];
const uint key_parts= key_info.user_defined_key_parts;
if (!key_info.without_overlaps)
continue;
if (is_update)
{
bool key_used= false;
for (uint k= 0; k < key_parts && !key_used; k++)
key_used= bitmap_is_set(table->write_set,
key_info.key_part[k].fieldnr - 1);
if (!key_used)
continue;
}
error= handler->ha_index_init(key_nr, 0);
if (error)
return error;
error= handler->ha_start_keyread(key_nr);
DBUG_ASSERT(!error);
const uint period_field_length= key_info.key_part[key_parts - 1].length;
const uint key_base_length= key_info.key_length - 2 * period_field_length;
key_copy(lookup_buffer, new_data, &key_info, 0);
/* Copy period_start to period_end.
the value in period_start field is not significant, but anyway let's leave
it defined to avoid uninitialized memory access
*/
memcpy(lookup_buffer + key_base_length,
lookup_buffer + key_base_length + period_field_length,
period_field_length);
/* Find row with period_end > (period_start of new_data) */
error = handler->ha_index_read_map(record_buffer, lookup_buffer,
key_part_map((1 << (key_parts - 1)) - 1),
HA_READ_AFTER_KEY);
if (!error && is_update)
{
/* In case of update it could happen that the nearest neighbour is
a record we are updating. It means, that there are no overlaps
from this side.
An assumption is made that during update we always have the last
fetched row in old_data. Therefore, comparing ref's is enough
*/
DBUG_ASSERT(handler != this);
DBUG_ASSERT(inited != NONE);
DBUG_ASSERT(ref_length == handler->ref_length);
handler->position(record_buffer);
if (memcmp(ref, handler->ref, ref_length) == 0)
error= handler->ha_index_next(record_buffer);
}
if (!error && table->check_period_overlaps(key_info, new_data, record_buffer))
error= HA_ERR_FOUND_DUPP_KEY;
if (error == HA_ERR_KEY_NOT_FOUND || error == HA_ERR_END_OF_FILE)
error= 0;
if (error == HA_ERR_FOUND_DUPP_KEY)
lookup_errkey= key_nr;
int end_error= handler->ha_end_keyread();
DBUG_ASSERT(!end_error);
end_error= handler->ha_index_end();
if (!error && end_error)
error= end_error;
}
return error;
}
/**
Check if galera disables binary logging for this table
@ -6831,6 +6941,10 @@ int handler::ha_write_row(const uchar *buf)
DBUG_ENTER("handler::ha_write_row");
DEBUG_SYNC_C("ha_write_row_start");
error= ha_check_overlaps(NULL, buf);
if (unlikely(error))
goto end;
MYSQL_INSERT_ROW_START(table_share->db.str, table_share->table_name.str);
mark_trx_read_write();
increment_statistics(&SSV::ha_write_count);
@ -6862,6 +6976,7 @@ int handler::ha_write_row(const uchar *buf)
#endif /* WITH_WSREP */
}
end:
DEBUG_SYNC_C("ha_write_row_end");
DBUG_RETURN(error);
}
@ -6879,6 +6994,9 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
DBUG_ASSERT(new_data == table->record[0]);
DBUG_ASSERT(old_data == table->record[1]);
if ((error= ha_check_overlaps(old_data, new_data)))
return error;
MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
mark_trx_read_write();
increment_statistics(&SSV::ha_update_count);