Skip to content

Commit 76bcbf9

Browse files
authored
SHOW tsa_cache (#1529)
1 parent 962fe9d commit 76bcbf9

File tree

16 files changed

+97
-3
lines changed

16 files changed

+97
-3
lines changed

docs/sharding/console/sql_commands.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ This command is used to retrieve information about a specific entity of SPQR clu
149149
```sql
150150
SHOW entity [WHERE where_clause]
151151

152-
where entity is one of databases, routers, pools, shards, backend_connections, key_ranges, clients, status, distributions, version, relations, sequences, task_groups, prepared_statements:
152+
where entity is one of backend_connections, clients, databases, distributions, key_ranges, pools, prepared_statements, relations, routers, sequences, shards, status, task_groups, tsa_cache, users, version:
153153
```
154154

155155
The`where_clause` is used to filter the results based on specific conditions. It allows you to specify criteria for selecting the information to be shown.

pkg/clientinteractor/interactor.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1817,6 +1817,33 @@ func (pi *PSQLInteractor) Users(ctx context.Context) error {
18171817
return pi.CompleteMsg(len(berules))
18181818
}
18191819

1820+
// TsaCache outputs TSA cache entries showing the target session attributes cache status
1821+
func (pi *PSQLInteractor) TsaCache(ctx context.Context, cacheEntries map[pool.TsaKey]pool.CachedEntry) error {
1822+
if err := pi.WriteHeader("tsa", "host", "az", "alive", "match", "reason", "last_check_time"); err != nil {
1823+
spqrlog.Zero.Error().Err(err).Msg("")
1824+
return err
1825+
}
1826+
1827+
count := 0
1828+
for key, entry := range cacheEntries {
1829+
if err := pi.WriteDataRow(
1830+
string(key.Tsa),
1831+
key.Host,
1832+
key.AZ,
1833+
fmt.Sprintf("%v", entry.Result.Alive),
1834+
fmt.Sprintf("%v", entry.Result.Match),
1835+
entry.Result.Reason,
1836+
entry.LastCheckTime.Format(time.RFC3339),
1837+
); err != nil {
1838+
spqrlog.Zero.Error().Err(err).Msg("")
1839+
return err
1840+
}
1841+
count++
1842+
}
1843+
1844+
return pi.CompleteMsg(count)
1845+
}
1846+
18201847
// Outputs groupBy get list values and counts its 'groupByCol' property.
18211848
// 'groupByCol' sorted in grouped result by string key ASC mode
18221849
//

pkg/connmgr/connmgr.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package connmgr
22

33
import (
4+
"github.com/pg-sharding/spqr/pkg/pool"
45
"github.com/pg-sharding/spqr/pkg/tsa"
56
)
67

78
type ConnectionStatsMgr interface {
89
ConnectionIterator
910

1011
InstanceHealthChecks() map[string]tsa.CachedCheckResult
12+
TsaCacheEntries() map[pool.TsaKey]pool.CachedEntry
1113

1214
/*
1315
user-facing connection stat callbacks.

pkg/coord/clustered_coord.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ func (ci grpcConnMgr) InstanceHealthChecks() map[string]tsa.CachedCheckResult {
5959
return map[string]tsa.CachedCheckResult{}
6060
}
6161

62+
// TsaCacheEntries implements connmgr.ConnectionStatsMgr.
63+
func (ci grpcConnMgr) TsaCacheEntries() map[pool.TsaKey]pool.CachedEntry {
64+
return map[pool.TsaKey]pool.CachedEntry{}
65+
}
66+
6267
// TODO implement it
6368
// ActiveTcpCount implements connectiterator.ConnectIterator.
6469
func (ci grpcConnMgr) ActiveTcpCount() int64 {

pkg/meta/meta.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -880,6 +880,9 @@ func ProcessShow(ctx context.Context, stmt *spqrparser.Show, mngr EntityMgr, ci
880880
return cli.MoveStats(ctx, stats)
881881
case spqrparser.Users:
882882
return cli.Users(ctx)
883+
case spqrparser.TsaCacheStr:
884+
cacheEntries := ci.TsaCacheEntries()
885+
return cli.TsaCache(ctx, cacheEntries)
883886

884887
default:
885888
return ErrUnknownCoordinatorCommand

pkg/pool/dbpool.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ func (s *DBPool) InstanceHealthChecks() map[string]tsa.CachedCheckResult {
4646
return s.checker.InstanceHealthChecks()
4747
}
4848

49+
// TsaCacheEntries implements MultiShardTSAPool.
50+
func (s *DBPool) TsaCacheEntries() map[TsaKey]CachedEntry {
51+
return s.cache.GetAllCachedEntries()
52+
}
53+
4954
// View implements MultiShardPool.
5055
func (s *DBPool) View() Statistics {
5156
return s.pool.View()

pkg/pool/dbpool_cache.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,20 @@ func (c *DbpoolCache) GetAllEntries() map[TsaKey]LocalCheckResult {
155155
return entries
156156
}
157157

158+
// GetAllCachedEntries returns all cache entries with timestamps
159+
func (c *DbpoolCache) GetAllCachedEntries() map[TsaKey]CachedEntry {
160+
entries := make(map[TsaKey]CachedEntry)
161+
c.cache.Range(func(key, value any) bool {
162+
if tsaKey, ok := key.(TsaKey); ok {
163+
if entry, ok := value.(CachedEntry); ok {
164+
entries[tsaKey] = entry
165+
}
166+
}
167+
return true
168+
})
169+
return entries
170+
}
171+
158172
// startCacheCleanup starts a background goroutine that cleans up stale cache entries every 30 seconds
159173
func (c *DbpoolCache) startCacheCleanup(d time.Duration) {
160174
go func() {

pkg/pool/pool.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type MultiShardTSAPool interface {
6060
ShardMapping() map[string]*config.Shard
6161
ConnectionWithTSA(clid uint, key kr.ShardKey, targetSessionAttrs tsa.TSA) (shard.ShardHostInstance, error)
6262
InstanceHealthChecks() map[string]tsa.CachedCheckResult
63+
TsaCacheEntries() map[TsaKey]CachedEntry
6364

6465
StopCacheWatchdog()
6566
}

router/rulerouter/rulerouter.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/pg-sharding/spqr/pkg/catalog"
1212
"github.com/pg-sharding/spqr/pkg/connmgr"
1313
"github.com/pg-sharding/spqr/pkg/models/spqrerror"
14+
"github.com/pg-sharding/spqr/pkg/pool"
1415
"github.com/pg-sharding/spqr/pkg/tsa"
1516
"github.com/pg-sharding/spqr/pkg/txstatus"
1617
"golang.org/x/sync/semaphore"
@@ -82,6 +83,19 @@ func (r *RuleRouterImpl) InstanceHealthChecks() map[string]tsa.CachedCheckResult
8283
return rt
8384
}
8485

86+
// TsaCacheEntries implements ConnectionStatsMgr.
87+
func (r *RuleRouterImpl) TsaCacheEntries() map[pool.TsaKey]pool.CachedEntry {
88+
rt := map[pool.TsaKey]pool.CachedEntry{}
89+
_ = r.NotifyRoutes(func(r *route.Route) (bool, error) {
90+
m := r.ServPool().TsaCacheEntries()
91+
for k, v := range m {
92+
rt[k] = v
93+
}
94+
return true, nil
95+
})
96+
return rt
97+
}
98+
8599
// ReleaseConnection implements RuleRouter.
86100
func (r *RuleRouterImpl) ReleaseConnection() {
87101
r.activeTcpCount.Add(-1)

test/regress/schedule/console

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@ test: sequence
1919
test: alter_relation
2020
test: reference_table
2121
test: show_users
22+
test: show_tsa_cache
2223
test: routing_expression

0 commit comments

Comments
 (0)