Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions storage/innobase/include/trx0purge.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ Created 3/26/1996 Heikki Tuuri
Remove the undo log segment from the rseg slot if it is too big for reuse.
@param[in] trx transaction
@param[in,out] undo undo log
@param[in,out] mtr mini-transaction */
@param[in,out] mtr mini-transaction
@param[in] end transaction serialisation number */
void
trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr);
trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr,
trx_id_t end);

/**
Remove unnecessary history data from rollback segments. NOTE that when this
Expand Down
218 changes: 148 additions & 70 deletions storage/innobase/include/trx0sys.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,6 @@ struct rw_trx_hash_element_t


trx_id_t id; /* lf_hash_init() relies on this to be first in the struct */

/**
Transaction serialization number.

Assigned shortly before the transaction is moved to COMMITTED_IN_MEMORY
state. Initially set to TRX_ID_MAX.
*/
Atomic_counter<trx_id_t> no;
trx_t *trx;
srw_mutex mutex;
};
Expand Down Expand Up @@ -443,7 +435,6 @@ class rw_trx_hash_t
ut_ad(element->trx == 0);
element->trx= trx;
element->id= trx->id;
element->no= TRX_ID_MAX;
trx->rw_trx_hash_element= element;
}

Expand Down Expand Up @@ -512,7 +503,6 @@ class rw_trx_hash_t
if (element->trx)
validate_element(element->trx);
element->mutex.wr_unlock();
ut_ad(element->id < element->no);
return arg->action(element, arg->argument);
}
#endif
Expand Down Expand Up @@ -849,6 +839,119 @@ class thread_safe_trx_ilist_t
alignas(CPU_LEVEL1_DCACHE_LINESIZE) ilist<trx_t> trx_list;
};

/**
Active read-write transaction identifiers and serialisation numbers container.

Unlike rw_trx_hash_t, which is optimized for direct lookup, this
structure is optimized for compact storage and traversal of active
transactions by MVCC read view construction.

The vector may contain empty slots corresponding to idle or read-only
transactions that currently do not own an active read-write trx_id.
Such slots are skipped during traversal.
*/
class rw_trx_vector
{
/**
  One slot of the container: the identifier and serialisation number
  published for a single registered transaction object.
*/
struct rw_trx_id
{
  /** Transaction identifier; TRX_ID_MAX while the slot is empty */
  Atomic_relaxed<trx_id_t> id{TRX_ID_MAX};
  /** Transaction serialisation number; TRX_ID_MAX while none is assigned */
  Atomic_relaxed<trx_id_t> no{TRX_ID_MAX};
  /** Transaction object this slot belongs to */
  trx_t *trx;

