|
| 1 | +package gateway |
| 2 | + |
| 3 | +import ( |
| 4 | + "errors" |
| 5 | + "sync" |
| 6 | + "time" |
| 7 | + |
| 8 | + "crypto/sha256" |
| 9 | + "encoding/hex" |
| 10 | +) |
| 11 | + |
| 12 | +// In general, "query persistence" is a term for a family of optimizations that involve
| 13 | +// storing some kind of representation of the queries that the client will send. For |
| 14 | +// nautilus, this allows for the pre-computation of query plans and can drastically speed |
| 15 | +// up response times. |
| 16 | +// |
| 17 | +// There are a few different strategies when it comes to timing the computation of these |
| 18 | +// plans. Each strategy has its own trade-offs and should be carefully considered |
| 19 | +// |
| 20 | +// Automatic Persisted Queries: |
| 21 | +// - client asks for the query associated with a particular hash |
| 22 | +// - if the server knows that hash, execute the query plan. if not, return with a known value |
| 23 | +// - if the client sees the known value, resend the query with the full query body |
| 24 | +// - the server will then calculate the plan and save it for later use |
| 25 | +// - if the client sends a known hash along with the query body, the query body is ignored |
| 26 | +// |
| 27 | +// pros/cons: |
| 28 | +// - no need for a build step |
| 29 | +// - the client can send any queries they want |
| 30 | +// |
| 31 | +// StaticPersistedQueries (not implemented here): |
| 32 | +// - as part of a build step, the gateway is given the list of queries and associated |
| 33 | +// hashes |
| 34 | +// - the client only sends the hash with queries |
| 35 | +// - if the server recognizes the hash, execute the query. Otherwise, return with an error
| 36 | +// |
| 37 | +// pros/cons: |
| 38 | +// - need for a separate build step that prepares the queries and shares it with the server |
| 39 | +// - tighter control on operations. The client can only send queries that are approved (pre-computed) |
| 40 | + |
// MessageMissingCachedQuery is the error string that the server sends when the client references
// a cached query plan (by hash) that the server does not know about. Clients that see this value
// are expected to resend the request with the full query body.
const MessageMissingCachedQuery = "PersistedQueryNotFound"
| 44 | + |
// QueryPlanCache decides when to compute a plan. Implementations may return a previously
// computed set of plans associated with hash, or invoke planner to build (and possibly
// store) a fresh one.
type QueryPlanCache interface {
	Retrieve(ctx *PlanningContext, hash *string, planner QueryPlanner) ([]*QueryPlan, error)
}
| 49 | + |
| 50 | +// WithNoQueryPlanCache is the default option and disables any persisted query behavior |
| 51 | +func WithNoQueryPlanCache() Option { |
| 52 | + return WithQueryPlanCache(&NoQueryPlanCache{}) |
| 53 | +} |
| 54 | + |
| 55 | +// NoQueryPlanCache will always compute the plan for a query, regardless of the value passed as `hash` |
| 56 | +type NoQueryPlanCache struct{} |
| 57 | + |
| 58 | +// Retrieve just computes the query plan |
| 59 | +func (p *NoQueryPlanCache) Retrieve(ctx *PlanningContext, hash *string, planner QueryPlanner) ([]*QueryPlan, error) { |
| 60 | + return planner.Plan(ctx) |
| 61 | +} |
| 62 | + |
| 63 | +// WithQueryPlanCache sets the query plan cache that the gateway will use |
| 64 | +func WithQueryPlanCache(p QueryPlanCache) Option { |
| 65 | + return func(g *Gateway) { |
| 66 | + g.queryPlanCache = p |
| 67 | + } |
| 68 | +} |
| 69 | + |
| 70 | +// WithAutomaticQueryPlanCache enables the "automatic persisted query" technique |
| 71 | +func WithAutomaticQueryPlanCache() Option { |
| 72 | + return WithQueryPlanCache(NewAutomaticQueryPlanCache()) |
| 73 | +} |
| 74 | + |
// queryPlanCacheItem is a single entry in the automatic query plan cache.
type queryPlanCacheItem struct {
	// LastUsed records the most recent retrieval and drives TTL-based eviction.
	LastUsed time.Time
	// Value is the pre-computed set of query plans returned on a cache hit.
	Value []*QueryPlan
}
| 79 | + |
// AutomaticQueryPlanCache is a QueryPlanCache that will use the hash if it points to a known query plan,
// otherwise it will compute the plan and save it for later, to be referenced by the designated hash.
type AutomaticQueryPlanCache struct {
	// cache maps a query hash to its stored plans
	cache map[string]*queryPlanCacheItem
	// ttl is how long an unused entry survives before being eligible for eviction
	ttl time.Duration
	// the automatic query plan cache needs to clear itself of query plans that have NOT been used
	// recently. This coordination requires a channel over which events can be triggered whenever
	// a query is fired, triggering a check to clean up other queries.
	retrievedPlan chan bool
	// a boolean to track if there is a timer that needs to be reset
	resetTimer bool
	// a mutex on the timer bool
	timeMutex sync.Mutex
}
| 94 | + |
| 95 | +// WithCacheTTL updates and returns the cache with the new cache lifetime. Queries that haven't been |
| 96 | +// used in that long are cleaned up on the next query. |
| 97 | +func (c *AutomaticQueryPlanCache) WithCacheTTL(duration time.Duration) *AutomaticQueryPlanCache { |
| 98 | + return &AutomaticQueryPlanCache{ |
| 99 | + cache: c.cache, |
| 100 | + ttl: duration, |
| 101 | + retrievedPlan: c.retrievedPlan, |
| 102 | + resetTimer: c.resetTimer, |
| 103 | + } |
| 104 | +} |
| 105 | + |
// NewAutomaticQueryPlanCache returns a fresh instance of AutomaticQueryPlanCache with an
// empty cache and the default entry lifetime.
func NewAutomaticQueryPlanCache() *AutomaticQueryPlanCache {
	return &AutomaticQueryPlanCache{
		cache: map[string]*queryPlanCacheItem{},
		// default cache lifetime of 10 days
		// NOTE(review): this comment originally said "3 days" but the value is 10*24h —
		// confirm which default was intended
		ttl: 10 * 24 * time.Hour,
		retrievedPlan: make(chan bool),
		resetTimer: false,
	}
}
| 116 | + |
| 117 | +// Retrieve follows the "automatic query persistance" technique. If the hash is known, it will use the referenced query plan. |
| 118 | +// If the hash is not know but the query is provided, it will compute the plan, return it, and save it for later use. |
| 119 | +// If the hash is not known and the query is not provided, it will return with an error prompting the client to provide the hash and query |
| 120 | +func (c *AutomaticQueryPlanCache) Retrieve(ctx *PlanningContext, hash *string, planner QueryPlanner) ([]*QueryPlan, error) { |
| 121 | + |
| 122 | + // when we're done with retrieving the value we have to clear the cache |
| 123 | + defer func() { |
| 124 | + // spawn a goroutine that might be responsible for clearing the cache |
| 125 | + go func() { |
| 126 | + // check if there is a timer to reset |
| 127 | + c.timeMutex.Lock() |
| 128 | + resetTimer := c.resetTimer |
| 129 | + c.timeMutex.Unlock() |
| 130 | + |
| 131 | + // if there is already a goroutine that's waiting to clean things up |
| 132 | + if resetTimer { |
| 133 | + // just reset their time |
| 134 | + c.retrievedPlan <- true |
| 135 | + // and we're done |
| 136 | + return |
| 137 | + } |
| 138 | + c.timeMutex.Lock() |
| 139 | + c.resetTimer = true |
| 140 | + c.timeMutex.Unlock() |
| 141 | + |
| 142 | + // otherwise this is the goroutine responsible for cleaning up the cache |
| 143 | + timer := time.NewTimer(c.ttl) |
| 144 | + |
| 145 | + // we will have to consume more than one input |
| 146 | + TRUE_LOOP: |
| 147 | + for { |
| 148 | + select { |
| 149 | + // if another plan was retrieved |
| 150 | + case <-c.retrievedPlan: |
| 151 | + // reset the time |
| 152 | + timer.Reset(c.ttl) |
| 153 | + |
| 154 | + // if the timer dinged |
| 155 | + case <-timer.C: |
| 156 | + // there is no longer a timer to reset |
| 157 | + c.timeMutex.Lock() |
| 158 | + c.resetTimer = false |
| 159 | + c.timeMutex.Unlock() |
| 160 | + |
| 161 | + // loop over every time in the cache |
| 162 | + for key, cacheItem := range c.cache { |
| 163 | + // if the cached query hasn't been used recently enough |
| 164 | + if cacheItem.LastUsed.Before(time.Now().Add(-c.ttl)) { |
| 165 | + // delete it from the cache |
| 166 | + delete(c.cache, key) |
| 167 | + } |
| 168 | + } |
| 169 | + |
| 170 | + // stop consuming |
| 171 | + break TRUE_LOOP |
| 172 | + } |
| 173 | + } |
| 174 | + |
| 175 | + }() |
| 176 | + }() |
| 177 | + |
| 178 | + // if we have a cached value for the hash |
| 179 | + if cached, hasCachedValue := c.cache[*hash]; hasCachedValue { |
| 180 | + // update the last used |
| 181 | + cached.LastUsed = time.Now() |
| 182 | + // return it |
| 183 | + return cached.Value, nil |
| 184 | + } |
| 185 | + |
| 186 | + // we dont have a cached value |
| 187 | + |
| 188 | + // if we were not given a query string |
| 189 | + if ctx.Query == "" { |
| 190 | + // return an error with the magic string |
| 191 | + return nil, errors.New(MessageMissingCachedQuery) |
| 192 | + } |
| 193 | + |
| 194 | + // compute the plan |
| 195 | + plan, err := planner.Plan(ctx) |
| 196 | + if err != nil { |
| 197 | + return nil, err |
| 198 | + } |
| 199 | + |
| 200 | + // if there is no hash |
| 201 | + if *hash == "" { |
| 202 | + hashString := sha256.Sum256([]byte(ctx.Query)) |
| 203 | + // generate a hash that will identify the query for later use |
| 204 | + *hash = hex.EncodeToString(hashString[:]) |
| 205 | + } |
| 206 | + |
| 207 | + // save it for later |
| 208 | + c.cache[*hash] = &queryPlanCacheItem{ |
| 209 | + LastUsed: time.Now(), |
| 210 | + Value: plan, |
| 211 | + } |
| 212 | + |
| 213 | + // we're done |
| 214 | + return plan, nil |
| 215 | +} |
0 commit comments