diff --git a/model/base.go b/model/base.go new file mode 100644 index 0000000..bcab6c1 --- /dev/null +++ b/model/base.go @@ -0,0 +1,96 @@ +package model + +import ( + "time" + + "github.com/sjmudd/ps-top/config" +) + +// BaseCollector encapsulates the common collection state and logic for all models. +// T is the row type (e.g., tableio.Row, memoryusage.Row) +// R is a slice of T (e.g., []Row, or a named type like Rows) +type BaseCollector[T any, R ~[]T] struct { + config *config.Config + db QueryExecutor + + FirstCollected time.Time + LastCollected time.Time + First R // baseline snapshot + Last R // most recent raw data collection + Results R // processed results (after subtraction, etc.) + Totals T // totals row computed from results + process ProcessFunc[T, R] +} + +// NewBaseCollector creates a new BaseCollector with the given config, database, and process function. +func NewBaseCollector[T any, R ~[]T](cfg *config.Config, db QueryExecutor, process ProcessFunc[T, R]) *BaseCollector[T, R] { + return &BaseCollector[T, R]{ + config: cfg, + db: db, + process: process, + } +} + +// ProcessFunc defines the transformation from raw data to displayable results. +// It receives the last collected data and the baseline (first) data, +// and returns the processed results along with their totals. +type ProcessFunc[T any, R ~[]T] func(last, first R) (results R, totals T) + +// WantRefreshFunc determines whether the baseline should be refreshed +// (i.e., copy last to first) based on current state. +type WantRefreshFunc = func() bool + +// FetchFunc retrieves raw data from the database. +type FetchFunc[R any] func() (R, error) + +// Collect orchestrates a full collection cycle: +// 1. Fetch raw data via fetchFunc +// 2. Optionally refresh baseline if wantRefresh returns true +// 3. Process data via the stored process function to produce results and totals +func (bc *BaseCollector[T, R]) Collect( + fetch FetchFunc[R], + wantRefresh WantRefreshFunc, +) { + // Fetch the latest data + last, err := fetch() + if err != nil { + // TODO: log error? For now, skip this collection cycle. + return + } + + // Update last snapshot and timestamp + bc.Last = last + bc.LastCollected = time.Now() + if bc.FirstCollected.IsZero() { + bc.FirstCollected = bc.LastCollected + } + + // Refresh baseline if needed (e.g., on first collection or wrap-around) + if wantRefresh() { + bc.First = make(R, len(last)) + copy(bc.First, last) + bc.FirstCollected = bc.LastCollected + } + + // Process results and compute totals using the stored process function + bc.Results, bc.Totals = bc.process(bc.Last, bc.First) +} + +// ResetStatistics sets the baseline to the last collected values. +// This is used when the user requests a manual reset. +func (bc *BaseCollector[T, R]) ResetStatistics() { + bc.First = make(R, len(bc.Last)) + copy(bc.First, bc.Last) + bc.FirstCollected = bc.LastCollected + bc.Results, bc.Totals = bc.process(bc.Last, bc.First) +} + +// Config returns the collector's configuration +func (bc *BaseCollector[T, R]) Config() *config.Config { + return bc.config +} + +// DB returns the QueryExecutor (for use in fetch functions) +func (bc *BaseCollector[T, R]) DB() QueryExecutor { + return bc.db +} diff --git a/model/fileinfo/fileinfo.go b/model/fileinfo/fileinfo.go index ed59111..eef6144 100644 --- a/model/fileinfo/fileinfo.go +++ b/model/fileinfo/fileinfo.go @@ -2,78 +2,49 @@ package fileinfo import ( - "database/sql" - "log" - "time" - "github.com/sjmudd/ps-top/config" - "github.com/sjmudd/ps-top/utils" + "github.com/sjmudd/ps-top/model" ) // FileIoLatency represents the contents of the data collected from file_summary_by_instance type FileIoLatency struct { - config *config.Config - FirstCollected time.Time // the first collection time (for relative data) - LastCollected time.Time // the last collection time - first Rows - last Rows - Results Rows - Totals Row - db *sql.DB + *model.BaseCollector[Row, Rows] } // NewFileSummaryByInstance creates a new structure and include various variable values: // - datadir, relay_log // There's no checking that these are actually provided! -func NewFileSummaryByInstance(cfg *config.Config, db *sql.DB) *FileIoLatency { - fiol := &FileIoLatency{ - db: db, - config: cfg, +func NewFileSummaryByInstance(cfg *config.Config, db model.QueryExecutor) *FileIoLatency { + process := func(last, first Rows) (Rows, Row) { + results := make(Rows, len(last)) + copy(results, last) + if cfg.WantRelativeStats() { + results.subtract(first) + } + tot := totals(results) + return results, tot } - - return fiol -} - -// ResetStatistics resets the statistics to last values -func (fiol *FileIoLatency) ResetStatistics() { - fiol.first = utils.DuplicateSlice(fiol.last) - fiol.FirstCollected = fiol.LastCollected - - fiol.calculate() + bc := model.NewBaseCollector[Row, Rows](cfg, db, process) + return &FileIoLatency{BaseCollector: bc} } // Collect data from the db, then merge it in. func (fiol *FileIoLatency) Collect() { - start := time.Now() - fiol.last = FileInfo2MySQLNames( - fiol.config.Variables().Get("datadir"), - fiol.config.Variables().Get("relaylog"), - collect(fiol.db), - ) - fiol.LastCollected = time.Now() - - // copy in first data if it was not there - // or check for reload initial characteristics - if (len(fiol.first) == 0 && len(fiol.last) > 0) || fiol.first.needsRefresh(fiol.last) { - fiol.first = utils.DuplicateSlice(fiol.last) - fiol.FirstCollected = fiol.LastCollected + bc := fiol.BaseCollector + fetch := func() (Rows, error) { + raw := collect(bc.DB()) + // Apply transformation using config variables + transformed := FileInfo2MySQLNames( + bc.Config().Variables().Get("datadir"), + bc.Config().Variables().Get("relaylog"), + raw, + ) + return transformed, nil } - - fiol.calculate() - - log.Println("fiol.first.totals():", totals(fiol.first)) - log.Println("fiol.last.totals():", totals(fiol.last)) - log.Println("FileIoLatency.Collect() took:", time.Since(start)) -} - -func (fiol *FileIoLatency) calculate() { - fiol.Results = utils.DuplicateSlice(fiol.last) - - if fiol.config.WantRelativeStats() { - fiol.Results.subtract(fiol.first) + wantRefresh := func() bool { + return (len(bc.First) == 0 && len(bc.Last) > 0) || totals(bc.First).SumTimerWait > totals(bc.Last).SumTimerWait } - - fiol.Totals = totals(fiol.Results) + bc.Collect(fetch, wantRefresh) } // HaveRelativeStats is true for this object @@ -81,7 +52,7 @@ func (fiol FileIoLatency) HaveRelativeStats() bool { return true } -// WantRelativeStats +// WantRelativeStats returns the config setting. func (fiol FileIoLatency) WantRelativeStats() bool { - return fiol.config.WantRelativeStats() + return fiol.Config().WantRelativeStats() } diff --git a/model/fileinfo/rows.go b/model/fileinfo/rows.go index e7754b4..a2c525c 100644 --- a/model/fileinfo/rows.go +++ b/model/fileinfo/rows.go @@ -3,10 +3,10 @@ package fileinfo import ( - "database/sql" "time" "github.com/sjmudd/ps-top/log" + "github.com/sjmudd/ps-top/model" ) // Config provides an interface for getting a configuration value from a key/value store @@ -46,7 +46,7 @@ func (rows Rows) Valid() bool { } // Select the raw data from the database into Rows -func collect(db *sql.DB) Rows { +func collect(db model.QueryExecutor) Rows { log.Println("collect() starts") var t Rows start := time.Now() @@ -149,9 +149,3 @@ func (rows *Rows) subtract(initial Rows) { log.Println("WARNING: END") } } - -// if the data in t2 is "newer", "has more values" than t then it needs refreshing. -// check this by comparing totals. -func (rows Rows) needsRefresh(otherRows Rows) bool { - return totals(rows).SumTimerWait > totals(otherRows).SumTimerWait -} diff --git a/model/memoryusage/memoryusage.go b/model/memoryusage/memoryusage.go index 561af8d..0bfe7ad 100644 --- a/model/memoryusage/memoryusage.go +++ b/model/memoryusage/memoryusage.go @@ -3,75 +3,51 @@ package memoryusage import ( - "database/sql" - "time" - - _ "github.com/go-sql-driver/mysql" // keep golint happy - "github.com/sjmudd/ps-top/config" + "github.com/sjmudd/ps-top/model" ) // MemoryUsage represents a table of rows type MemoryUsage struct { - config *config.Config - FirstCollected time.Time // the first collection time (for relative data) - LastCollected time.Time // the last collection time - last []Row // last loaded values - Results []Row // results (maybe with subtraction) - Totals Row // totals of results - db *sql.DB + *model.BaseCollector[Row, []Row] } // NewMemoryUsage returns a pointer to a MemoryUsage struct -func NewMemoryUsage(cfg *config.Config, db *sql.DB) *MemoryUsage { - mu := &MemoryUsage{ - db: db, - config: cfg, +func NewMemoryUsage(cfg *config.Config, db model.QueryExecutor) *MemoryUsage { + process := func(last, _ []Row) ([]Row, Row) { + results := make([]Row, len(last)) + copy(results, last) + tot := totals(results) + return results, tot } - - return mu + bc := model.NewBaseCollector[Row, []Row](cfg, db, process) + return &MemoryUsage{BaseCollector: bc} } // Collect data from the db, no merging needed -// DEPRECATED func (mu *MemoryUsage) Collect() { - mu.AddRows(collect(mu.db)) -} - -// AddRows takes an new set of rows to be added to the dataset -func (mu *MemoryUsage) AddRows(rows []Row) { - mu.last = rows - mu.LastCollected = time.Now() - - mu.calculate() -} - -// ResetStatistics resets the statistics to current values -func (mu *MemoryUsage) ResetStatistics() { - - mu.calculate() + bc := mu.BaseCollector + fetch := func() ([]Row, error) { + return collect(bc.DB()), nil + } + wantRefresh := func() bool { + // MemoryUsage does not support relative stats, so always refresh baseline to current + return true + } + bc.Collect(fetch, wantRefresh) } // Rows returns the rows we have which are interesting -func (mu MemoryUsage) Rows() []Row { - rows := make([]Row, 0, len(mu.Results)) - rows = append(rows, mu.Results...) - - return rows +func (mu *MemoryUsage) Rows() []Row { + return mu.Results } // HaveRelativeStats returns if the values returned are relative to a previous collection -func (mu MemoryUsage) HaveRelativeStats() bool { +func (mu *MemoryUsage) HaveRelativeStats() bool { return false } -// WantRelativeStats -func (mu MemoryUsage) WantRelativeStats() bool { - return mu.config.WantRelativeStats() -} - -func (mu *MemoryUsage) calculate() { - mu.Results = make([]Row, len(mu.last)) - copy(mu.Results, mu.last) - mu.Totals = totals(mu.Results) +// WantRelativeStats returns whether relative stats are desired based on config +func (mu *MemoryUsage) WantRelativeStats() bool { + return mu.Config().WantRelativeStats() } diff --git a/model/memoryusage/row.go b/model/memoryusage/row.go index 27dd26c..e75dddf 100644 --- a/model/memoryusage/row.go +++ b/model/memoryusage/row.go @@ -3,12 +3,12 @@ package memoryusage import ( - "database/sql" "fmt" "github.com/go-sql-driver/mysql" "github.com/sjmudd/ps-top/log" + "github.com/sjmudd/ps-top/model" ) /* This table does not exist in MySQL 5.6 @@ -103,7 +103,7 @@ func sqlErrorHandler(err error) bool { } // Select the raw data from the database -func collect(db *sql.DB) []Row { +func collect(db model.QueryExecutor) []Row { var t []Row var skip bool diff --git a/model/mutexlatency/mutexlatency.go b/model/mutexlatency/mutexlatency.go index 043e4dd..b10c11a 100644 --- a/model/mutexlatency/mutexlatency.go +++ b/model/mutexlatency/mutexlatency.go @@ -1,86 +1,48 @@ -// Package mutexlatency provides library routines for ps-top. +// Package mutexlatency provides library routines for ps-top // for managing the events_waits_summary_global_by_event_name table. package mutexlatency import ( - "database/sql" - "log" - "time" - "github.com/sjmudd/ps-top/config" + "github.com/sjmudd/ps-top/model" "github.com/sjmudd/ps-top/model/common" - "github.com/sjmudd/ps-top/utils" ) // MutexLatency holds a table of rows type MutexLatency struct { - config *config.Config - FirstCollected time.Time - LastCollected time.Time - first Rows // initial data for relative values - last Rows // last loaded values - Results Rows // results (maybe with subtraction) - Totals Row // totals of results - db *sql.DB + *model.BaseCollector[Row, Rows] } -// NewMutexLatency returns a mutex latency object using given config and db -func NewMutexLatency(cfg *config.Config, db *sql.DB) *MutexLatency { - log.Println("NewMutexLatency()") - if cfg == nil { - log.Println("NewMutexLatency() cfg == nil!") - } - ml := &MutexLatency{ - config: cfg, - db: db, +// NewMutexLatency creates a new MutexLatency instance. +func NewMutexLatency(cfg *config.Config, db model.QueryExecutor) *MutexLatency { + process := func(last, first Rows) (Rows, Row) { + results := make(Rows, len(last)) + copy(results, last) + if cfg.WantRelativeStats() { + common.SubtractByName(&results, first, + func(r Row) string { return r.Name }, + func(r *Row, o Row) { r.subtract(o) }, + ) + } + tot := totals(results) + return results, tot } - - return ml + bc := model.NewBaseCollector[Row, Rows](cfg, db, process) + return &MutexLatency{BaseCollector: bc} } // Collect collects data from the db, updating first // values if needed, and then subtracting first values if we want // relative values, after which it stores totals. func (ml *MutexLatency) Collect() { - start := time.Now() - - ml.last = collect(ml.db) - ml.LastCollected = time.Now() - - // check if no first data or we need to reload initial characteristics - if (len(ml.first) == 0 && len(ml.last) > 0) || totals(ml.first).SumTimerWait > totals(ml.last).SumTimerWait { - ml.first = utils.DuplicateSlice(ml.last) - ml.FirstCollected = ml.LastCollected + bc := ml.BaseCollector + fetch := func() (Rows, error) { + return collect(bc.DB()), nil } - - ml.calculate() - - log.Println("t.initial.totals():", totals(ml.first)) - log.Println("t.current.totals():", totals(ml.last)) - log.Println("MutexLatency.Collect() END, took:", time.Since(start).String()) -} - -func (ml *MutexLatency) calculate() { - // log.Println( "- t.results set from t.current" ) - ml.Results = make(Rows, len(ml.last)) - copy(ml.Results, ml.last) - if ml.config.WantRelativeStats() { - // log.Println( "- subtracting t.initial from t.results as WantRelativeStats()" ) - common.SubtractByName(&ml.Results, ml.first, - func(r Row) string { return r.Name }, - func(r *Row, o Row) { r.subtract(o) }, - ) + wantRefresh := func() bool { + return (len(bc.First) == 0 && len(bc.Last) > 0) || totals(bc.First).SumTimerWait > totals(bc.Last).SumTimerWait } - - ml.Totals = totals(ml.Results) -} - -// ResetStatistics resets the statistics to current values -func (ml *MutexLatency) ResetStatistics() { - ml.first = utils.DuplicateSlice(ml.last) - ml.FirstCollected = ml.LastCollected - - ml.calculate() + bc.Collect(fetch, wantRefresh) } // HaveRelativeStats is true for this object @@ -88,7 +50,7 @@ func (ml MutexLatency) HaveRelativeStats() bool { return true } -// WantRelativeStats +// WantRelativeStats returns the config setting. func (ml MutexLatency) WantRelativeStats() bool { - return ml.config.WantRelativeStats() + return ml.Config().WantRelativeStats() } diff --git a/model/mutexlatency/rows.go b/model/mutexlatency/rows.go index 7f6de6c..7ef6dfb 100644 --- a/model/mutexlatency/rows.go +++ b/model/mutexlatency/rows.go @@ -3,9 +3,8 @@ package mutexlatency import ( - "database/sql" - "github.com/sjmudd/ps-top/log" + "github.com/sjmudd/ps-top/model" "github.com/sjmudd/ps-top/model/common" ) @@ -23,7 +22,7 @@ func totals(rows Rows) Row { return total } -func collect(db *sql.DB) Rows { +func collect(db model.QueryExecutor) Rows { const prefix = "wait/synch/" var t Rows diff --git a/model/processlist/processlist.go b/model/processlist/processlist.go index baeeca8..8b199bd 100644 --- a/model/processlist/processlist.go +++ b/model/processlist/processlist.go @@ -7,12 +7,13 @@ import ( "github.com/sjmudd/anonymiser" "github.com/sjmudd/ps-top/log" + "github.com/sjmudd/ps-top/model" ) const selectCountPSProcesslistTableSQL = `SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'performance_schema' and table_name = 'processlist'` // Do we have P_S.processlist table? -func HavePerformanceSchema(db *sql.DB) (bool, error) { +func HavePerformanceSchema(db model.QueryExecutor) (bool, error) { var count int if err := db.QueryRow(selectCountPSProcesslistTableSQL).Scan(&count); err != nil { @@ -39,7 +40,7 @@ type Row struct { } // Return the output of P_S or I_S.PROCESSLIST -func Collect(db *sql.DB) []Row { +func Collect(db model.QueryExecutor) []Row { // we collect all information even if it's mainly empty as we may reference it later const ( InformationSchemaQuery = "SELECT ID, USER, HOST, DB, COMMAND, TIME, STATE, INFO FROM INFORMATION_SCHEMA.PROCESSLIST" diff --git a/model/queryexecutor.go b/model/queryexecutor.go new file mode 100644 index 0000000..06ea88d --- /dev/null +++ b/model/queryexecutor.go @@ -0,0 +1,12 @@ +package model + +import "database/sql" + +// QueryExecutor abstracts database query operations. +// It allows models to work with any database-like entity, not just *sql.DB. +// This improves testability because tests can provide a mock implementation. +type QueryExecutor interface { + Query(query string, args ...interface{}) (*sql.Rows, error) + QueryRow(query string, args ...interface{}) *sql.Row + Exec(query string, args ...interface{}) (sql.Result, error) +} diff --git a/model/stageslatency/rows.go b/model/stageslatency/rows.go index f0e220f..a634056 100644 --- a/model/stageslatency/rows.go +++ b/model/stageslatency/rows.go @@ -1,9 +1,8 @@ package stageslatency import ( - "database/sql" - "github.com/sjmudd/ps-top/log" + "github.com/sjmudd/ps-top/model" "github.com/sjmudd/ps-top/model/common" ) @@ -11,7 +10,7 @@ import ( type Rows []Row // select the rows into table -func collect(db *sql.DB) Rows { +func collect(db model.QueryExecutor) Rows { var t Rows log.Println("events_stages_summary_global_by_event_name.collect()") diff --git a/model/stageslatency/stageslatency.go b/model/stageslatency/stageslatency.go index c2d9844..e0e6966 100644 --- a/model/stageslatency/stageslatency.go +++ b/model/stageslatency/stageslatency.go @@ -2,113 +2,46 @@ package stageslatency import ( - "database/sql" - "log" - "time" - "github.com/sjmudd/ps-top/config" + "github.com/sjmudd/ps-top/model" "github.com/sjmudd/ps-top/model/common" - "github.com/sjmudd/ps-top/utils" ) -/* - -root@localhost [performance_schema]> select * from events_stages_summary_global_by_event_name where sum_timer_wait > 0; -+--------------------------------+------------+----------------+----------------+----------------+----------------+ -| EVENT_NAME | COUNT_STAR | SUM_TIMER_WAIT | MIN_TIMER_WAIT | AVG_TIMER_WAIT | MAX_TIMER_WAIT | -+--------------------------------+------------+----------------+----------------+----------------+----------------+ -| stage/sql/After create | 3 | 21706000 | 558000 | 7235000 | 11693000 | -| stage/sql/checking permissions | 5971 | 92553236000 | 406000 | 15500000 | 12727728000 | -| stage/sql/cleaning up | 6531 | 4328103000 | 154000 | 662000 | 23464000 | -| stage/sql/closing tables | 4281 | 18303106000 | 118000 | 4275000 | 71505000 | -| stage/sql/Creating sort index | 2 | 31300648000 | 14183237000 | 15650324000 | 17117411000 | -| stage/sql/creating table | 2 | 138276471000 | 64077127000 | 69138235000 | 74199344000 | -| stage/sql/end | 4254 | 9940694000 | 220000 | 2336000 | 42683000 | -| stage/sql/executing | 1256 | 252300800000 | 151000 | 200876000 | 59564212000 | -| stage/sql/freeing items | 3733 | 83966405000 | 5341000 | 22493000 | 2527549000 | -| stage/sql/init | 4256 | 63836793000 | 1990000 | 14999000 | 7656920000 | -| stage/sql/Opening tables | 6002 | 1489653915000 | 1411000 | 248192000 | 216300236000 | -| stage/sql/optimizing | 1257 | 2685426016000 | 255000 | 2136377000 | 2656149827000 | -| stage/sql/preparing | 1166 | 8626237000 | 1666000 | 7398000 | 91804000 | -| stage/sql/query end | 4280 | 37299265000 | 411000 | 8714000 | 12018400000 | -| stage/sql/removing tmp table | 1187 | 11890909000 | 1838000 | 10017000 | 2365358000 | -| stage/sql/Sending data | 1165 | 3071893676000 | 2925000 | 2636818000 | 63354201000 | -| stage/sql/Sorting result | 2 | 4128000 | 1930000 | 2064000 | 2198000 | -| stage/sql/statistics | 1166 | 26655651000 | 2078000 | 22860000 | 8446818000 | -| stage/sql/System lock | 4263 | 1901250693000 | 584000 | 445988000 | 1651181465000 | -| stage/sql/update | 4 | 8246608000 | 78145000 | 2061652000 | 7597263000 | -| stage/sql/updating | 2994 | 1608420140000 | 285867000 | 537214000 | 15651495000 | -| stage/sql/starting | 6532 | 364087027000 | 2179000 | 55738000 | 23420395000 | -+--------------------------------+------------+----------------+----------------+----------------+----------------+ -22 rows in set (0.01 sec) - -*/ - // StagesLatency provides a public view of object type StagesLatency struct { - config *config.Config - FirstCollected time.Time - LastCollected time.Time - first Rows // initial data for relative values - last Rows // last loaded values - Results Rows // results (maybe with subtraction) - Totals Row // totals of results - db *sql.DB + *model.BaseCollector[Row, Rows] } -// NewStagesLatency returns a stageslatency StagesLatency -func NewStagesLatency(cfg *config.Config, db *sql.DB) *StagesLatency { - log.Println("NewStagesLatency()") - sl := &StagesLatency{ - config: cfg, - db: db, +// NewStagesLatency creates a new StagesLatency instance. +func NewStagesLatency(cfg *config.Config, db model.QueryExecutor) *StagesLatency { + process := func(last, first Rows) (Rows, Row) { + results := make(Rows, len(last)) + copy(results, last) + if cfg.WantRelativeStats() { + common.SubtractByName(&results, first, + func(r Row) string { return r.Name }, + func(r *Row, o Row) { r.subtract(o) }, + ) + } + tot := totals(results) + return results, tot } - - return sl + bc := model.NewBaseCollector[Row, Rows](cfg, db, process) + return &StagesLatency{BaseCollector: bc} } // Collect collects data from the db, updating initial // values if needed, and then subtracting initial values if we want // relative values, after which it stores totals. func (sl *StagesLatency) Collect() { - start := time.Now() - sl.last = collect(sl.db) - sl.LastCollected = time.Now() - log.Println("t.current collected", len(sl.last), "row(s) from SELECT") - - // check if we need to update first or we need to reload initial characteristics - if (len(sl.first) == 0 && len(sl.last) > 0) || totals(sl.first).SumTimerWait > totals(sl.last).SumTimerWait { - sl.first = utils.DuplicateSlice(sl.last) - sl.FirstCollected = sl.LastCollected + bc := sl.BaseCollector + fetch := func() (Rows, error) { + return collect(bc.DB()), nil } - - sl.calculate() - - log.Println("t.initial.totals():", totals(sl.first)) - log.Println("t.current.totals():", totals(sl.last)) - log.Println("Table_io_waits_summary_by_table.Collect() END, took:", time.Since(start).String()) -} - -// ResetStatistics resets the statistics to current values -func (sl *StagesLatency) ResetStatistics() { - sl.first = utils.DuplicateSlice(sl.last) - sl.FirstCollected = sl.LastCollected - - sl.calculate() -} - -// generate the results and totals and sort data -func (sl *StagesLatency) calculate() { - // log.Println( "- t.results set from t.current" ) - sl.Results = make(Rows, len(sl.last)) - copy(sl.Results, sl.last) - if sl.config.WantRelativeStats() { - common.SubtractByName(&sl.Results, sl.first, - func(r Row) string { return r.Name }, - func(r *Row, o Row) { r.subtract(o) }, - ) + wantRefresh := func() bool { + return (len(bc.First) == 0 && len(bc.Last) > 0) || totals(bc.First).SumTimerWait > totals(bc.Last).SumTimerWait } - sl.Totals = totals(sl.Results) + bc.Collect(fetch, wantRefresh) } // HaveRelativeStats is true for this object @@ -116,7 +49,7 @@ func (sl StagesLatency) HaveRelativeStats() bool { return true } -// WantRelativeStats +// WantRelativeStats returns the config setting. func (sl StagesLatency) WantRelativeStats() bool { - return sl.config.WantRelativeStats() + return sl.Config().WantRelativeStats() } diff --git a/model/tableio/rows.go b/model/tableio/rows.go index 3bf4a9b..008cb1d 100644 --- a/model/tableio/rows.go +++ b/model/tableio/rows.go @@ -3,10 +3,10 @@ package tableio import ( - "database/sql" "fmt" "github.com/sjmudd/ps-top/log" + "github.com/sjmudd/ps-top/model" "github.com/sjmudd/ps-top/model/common" "github.com/sjmudd/ps-top/model/filter" "github.com/sjmudd/ps-top/utils" @@ -39,7 +39,7 @@ func totals(rows Rows) Row { return total } -func collect(db *sql.DB, databaseFilter *filter.DatabaseFilter) Rows { +func collect(db model.QueryExecutor, databaseFilter *filter.DatabaseFilter) Rows { var t Rows log.Printf("collect(?,%q)\n", databaseFilter) diff --git a/model/tableio/tableio.go b/model/tableio/tableio.go index 1f7c891..7f4ee39 100644 --- a/model/tableio/tableio.go +++ b/model/tableio/tableio.go @@ -2,83 +2,51 @@ package tableio import ( - "database/sql" - "log" - "time" - "github.com/sjmudd/ps-top/config" + "github.com/sjmudd/ps-top/model" "github.com/sjmudd/ps-top/model/common" - "github.com/sjmudd/ps-top/utils" ) // TableIo contains performance_schema.table_io_waits_summary_by_table data type TableIo struct { - config *config.Config - FirstCollected time.Time - LastCollected time.Time - wantLatency bool - first Rows // initial data for relative values - last Rows // last loaded values - Results Rows // results (maybe with subtraction) - Totals Row // totals of results - db *sql.DB -} - -// NewTableIo returns an i/o latency object with config and db handle -func NewTableIo(cfg *config.Config, db *sql.DB) *TableIo { - tiol := &TableIo{ - config: cfg, - db: db, + *model.BaseCollector[Row, Rows] + wantLatency bool +} + +// NewTableIo creates a new TableIo instance. +func NewTableIo(cfg *config.Config, db model.QueryExecutor) *TableIo { + process := func(last, first Rows) (Rows, Row) { + results := make(Rows, len(last)) + copy(results, last) + if cfg.WantRelativeStats() { + common.SubtractByName(&results, first, + func(r Row) string { return r.Name }, + func(r *Row, o Row) { r.subtract(o) }, + ) + } + tot := totals(results) + return results, tot } - - return tiol -} - -// ResetStatistics resets the statistics to current values -func (tiol *TableIo) ResetStatistics() { - tiol.first = utils.DuplicateSlice(tiol.last) - tiol.FirstCollected = tiol.LastCollected - - tiol.calculate() + bc := model.NewBaseCollector[Row, Rows](cfg, db, process) + return &TableIo{BaseCollector: bc, wantLatency: false} } // Collect collects data from the db, updating initial values // if needed, and then subtracting initial values if we want relative // values, after which it stores totals. func (tiol *TableIo) Collect() { - start := time.Now() - - tiol.last = collect(tiol.db, tiol.config.DatabaseFilter()) - tiol.LastCollected = time.Now() - - // check for no first data or need to reload initial characteristics - if (len(tiol.first) == 0 && len(tiol.last) > 0) || totals(tiol.first).SumTimerWait > totals(tiol.last).SumTimerWait { - tiol.first = utils.DuplicateSlice(tiol.last) - tiol.FirstCollected = tiol.LastCollected + bc := tiol.BaseCollector + fetch := func() (Rows, error) { + return collect(bc.DB(), bc.Config().DatabaseFilter()), nil } - - tiol.calculate() - - log.Println("tiol.first.totals():", totals(tiol.first)) - log.Println("tiol.last.totals():", totals(tiol.last)) - log.Println("TableIo.Collect() END, took:", time.Since(start)) -} - -func (tiol *TableIo) calculate() { - tiol.Results = utils.DuplicateSlice(tiol.last) - - if tiol.config.WantRelativeStats() { - common.SubtractByName(&tiol.Results, tiol.first, - func(r Row) string { return r.Name }, - func(r *Row, o Row) { r.subtract(o) }, - ) + wantRefresh := func() bool { + return (len(bc.First) == 0 && len(bc.Last) > 0) || totals(bc.First).SumTimerWait > totals(bc.Last).SumTimerWait } - - tiol.Totals = totals(tiol.Results) + bc.Collect(fetch, wantRefresh) } // WantsLatency returns whether we want to see latency information -func (tiol TableIo) WantsLatency() bool { +func (tiol *TableIo) WantsLatency() bool { return tiol.wantLatency } @@ -87,7 +55,7 @@ func (tiol TableIo) HaveRelativeStats() bool { return true } -// WantRelativeStats +// WantRelativeStats returns whether relative stats are desired based on config func (tiol TableIo) WantRelativeStats() bool { - return tiol.config.WantRelativeStats() + return tiol.Config().WantRelativeStats() } diff --git a/model/tablelocks/rows.go b/model/tablelocks/rows.go index 351a558..c4111b2 100644 --- a/model/tablelocks/rows.go +++ b/model/tablelocks/rows.go @@ -3,10 +3,10 @@ package tablelocks import ( - "database/sql" "fmt" "github.com/sjmudd/ps-top/log" + "github.com/sjmudd/ps-top/model" "github.com/sjmudd/ps-top/model/filter" "github.com/sjmudd/ps-top/utils" @@ -42,7 +42,7 @@ func totals(rows []Row) Row { // Select the raw data from the database into file_summary_by_instance_rows // - filter out empty values // - change FILE_NAME into a more descriptive value. -func collect(db *sql.DB, filter *filter.DatabaseFilter) []Row { +func collect(db model.QueryExecutor, filter *filter.DatabaseFilter) []Row { sql := ` SELECT OBJECT_SCHEMA, OBJECT_NAME, @@ -132,9 +132,3 @@ func (rows *Rows) subtract(initial Rows) { } } } - -// if the data in t2 is "newer", "has more values" than t then it needs refreshing. -// check this by comparing totals. -func (rows Rows) needsRefresh(otherRows Rows) bool { - return totals(rows).SumTimerWait > totals(otherRows).SumTimerWait -} diff --git a/model/tablelocks/tablelocks.go b/model/tablelocks/tablelocks.go index c979858..b561914 100644 --- a/model/tablelocks/tablelocks.go +++ b/model/tablelocks/tablelocks.go @@ -2,71 +2,40 @@ package tablelocks import ( - "database/sql" - "log" - "time" - "github.com/sjmudd/ps-top/config" - - _ "github.com/go-sql-driver/mysql" // keep golint happy + "github.com/sjmudd/ps-top/model" ) // TableLocks represents a table of rows type TableLocks struct { - config *config.Config - FirstCollected time.Time - LastCollected time.Time - initial Rows // initial data for relative values - current Rows // last loaded values - Results Rows // results (maybe with subtraction) - Totals Row // totals of results - db *sql.DB -} - -// NewTableLocks returns a pointer to an object of this type -func NewTableLocks(cfg *config.Config, db *sql.DB) *TableLocks { - tl := &TableLocks{ - config: cfg, - db: db, + *model.BaseCollector[Row, Rows] +} + +// NewTableLocks creates a new TableLocks instance. +func NewTableLocks(cfg *config.Config, db model.QueryExecutor) *TableLocks { + process := func(last, first Rows) (Rows, Row) { + results := make(Rows, len(last)) + copy(results, last) + if cfg.WantRelativeStats() { + results.subtract(first) + } + tot := totals(results) + return results, tot } - - return tl -} - -func (tl *TableLocks) copyCurrentToInitial() { - tl.initial = make(Rows, len(tl.current)) - copy(tl.initial, tl.current) - tl.FirstCollected = tl.LastCollected + bc := model.NewBaseCollector[Row, Rows](cfg, db, process) + return &TableLocks{BaseCollector: bc} } // Collect data from the db, then merge it in. func (tl *TableLocks) Collect() { - start := time.Now() - tl.current = collect(tl.db, tl.config.DatabaseFilter()) - tl.LastCollected = time.Now() - - // check for no data or check for reload initial characteristics - if (len(tl.initial) == 0 && len(tl.current) > 0) || tl.initial.needsRefresh(tl.current) { - tl.copyCurrentToInitial() + bc := tl.BaseCollector + fetch := func() (Rows, error) { + return collect(bc.DB(), bc.Config().DatabaseFilter()), nil } - - tl.calculate() - log.Println("TableLocks.Collect() took:", time.Since(start).String()) -} - -func (tl *TableLocks) calculate() { - tl.Results = make(Rows, len(tl.current)) - copy(tl.Results, tl.current) - if tl.config.WantRelativeStats() { - tl.Results.subtract(tl.initial) + wantRefresh := func() bool { + return (len(bc.First) == 0 && len(bc.Last) > 0) || totals(bc.First).SumTimerWait > totals(bc.Last).SumTimerWait } - tl.Totals = totals(tl.Results) -} - -// ResetStatistics resets the statistics to current values -func (tl *TableLocks) ResetStatistics() { - tl.copyCurrentToInitial() - tl.calculate() + bc.Collect(fetch, wantRefresh) } // HaveRelativeStats is true for this object @@ -74,7 +43,7 @@ func (tl TableLocks) HaveRelativeStats() bool { return true } -// WantRelativeStats is true for this object +// WantRelativeStats returns the config setting. func (tl TableLocks) WantRelativeStats() bool { - return tl.config.WantRelativeStats() + return tl.Config().WantRelativeStats() } diff --git a/model/userlatency/userlatency.go b/model/userlatency/userlatency.go index 5c06560..aa44221 100644 --- a/model/userlatency/userlatency.go +++ b/model/userlatency/userlatency.go @@ -2,62 +2,99 @@ package userlatency import ( - "database/sql" - "log" "regexp" "strings" - "time" "github.com/sjmudd/ps-top/config" + "github.com/sjmudd/ps-top/model" "github.com/sjmudd/ps-top/model/processlist" ) type mapStringInt map[string]int -// UserLatency contains a table of rows +// UserLatency aggregates processlist data by user type UserLatency struct { - config *config.Config - FirstCollected time.Time - LastCollected time.Time - current []processlist.Row // processlist - Results []Row // results by user - Totals Row // totals of results - db *sql.DB + *model.BaseCollector[Row, []Row] } -// NewUserLatency returns a user latency object -func NewUserLatency(cfg *config.Config, db *sql.DB) *UserLatency { - log.Println("NewUserLatency()") - ul := &UserLatency{ - config: cfg, - db: db, +// NewUserLatency creates a new UserLatency instance. +func NewUserLatency(cfg *config.Config, db model.QueryExecutor) *UserLatency { + process := func(last, _ []Row) ([]Row, Row) { + // last already contains aggregated rows; just copy and compute totals + results := make([]Row, len(last)) + copy(results, last) + tot := totals(results) + return results, tot } - - return ul + bc := model.NewBaseCollector[Row, []Row](cfg, db, process) + return &UserLatency{BaseCollector: bc} } -// Collect collects data from the db, updating initial -// values if needed, and then subtracting initial values if we want -// relative values, after which it stores totals. +// Collect fetches processlist data, aggregates by user, and updates results. func (ul *UserLatency) Collect() { - log.Println("UserLatency.Collect() - starting collection of data") - start := time.Now() + bc := ul.BaseCollector + fetch := func() ([]Row, error) { + raw := processlist.Collect(bc.DB()) + aggregated := ul.processlist2byUser(raw) + return aggregated, nil + } + wantRefresh := func() bool { + // Refresh baseline on first collection only + return len(bc.First) == 0 && len(bc.Last) > 0 + } + bc.Collect(fetch, wantRefresh) +} - ul.current = processlist.Collect(ul.db) - log.Println("t.current collected", len(ul.current), "row(s) from SELECT") +// processlist2byUser aggregates raw processlist rows by username +func (ul *UserLatency) processlist2byUser(raw []processlist.Row) []Row { + reActiveReplMasterThread := regexp.MustCompile("Sending binlog event to slave") + reSelect := regexp.MustCompile(`(?i)SELECT`) + reInsert := regexp.MustCompile(`(?i)INSERT`) + reUpdate := regexp.MustCompile(`(?i)UPDATE`) + reDelete := regexp.MustCompile(`(?i)DELETE`) - ul.processlist2byUser() + rowByUser := make(map[string]*Row) + hostsByUser := make(map[string]mapStringInt) + dbsByUser := make(map[string]mapStringInt) + globalHosts := make(mapStringInt) + globalDbs := make(mapStringInt) - log.Println("UserLatency.Collect() END, took:", time.Since(start).String()) -} + for i := range raw { + pl := raw[i] + username := pl.User + host := getHostname(pl.Host) + command := pl.Command + db := pl.Db + info := pl.Info + state := pl.State -// return the hostname without the port part -func getHostname(hostPort string) string { - i := strings.Index(hostPort, ":") - if i >= 0 { - return hostPort[0:i] + // fill global values + if host != "" { + globalHosts[host] = 1 + } + if db != "" { + globalDbs[db] = 1 + } + + r := getOrCreateRow(rowByUser, username, pl.User) + r.Connections++ + + updateRuntimeAndActive(r, command, pl.Time, host, state, reActiveReplMasterThread) + + // track hosts and dbs per user + r.Hosts = addHost(hostsByUser, username, host) + r.Dbs = addDB(dbsByUser, username, db) + + addStatementCounts(r, info, reSelect, reInsert, reUpdate, reDelete) } - return hostPort // shouldn't happen !!! + + results := make([]Row, 0, len(rowByUser)) + for _, v := range rowByUser { + results = append(results, *v) + } + // Totals are computed later by BaseCollector's process function; not computed here. + _ = totals(results) // ensure totals function exists; but not storing here + return results } // helper: get or create a Row pointer for username @@ -129,81 +166,21 @@ func addStatementCounts(r *Row, info string, reSelect, reInsert, reUpdate, reDel } } -// read in processlist and add the appropriate values into a new pl_by_user table -func (ul *UserLatency) processlist2byUser() { - log.Println("UserLatency.processlist2byUser() START") - - reActiveReplMasterThread := regexp.MustCompile("Sending binlog event to slave") - reSelect := regexp.MustCompile(`(?i)SELECT`) // make case insensitive - reInsert := regexp.MustCompile(`(?i)INSERT`) // make case insensitive - reUpdate := regexp.MustCompile(`(?i)UPDATE`) // make case insensitive - reDelete := regexp.MustCompile(`(?i)DELETE`) // make case insensitive - - rowByUser := make(map[string]*Row) - hostsByUser := make(map[string]mapStringInt) - dbsByUser := make(map[string]mapStringInt) - - // global values for totals. - globalHosts := make(mapStringInt) - globalDbs := make(mapStringInt) - - for i := range ul.current { - pl := ul.current[i] - id := pl.ID - Username := pl.User // limit size for display - host := getHostname(pl.Host) - command := pl.Command - db := pl.Db - info := pl.Info - state := pl.State - - log.Println("- id/user/host:", id, Username, host) - - // fill global values - if host != "" { - globalHosts[host] = 1 - } - if db != "" { - globalDbs[db] = 1 - } - - r := getOrCreateRow(rowByUser, Username, pl.User) - log.Println("- processing row for user:", Username) - - r.Connections++ - - updateRuntimeAndActive(r, command, pl.Time, host, state, reActiveReplMasterThread) - - // track hosts and dbs per user - r.Hosts = addHost(hostsByUser, Username, host) - r.Dbs = addDB(dbsByUser, Username, db) - - addStatementCounts(r, info, reSelect, reInsert, reUpdate, reDelete) - } - - results := make([]Row, 0, len(rowByUser)) - for _, v := range rowByUser { - results = append(results, *v) - } - ul.Results = results - ul.Totals = totals(ul.Results) - ul.Totals.Hosts = uint64(len(globalHosts)) - ul.Totals.Dbs = uint64(len(globalDbs)) - - log.Println("UserLatency.processlist2byUser() END") -} - -// HaveRelativeStats returns if we have relative information +// HaveRelativeStats returns false for this model (no baseline subtraction) func (ul UserLatency) HaveRelativeStats() bool { return false } -// WantRelativeStats +// WantRelativeStats returns the config setting. func (ul UserLatency) WantRelativeStats() bool { - return ul.config.WantRelativeStats() + return ul.Config().WantRelativeStats() } -// ResetStatistics - NOT IMPLEMENTED -func (ul *UserLatency) ResetStatistics() { - log.Println("userlatency.UserLatency.ResetStatistics() NOT IMPLEMENTED") +// return the hostname without the port part +func getHostname(hostPort string) string { + i := strings.Index(hostPort, ":") + if i >= 0 { + return hostPort[0:i] + } + return hostPort // shouldn't happen !!!! } diff --git a/wrapper/tableiolatency/wrapper_test.go b/wrapper/tableiolatency/wrapper_test.go index eecc14f..1bb23f1 100644 --- a/wrapper/tableiolatency/wrapper_test.go +++ b/wrapper/tableiolatency/wrapper_test.go @@ -4,9 +4,22 @@ import ( "strings" "testing" + "github.com/sjmudd/ps-top/model" "github.com/sjmudd/ps-top/model/tableio" ) +func newTableIo(rows []tableio.Row, totals tableio.Row) *tableio.TableIo { + process := func(last, _ tableio.Rows) (tableio.Rows, tableio.Row) { + // Dummy process; not used in tests. + return last, tableio.Row{} + } + bc := model.NewBaseCollector[tableio.Row, tableio.Rows](nil, nil, process) + tiol := &tableio.TableIo{BaseCollector: bc} + bc.Results = rows + bc.Totals = totals + return tiol +} + // TestRowContentUsesSumTimerWait verifies that RowContent produces output based on SumTimerWait. func TestRowContentUsesSumTimerWait(t *testing.T) { // Create multiple rows with SumTimerWait values @@ -16,7 +29,7 @@ func TestRowContentUsesSumTimerWait(t *testing.T) { } // Sum = 4ms totals := tableio.Row{SumTimerWait: 4000000} - tiol := &tableio.TableIo{Results: rows, Totals: totals} + tiol := newTableIo(rows, totals) w := &Wrapper{tiol: tiol} lines := w.RowContent() @@ -53,7 +66,8 @@ func TestHeadings(t *testing.T) { // TestDescription checks that description contains "Latency". func TestDescription(t *testing.T) { rows := []tableio.Row{{Name: "db.t", SumTimerWait: 1000}} - w := &Wrapper{tiol: &tableio.TableIo{Results: rows}} + tiol := newTableIo(rows, tableio.Row{}) + w := &Wrapper{tiol: tiol} d := w.Description() if !strings.Contains(d, "Latency") { t.Errorf("Description missing 'Latency': %q", d) @@ -83,7 +97,7 @@ func TestRowContentOperationPercentages(t *testing.T) { SumTimerWrite: 250, // write total ≥ insert+update+delete (200) } totals := tableio.Row{SumTimerWait: 550} - tiol := &tableio.TableIo{Results: []tableio.Row{row}, Totals: totals} + tiol := newTableIo([]tableio.Row{row}, totals) w := &Wrapper{tiol: tiol} line := w.RowContent()[0] diff --git a/wrapper/tableioops/wrapper_test.go b/wrapper/tableioops/wrapper_test.go index 506b4af..f464ed6 100644 --- a/wrapper/tableioops/wrapper_test.go +++ b/wrapper/tableioops/wrapper_test.go @@ -4,9 +4,22 @@ import ( "strings" "testing" + "github.com/sjmudd/ps-top/model" "github.com/sjmudd/ps-top/model/tableio" ) +func newTableIo(rows []tableio.Row, totals tableio.Row) *tableio.TableIo { + process := func(last, _ tableio.Rows) (tableio.Rows, tableio.Row) { + // Dummy process; not used in tests. + return last, tableio.Row{} + } + bc := model.NewBaseCollector[tableio.Row, tableio.Rows](nil, nil, process) + tiol := &tableio.TableIo{BaseCollector: bc} + bc.Results = rows + bc.Totals = totals + return tiol +} + // TestRowContentUsesCounts verifies that RowContent uses CountStar and Count* fields. func TestRowContentUsesCounts(t *testing.T) { rows := []tableio.Row{ @@ -14,7 +27,7 @@ func TestRowContentUsesCounts(t *testing.T) { {Name: "db2.t2", CountStar: 100, CountFetch: 50}, } totals := tableio.Row{CountStar: 200} - tiol := &tableio.TableIo{Results: rows, Totals: totals} + tiol := newTableIo(rows, totals) w := &Wrapper{tiol: tiol} lines := w.RowContent() @@ -52,7 +65,8 @@ func TestHeadings(t *testing.T) { // TestDescription checks that description contains "Ops". func TestDescription(t *testing.T) { rows := []tableio.Row{{Name: "db.t", CountStar: 100}} - w := &Wrapper{tiol: &tableio.TableIo{Results: rows}} + tiol := newTableIo(rows, tableio.Row{}) + w := &Wrapper{tiol: tiol} d := w.Description() if !strings.Contains(d, "Ops") { t.Errorf("Description missing 'Ops': %q", d) @@ -81,7 +95,7 @@ func TestRowContentOperationPercentages(t *testing.T) { CountWrite: 120, // ≥ insert+update+delete (100) } totals := tableio.Row{CountStar: 270} - tiol := &tableio.TableIo{Results: []tableio.Row{row}, Totals: totals} + tiol := newTableIo([]tableio.Row{row}, totals) w := &Wrapper{tiol: tiol} line := w.RowContent()[0]