  /** @param owner  transaction object the new (empty) slot belongs to */
  rw_trx_id(trx_t *owner): trx(owner) {}
};
Comment thread
svoj marked this conversation as resolved.
/** Slots of all transaction objects added by register_trx(); a transaction
finds its own slot through trx_t::rw_trx_ids_slot */
alignas(CPU_LEVEL1_DCACHE_LINESIZE)
std::vector<rw_trx_id, ut_allocator<rw_trx_id>>
ids{ut_allocator<rw_trx_id>(mem_key_trx_sys_t_rw_trx_ids)};
/** Latch covering ids: taken in shared mode by the methods that read or
update the id/no fields of existing slots, and in exclusive mode by the
methods that append or remove slots */
alignas(CPU_LEVEL1_DCACHE_LINESIZE) mutable srw_spin_lock_low latch;

public:
void assign_new_trx_no(const trx_t *trx, trx_id_t no) noexcept
{
latch.rd_lock();
ut_ad(trx->rw_trx_ids_slot < ids.size());
ut_ad(ids[trx->rw_trx_ids_slot].trx == trx);
ut_ad(ids[trx->rw_trx_ids_slot].id == trx->id);
ut_ad(ids[trx->rw_trx_ids_slot].no == TRX_ID_MAX);
ids[trx->rw_trx_ids_slot].no= no;
latch.rd_unlock();
}
/**
Collects the identifiers stored in the slots.

Every slot whose id is below max_trx_id contributes its id to view_ids and
its serialisation number to the returned minimum. Slots holding TRX_ID_MAX
are expected to fail that test (assumes max_trx_id never exceeds
TRX_ID_MAX - to be confirmed against the caller).

@param[out] view_ids    cleared, then filled with the collected identifiers
@param[in]  max_trx_id  exclusive upper bound for collected identifiers
@return the smallest serialisation number found in the collected slots,
or max_trx_id if none of them is smaller
*/
trx_id_t snapshot_ids(trx_ids_t &view_ids,
const trx_id_t max_trx_id) const noexcept
{
trx_id_t min_trx_no{max_trx_id};
view_ids.clear();
latch.rd_lock();
/* The loop below appends at most ids.size() elements. */
view_ids.reserve(ids.size());
Comment thread
svoj marked this conversation as resolved.
for (const auto &it : ids)
{
/* Load id once; the same value is tested and stored. */
trx_id_t id{it.id};
if (id < max_trx_id)
{
view_ids.push_back(id);
/* no is loaded only after the slot's id has been accepted. */
const trx_id_t no{it.no};
if (no < min_trx_no)
min_trx_no= no;
}
}
latch.rd_unlock();
return min_trx_no;
}
/**
Stores the identifier of a transaction in its slot.

@param trx transaction whose slot is empty; trx->id is the value that
gets stored, so the caller is expected to have set it already
*/
void register_rw(const trx_t *trx) noexcept
{
latch.rd_lock();
ut_ad(trx->rw_trx_ids_slot < ids.size());
ut_ad(ids[trx->rw_trx_ids_slot].trx == trx);
/* The slot must be empty: neither id nor no is set. */
ut_ad(ids[trx->rw_trx_ids_slot].id == TRX_ID_MAX);
ut_ad(ids[trx->rw_trx_ids_slot].no == TRX_ID_MAX);
ids[trx->rw_trx_ids_slot].id= trx->id;
Comment thread
svoj marked this conversation as resolved.
latch.rd_unlock();
}
Comment thread
svoj marked this conversation as resolved.
/**
  Empties the slot of a transaction: both the identifier and the
  serialisation number are reset to TRX_ID_MAX.

  @param trx  transaction whose slot currently holds trx->id
*/
void deregister_rw(const trx_t *trx) noexcept
{
  latch.rd_lock();
  const auto n= trx->rw_trx_ids_slot;
  ut_ad(n < ids.size());
  ut_ad(ids[n].trx == trx);
  ut_ad(ids[n].id == trx->id);
  /* Reset id first, then no. */
  ids[n].id= TRX_ID_MAX;
  ids[n].no= TRX_ID_MAX;
  latch.rd_unlock();
}
Comment thread
svoj marked this conversation as resolved.
/**
  Appends an empty slot for a transaction object and records the slot
  index in trx->rw_trx_ids_slot.

  @param trx  transaction object that does not have a slot yet
*/
void register_trx(trx_t *trx) noexcept
{
  ut_ad(trx->rw_trx_ids_slot == std::numeric_limits<uint32_t>::max());
  latch.wr_lock();
  /* The new slot goes to the end, so its index is the current size. */
  const auto new_slot= ids.size();
  ids.emplace_back(trx);
  trx->rw_trx_ids_slot= static_cast<uint32_t>(new_slot);
  latch.wr_unlock();
}
/**
  Removes the slot of a transaction object.

  The vector is kept dense: unless the removed slot is the last one, the
  last slot is moved into its place and the transaction owning the moved
  slot gets its rw_trx_ids_slot updated.

  @param trx  transaction object whose slot is to be removed
*/
void deregister_trx(trx_t *trx) noexcept
{
  latch.wr_lock();
  const auto freed= trx->rw_trx_ids_slot;
  ut_ad(freed < ids.size());
  ut_ad(ids[freed].trx == trx);
  if (freed + 1 < ids.size())
  {
    rw_trx_id &last= ids.back();
    trx_t *const moved= last.trx;
    ids[freed]= std::move(last);
    moved->rw_trx_ids_slot= freed;
  }
  ids.pop_back();
  latch.wr_unlock();
  /* Mark the transaction object as having no slot. */
  trx->rw_trx_ids_slot= std::numeric_limits<uint32_t>::max();
}
/** Initialises the latch; the container must not hold any slots. */
void create() noexcept
{
  ut_ad(ids.empty());
  latch.init();
}
/** Destroys the latch; all slots must have been removed beforehand. */
void destroy() noexcept
{
  ut_ad(ids.empty());
  latch.destroy();
}
};

/** The transaction system central memory data structure. */
class trx_sys_t
{
Expand Down Expand Up @@ -876,6 +979,15 @@ class trx_sys_t
/** False if there is no undo log to purge or rollback */
bool undo_log_nonempty;
public:
/**
Collection of active read-write transaction identifiers and serialization
numbers used for MVCC snapshot creation.

This complements rw_trx_hash with a traversal-friendly representation
optimized for collecting active transaction ids.
*/
rw_trx_vector rw_trx_ids;

/** List of all transactions. */
thread_safe_trx_ilist_t trx_list;

Expand Down Expand Up @@ -1014,7 +1126,7 @@ class trx_sys_t
next call to trx_sys.get_new_trx_id()
*/

trx_id_t get_max_trx_id()
trx_id_t get_max_trx_id() const noexcept
{
return m_max_trx_id;
}
Expand All @@ -1037,7 +1149,7 @@ class trx_sys_t
Allocates and assigns new transaction serialisation number.

There's a gap between m_max_trx_id increment and transaction serialisation
number becoming visible through rw_trx_hash. While we're in this gap
number becoming visible through rw_trx_ids. While we're in this gap
concurrent thread may come and do MVCC snapshot without seeing allocated
but not yet assigned serialisation number. Then at some point purge thread
may clone this view. As a result it won't see newly allocated serialisation
Expand All @@ -1047,58 +1159,44 @@ class trx_sys_t
m_rw_trx_hash_version is intended to solve this problem. MVCC snapshot has
to wait until m_max_trx_id == m_rw_trx_hash_version, which effectively
means that all transaction serialisation numbers up to m_max_trx_id are
available through rw_trx_hash.
available through rw_trx_ids.

We rely on refresh_rw_trx_hash_version() to issue RELEASE memory barrier so
that m_rw_trx_hash_version increment happens after
trx->rw_trx_hash_element->no becomes visible through rw_trx_hash.
that m_rw_trx_hash_version increment happens after transaction serialisation
number becomes visible through rw_trx_ids.

@param trx transaction
*/
void assign_new_trx_no(trx_t *trx)
trx_id_t assign_new_trx_no(trx_t *trx)
{
trx->rw_trx_hash_element->no= get_new_trx_id_no_refresh();
trx_id_t no= get_new_trx_id_no_refresh();
rw_trx_ids.assign_new_trx_no(trx, no);
refresh_rw_trx_hash_version();
return no;
}


/**
Takes MVCC snapshot.

To reduce malloc probability we reserve rw_trx_hash.size() + 32 elements
in ids.

For details about get_rw_trx_hash_version() != get_max_trx_id() spin
@sa register_rw() and @sa assign_new_trx_no().

We rely on get_rw_trx_hash_version() to issue ACQUIRE memory barrier so
that loading of m_rw_trx_hash_version happens before accessing rw_trx_hash.

To optimise snapshot creation rw_trx_hash.iterate() is being used instead
of rw_trx_hash.iterate_no_dups(). It means that some transaction
identifiers may appear multiple times in ids.
that loading of m_rw_trx_hash_version happens before accessing rw_trx_ids.

@param[in,out] caller_trx used to get access to rw_trx_hash_pins
@param[out] ids array to store registered transaction identifiers
@param[out] max_trx_id variable to store m_max_trx_id value
@param[out] min_trx_no variable to store min(no) value

@return min(no)
*/

void snapshot_ids(trx_t *caller_trx, trx_ids_t *ids, trx_id_t *max_trx_id,
trx_id_t *min_trx_no)
trx_id_t snapshot_ids(trx_ids_t &ids, trx_id_t &max_trx_id) const noexcept
{
snapshot_ids_arg arg(ids);

while ((arg.m_id= get_rw_trx_hash_version()) != get_max_trx_id())
while ((max_trx_id= get_rw_trx_hash_version()) != get_max_trx_id())
ut_delay(1);
Comment thread
svoj marked this conversation as resolved.
arg.m_no= arg.m_id;

ids->clear();
ids->reserve(rw_trx_hash.size() + 32);
rw_trx_hash.iterate(caller_trx, copy_one_id, &arg);

*max_trx_id= arg.m_id;
*min_trx_no= arg.m_no;
return rw_trx_ids.snapshot_ids(ids, max_trx_id);
}


Expand Down Expand Up @@ -1149,7 +1247,7 @@ class trx_sys_t
Transaction becomes visible to MVCC.

There's a gap between m_max_trx_id increment and transaction becoming
visible through rw_trx_hash. While we're in this gap concurrent thread may
visible through rw_trx_ids. While we're in this gap concurrent thread may
come and do MVCC snapshot. As a result concurrent read view will be able to
observe records owned by this transaction even before it was committed.

Expand All @@ -1166,20 +1264,23 @@ class trx_sys_t
void register_rw(trx_t *trx)
{
trx->id= get_new_trx_id_no_refresh();
rw_trx_hash.insert(trx);
rw_trx_ids.register_rw(trx);
refresh_rw_trx_hash_version();
rw_trx_hash.insert(trx);
}


/**
Deregisters read-write transaction.

Transaction is removed from rw_trx_hash, which releases all implicit locks.
MVCC snapshot won't see this transaction anymore.
After this call the transaction is no longer visible as active to MVCC read
views created subsequently, and all implicit locks held by the transaction
have been released.
*/

void deregister_rw(trx_t *trx)
void deregister_rw(trx_t *trx) noexcept
{
rw_trx_ids.deregister_rw(trx);
rw_trx_hash.erase(trx);
}

Expand All @@ -1204,6 +1305,7 @@ class trx_sys_t
void register_trx(trx_t *trx)
{
/* Link the transaction object into trx_list first, then give it a slot
in rw_trx_ids. */
trx_list.push_front(*trx);
rw_trx_ids.register_trx(trx);
}


Expand All @@ -1214,6 +1316,7 @@ class trx_sys_t
*/
void deregister_trx(trx_t *trx)
{
/* Reverse order of register_trx(): the rw_trx_ids slot is given up
first, then the transaction object is unlinked from trx_list. */
rw_trx_ids.deregister_trx(trx);
trx_list.remove(*trx);
}

Expand Down Expand Up @@ -1266,33 +1369,8 @@ class trx_sys_t
private:
static my_bool find_same_or_older_callback(void *el, void *i) noexcept;


/** Argument block passed to copy_one_id() through a void pointer. */
struct snapshot_ids_arg
{
  /** Destination for the collected transaction identifiers */
  trx_ids_t *m_ids;
  /** Exclusive upper bound for collected identifiers; not set by the
  constructor */
  trx_id_t m_id;
  /** Smallest serialisation number seen so far; not set by the
  constructor */
  trx_id_t m_no;

  /** @param out  container that receives the identifiers */
  snapshot_ids_arg(trx_ids_t *out): m_ids(out) {}
};


/**
  Callback that records one hash element in a snapshot.

  An element whose id is below arg->m_id has its id appended to arg->m_ids,
  and arg->m_no is lowered to the element's serialisation number if that
  number is smaller.

  @param el  rw_trx_hash_element_t to inspect
  @param a   snapshot_ids_arg to update
  @return 0 always
*/
static my_bool copy_one_id(void* el, void *a)
{
  const rw_trx_hash_element_t *const entry=
    static_cast<const rw_trx_hash_element_t *>(el);
  snapshot_ids_arg *const snapshot= static_cast<snapshot_ids_arg*>(a);

  if (!(entry->id < snapshot->m_id))
    return 0;

  /* Load the serialisation number before the identifier is appended. */
  const trx_id_t serial_no= entry->no;
  snapshot->m_ids->push_back(entry->id);
  if (serial_no < snapshot->m_no)
    snapshot->m_no= serial_no;
  return 0;
}


/** Getter for m_rw_trx_hash_version, must issue ACQUIRE memory barrier. */
trx_id_t get_rw_trx_hash_version()
trx_id_t get_rw_trx_hash_version() const noexcept
{
return m_rw_trx_hash_version.load(std::memory_order_acquire);
}
Expand Down
2 changes: 2 additions & 0 deletions storage/innobase/include/trx0trx.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,8 @@ struct trx_t : ilist_node<>


public:
/** trx_sys.rw_trx_ids index, protected by trx_sys.rw_trx_ids.latch */
uint32_t rw_trx_ids_slot;
/** Transaction identifier (0 if no locks were acquired).
Set by trx_sys_t::register_rw() or trx_resurrect() before
the transaction is added to trx_sys.rw_trx_hash.
Expand Down
1 change: 1 addition & 0 deletions storage/innobase/include/ut0new.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ extern PSI_memory_key mem_key_other;
extern PSI_memory_key mem_key_row_log_buf;
extern PSI_memory_key mem_key_row_merge_sort;
extern PSI_memory_key mem_key_std;
extern PSI_memory_key mem_key_trx_sys_t_rw_trx_ids;

/** Setup the internal objects needed for UT_NEW() to operate.
This must be called before the first call to UT_NEW(). */
Expand Down
2 changes: 1 addition & 1 deletion storage/innobase/read/read0read.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ For details see: row_undo_mod_sec_is_unsafe() and row_purge_poss_sec()
*/
inline void ReadViewBase::snapshot(trx_t *trx)
{
trx_sys.snapshot_ids(trx, &m_ids, &m_low_limit_id, &m_low_limit_no);
m_low_limit_no= trx_sys.snapshot_ids(m_ids, m_low_limit_id);
if (m_ids.empty())
{
m_up_limit_id= m_low_limit_id;
Expand Down
Loading
Loading