forked from cbuijs/sdproxy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcache.go
More file actions
executable file
·509 lines (455 loc) · 17.9 KB
/
cache.go
File metadata and controls
executable file
·509 lines (455 loc) · 17.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
/*
File: cache.go
Version: 2.2.0
Updated: 2026-03-23 23:00 CET
Description: High-performance, sharded, non-blocking DNS cache for sdproxy.
Caches positive (NOERROR), negative (NXDOMAIN), and empty (NOERROR
with no answer records) responses per RFC 2308.
Optimised for embedded: struct-based zero-allocation cache keys,
pseudo-random eviction, wire-format storage.
Changes:
2.2.0 - [REFACTOR] storeItem() helper extracted — CacheSet and CacheSetSynth
now share a single shard-lock + eviction + store path, eliminating
drift risk between the two callers.
CacheSetSynth pack errors now logged (matched CacheSet behaviour).
2.1.0 - [FEAT] CacheSetSynth — stores any synthesised sdproxy response at
syntheticTTL without the RFC 2308 RCODE filter that CacheSet enforces.
Three startup flags: cacheUpstreamNeg, cacheSynthFlag,
cacheLocalIdentity — set from new config fields in InitCache.
staleNano == expireNano for synth entries — no stale window,
no backgroundRevalidate.
2.0.0 - [PERF] time.Time fields replaced with int64 unix nanoseconds — saves
32 bytes per cacheItem; all time comparisons become integer ops.
atomic.Uint32 prefetched → atomic.Bool.
hasPrefetch + staleEnabled startup bools gate hot-path branches:
hits.Add(1) skipped when prefetch is not configured.
packBufPool removed — reuses smallBufPool from globals.go.
Sweeper toDelete slice allocated once and reused across all shards
per tick ([:0] reset) — eliminates up to 32 allocs per sweep cycle.
1.22.0 - [PERF] CacheGet accepts caller-provided *dns.Msg (from msgPool).
1.21.0 - [PERF] Wire-format storage; PackBuffer + immutable packed bytes.
1.20.0 - [FEAT] Prefetch: hit-gated early background refresh.
1.19.0 - [FIX] Sub-second fresh-window edge case; sweeper two-phase R/W.
1.18.0 - [FEAT] Serve-stale / background revalidation (RFC 8767).
1.17.0 - [PERF] shardCount 16→32.
1.16.0 - [PERF] Pre-computed cacheMaxPerShard.
1.15.0 - [FEAT] max_ttl support.
1.14.0 - [PERF] Inline FNV-1a shard selector; RouteIdx uint8 cache key.
1.13.0 - [PERF] Single time.Now() per CacheGet call.
... Older commit-information removed for brevity.
*/
package main
import (
"log"
"sync"
"sync/atomic"
"time"
"github.com/miekg/dns"
)
// DNSCacheKey is the map key for all cache lookups.
//
// Name must always be normalised (lowercase, no trailing dot) as produced by
// lowerTrimDot in policy.go — ensures "GOOGLE.COM." and "google.com" share
// the same entry. RouteIdx is a compact uint8 so the key stays small and
// struct-comparable without a string route name on every lookup.
type DNSCacheKey struct {
Name string
Qtype uint16
Qclass uint16
RouteIdx uint8
}
// cacheItem is a single cached DNS response in wire format.
type cacheItem struct {
packed []byte // immutable packed DNS wire bytes (never mutated after store)
expireNano int64 // expiry deadline as unix nanoseconds
staleNano int64 // end of stale-serving window; == expireNano for synth entries
routeName string // upstream group name used by backgroundRevalidate
hits atomic.Uint32 // hit counter for prefetch popularity gate
prefetched atomic.Bool // CAS flag: exactly one prefetch fires per entry lifetime
}
// cacheShard is a independently locked segment of the cache.
// 32 shards reduce write-lock contention by ~32× compared to a single mutex.
type cacheShard struct {
sync.RWMutex
items map[DNSCacheKey]*cacheItem
}
const shardCount = 32
var shards [shardCount]*cacheShard
// cacheMaxPerShard is the per-shard entry ceiling, pre-computed from the
// configured total size at InitCache time.
var cacheMaxPerShard int
// ---------------------------------------------------------------------------
// Startup feature flags — set once in InitCache, read on every hot-path call.
// ---------------------------------------------------------------------------
// hasPrefetch is true when both prefetch knobs are > 0.
// Guards hits.Add(1) — skipping the atomic barrier on every cache hit when
// prefetch is disabled is meaningful on MIPS/ARM routers.
var hasPrefetch bool
// staleEnabled is true when cfg.Cache.StaleTTL > 0.
// Guards the stale-window logic so the common disabled case pays nothing
// beyond a single bool load.
var staleEnabled bool
// cacheUpstreamNeg controls whether upstream NXDOMAIN / NODATA responses are
// stored. true = cache them (RFC 2308 compliant, default).
// false = always forward negative queries upstream — useful when upstream
// blocklists change frequently and you don't want negatives to linger.
var cacheUpstreamNeg bool
// cacheSynthFlag controls whether synthesised policy responses (domain_policy,
// rtype_policy, AAAA filter, strict_ptr, obsolete qtypes) are stored via
// CacheSetSynth. When true, repeat policy-blocked queries hit the cache at
// step 3 in process.go and skip domain walks + policy lookups entirely.
var cacheSynthFlag bool
// cacheLocalIdentity controls whether local A/AAAA/PTR responses from
// hosts/leases files are stored via CacheSetSynth. Only safe when
// syntheticTTL ≤ identity.poll_interval — otherwise stale local addresses
// may be served in the gap between a file change and the next poll.
var cacheLocalIdentity bool
// InitCache initialises all shards and starts the background sweeper.
// Called once from main() after cfg is populated.
func InitCache(maxSize int, _ int) {
if !cfg.Cache.Enabled {
return
}
for i := range shards {
shards[i] = &cacheShard{items: make(map[DNSCacheKey]*cacheItem)}
}
cacheMaxPerShard = maxSize / shardCount
if cacheMaxPerShard < 1 {
cacheMaxPerShard = 1
}
// Set hot-path feature flags once so CacheGet/CacheSet branches are pure
// bool loads — no config struct field accesses on the critical path.
hasPrefetch = cfg.Cache.PrefetchBefore > 0 && cfg.Cache.PrefetchMinHits > 0
staleEnabled = cfg.Cache.StaleTTL > 0
cacheUpstreamNeg = cfg.Cache.CacheUpstreamNegative
cacheSynthFlag = cfg.Cache.CacheSynthetic
cacheLocalIdentity = cfg.Cache.CacheLocalIdentity
sweepInterval := 60 * time.Second
if cfg.Cache.SweepIntervalS > 0 {
sweepInterval = time.Duration(cfg.Cache.SweepIntervalS) * time.Second
}
log.Printf("[CACHE] Initialised: size=%d shards=%d sweep=%s stale=%ds "+
"prefetch=%ds/%dhits synth=%v localid=%v upneg=%v",
maxSize, shardCount, sweepInterval,
cfg.Cache.StaleTTL, cfg.Cache.PrefetchBefore, cfg.Cache.PrefetchMinHits,
cacheSynthFlag, cacheLocalIdentity, cacheUpstreamNeg)
go runSweeper(sweepInterval)
}
// runSweeper periodically reclaims cache entries whose stale window has passed.
//
// Correctness note: the sweeper only frees memory. CacheGet independently
// rejects expired entries on every read, so a late sweep never serves stale data.
//
// Two-phase strategy (prevents holding a write lock during the full scan):
// Phase 1 — RLock: scan shard, collect expired keys into toDelete.
// Phase 2 — Lock: delete each key, re-checking staleNano to skip any entry
// that was just refreshed by a concurrent CacheSet.
//
// toDelete is allocated once before the loop with a capacity hint and reused
// across all 32 shards per tick (reset with [:0]) — no per-shard allocation.
func runSweeper(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
// Pre-allocate with a reasonable capacity so the first real sweep rarely
// needs to grow. cacheMaxPerShard/4 is a conservative estimate of how many
// entries might have expired in one interval.
toDelete := make([]DNSCacheKey, 0, max(cacheMaxPerShard/4, 16))
for range ticker.C {
now := time.Now().UnixNano()
for i := range shards {
shard := shards[i]
toDelete = toDelete[:0] // reset length, keep backing array
shard.RLock()
for k, v := range shard.items {
if now >= v.staleNano {
toDelete = append(toDelete, k)
}
}
shard.RUnlock()
if len(toDelete) == 0 {
continue
}
shard.Lock()
for _, k := range toDelete {
// Re-check: a concurrent CacheSet may have refreshed this key.
if v, ok := shard.items[k]; ok && now >= v.staleNano {
delete(shard.items, k)
}
}
shard.Unlock()
}
}
}
// getShard maps a cache key to a shard using inline FNV-1a.
//
// Purely arithmetic — no allocation, no method dispatch. Name carries the
// most entropy so it is hashed first. Qtype, Qclass, and RouteIdx are folded
// in with single multiplications. Matters on MIPS/ARM where anything that
// allocates is expensive.
func getShard(key DNSCacheKey) *cacheShard {
const (
basis uint64 = 14695981039346656037
prime uint64 = 1099511628211
)
h := basis
for i := 0; i < len(key.Name); i++ {
h ^= uint64(key.Name[i])
h *= prime
}
h ^= uint64(key.Qtype)<<16 | uint64(key.Qclass)
h *= prime
h ^= uint64(key.RouteIdx)
h *= prime
return shards[h&(shardCount-1)]
}
// storeItem acquires the shard write-lock, evicts one pseudo-random entry when
// the shard is at capacity, then stores item under key.
//
// Shared by CacheSet and CacheSetSynth — eviction and store logic live in
// exactly one place, so the two callers cannot drift out of sync.
//
// Pseudo-random eviction: Go's map iteration order is deliberately randomised,
// so the first key returned is statistically uniform across all entries.
// No division, no sorting, no secondary data structure needed.
func storeItem(key DNSCacheKey, item *cacheItem) {
shard := getShard(key)
shard.Lock()
if len(shard.items) >= cacheMaxPerShard {
for k := range shard.items {
delete(shard.items, k)
break
}
}
shard.items[key] = item
shard.Unlock()
}
// CacheGet unpacks a cached response into the caller-provided *dns.Msg
// (sourced from msgPool in process.go) and returns three status flags.
//
// The caller must zero the message before passing it in: `*out = dns.Msg{}`.
// After WriteMsg the caller returns out to msgPool.
//
// isStale=false, isPrefetch=false — normal fresh hit.
// isStale=false, isPrefetch=true — fresh; background prefetch just fired.
// isStale=true, isPrefetch=false — past TTL but inside stale window;
// all TTLs set to 0 per RFC 8767 §4.
//
// Returns ok=false on a miss or when the stale window has passed.
// out is in an undefined state when ok=false — do not use it.
func CacheGet(key DNSCacheKey, out *dns.Msg) (isStale, isPrefetch, ok bool) {
if !cfg.Cache.Enabled {
return
}
shard := getShard(key)
shard.RLock()
item, found := shard.items[key]
shard.RUnlock()
if !found {
return
}
// One syscall for the entire time-dependent path.
now := time.Now().UnixNano()
// Past the full stale window — treat as miss (sweeper may not have fired yet).
if now >= item.staleNano {
return
}
isStale = now >= item.expireNano
if isStale && !staleEnabled {
return // stale window disabled
}
// Hit counting — only pay the atomic barrier when prefetch is configured.
// This is the common home-router case: prefetch off → zero atomic cost.
if hasPrefetch {
item.hits.Add(1)
}
// Remaining TTL in whole seconds (integer arithmetic — no float64 division).
// remaining==0 for stale responses — intentional per RFC 8767 §4.
var remaining uint32
if !isStale {
r := item.expireNano - now // nanoseconds remaining
if r < int64(time.Second) {
// Sub-second window: functionally expired. With stale enabled,
// promote to stale so the client gets TTL=0 and backgroundRevalidate
// fires — avoids a synchronous upstream round-trip on the last second.
if !staleEnabled {
return false, false, false
}
isStale = true
} else {
remaining = uint32(r / int64(time.Second))
}
}
// ── Prefetch gate ──────────────────────────────────────────────────────
// All five conditions must hold:
// 1. hasPrefetch — both config knobs are set (checked at startup).
// 2. Entry is still fresh — stale path triggers its own revalidation.
// 3. Remaining TTL is inside the prefetch window.
// 4. Hit count has reached the minimum popularity threshold.
// 5. CAS false→true succeeds — exactly one goroutine fires per lifetime.
if hasPrefetch && !isStale &&
remaining > 0 &&
remaining <= uint32(cfg.Cache.PrefetchBefore) &&
item.hits.Load() >= uint32(cfg.Cache.PrefetchMinHits) &&
item.prefetched.CompareAndSwap(false, true) {
go backgroundRevalidate(key, item.routeName, "")
isPrefetch = true
}
// Unpack wire bytes into the caller-provided *dns.Msg.
// item.packed is immutable — safe for concurrent callers without the lock.
if err := out.Unpack(item.packed); err != nil {
return false, false, false
}
// Rewrite TTLs to reflect actual remaining lifetime.
// OPT (EDNS0) carries flags, not a TTL — always skip it.
for _, rr := range out.Answer {
rr.Header().Ttl = remaining
}
for _, rr := range out.Ns {
rr.Header().Ttl = remaining
}
for _, rr := range out.Extra {
if rr.Header().Rrtype != dns.TypeOPT {
rr.Header().Ttl = remaining
}
}
return isStale, isPrefetch, true
}
// CacheSet packs msg into wire format and stores it under key.
//
// Uses smallBufPool (globals.go) as a scratch buffer for PackBuffer — the
// same 4096-byte pool used elsewhere in the pipeline. The final stored slice
// is an independent copy so the pool buffer is immediately safe to return.
//
// TTL derivation:
// Positive (NOERROR with answers): minimum TTL across all answer RRs.
// Negative (NXDOMAIN or NODATA): SOA minimum from authority section;
// falls back to NegativeTTL, then MinTTL.
//
// A fresh *cacheItem is always allocated — implicitly resets hits and
// prefetched to zero so a refreshed entry re-accumulates popularity from scratch.
func CacheSet(key DNSCacheKey, msg *dns.Msg, routeName string) {
if !cfg.Cache.Enabled || msg == nil {
return
}
// Negative upstream caching gate — checked first so cache_upstream_negative:
// false takes effect even before the RCODE filter below.
isNeg := msg.Rcode == dns.RcodeNameError ||
(msg.Rcode == dns.RcodeSuccess && len(msg.Answer) == 0)
if isNeg && !cacheUpstreamNeg {
return
}
// Only NOERROR and NXDOMAIN are cacheable (RFC 2308).
// SERVFAIL, REFUSED, FORMERR etc. are transient or policy — never cache.
switch msg.Rcode {
case dns.RcodeSuccess, dns.RcodeNameError:
default:
return
}
var ttl uint32
if !isNeg {
// Positive: minimum TTL across all answer records.
ttl = ^uint32(0)
for _, rr := range msg.Answer {
if rr.Header().Ttl < ttl {
ttl = rr.Header().Ttl
}
}
} else {
// Negative (NXDOMAIN or NODATA — RFC 2308 §5).
// Prefer SOA minimum from authority section when present.
for _, rr := range msg.Ns {
if soa, ok := rr.(*dns.SOA); ok {
ttl = soa.Hdr.Ttl
if soa.Minttl < ttl {
ttl = soa.Minttl
}
break
}
}
if ttl == 0 {
if cfg.Cache.NegativeTTL > 0 {
ttl = uint32(cfg.Cache.NegativeTTL)
} else {
ttl = uint32(cfg.Cache.MinTTL)
}
}
}
// Floor: positive → MinTTL, negative → NegativeTTL (or MinTTL).
effectiveMin := uint32(cfg.Cache.MinTTL)
if isNeg && cfg.Cache.NegativeTTL > 0 {
effectiveMin = uint32(cfg.Cache.NegativeTTL)
}
if ttl < effectiveMin {
ttl = effectiveMin
}
// Ceiling: MaxTTL=0 means unlimited.
if cfg.Cache.MaxTTL > 0 && ttl > uint32(cfg.Cache.MaxTTL) {
ttl = uint32(cfg.Cache.MaxTTL)
}
// Compute deadlines in nanoseconds — pure integer arithmetic.
now := time.Now().UnixNano()
expireNano := now + int64(ttl)*int64(time.Second)
staleNano := expireNano
if staleEnabled {
staleNano = expireNano + int64(cfg.Cache.StaleTTL)*int64(time.Second)
}
// Pack via smallBufPool (shared with server.go / upstream.go).
bufp := smallBufPool.Get().(*[]byte)
packed, err := msg.PackBuffer(*bufp)
if err != nil {
smallBufPool.Put(bufp)
log.Printf("[CACHE] CacheSet: pack failed for %q: %v", key.Name, err)
return
}
stored := make([]byte, len(packed))
copy(stored, packed)
smallBufPool.Put(bufp)
storeItem(key, &cacheItem{
packed: stored,
expireNano: expireNano,
staleNano: staleNano,
routeName: routeName,
// hits and prefetched are zero-valued atomics — no explicit init needed.
})
}
// CacheSetSynth stores a synthesised sdproxy response (any RCODE) at a fixed
// syntheticTTL. Unlike CacheSet it bypasses the RFC 2308 RCODE filter, so
// REFUSED, NOTIMP, and NOERROR-with-no-answers can all be cached here.
//
// Used for:
// - Policy exits: domain_policy, rtype_policy, AAAA filter, strict_ptr,
// obsolete qtypes — when cache_synthetic: true.
// - Local identity A/AAAA/PTR responses — when cache_local_identity: true.
//
// staleNano is always set equal to expireNano — synthetic entries have no
// upstream to revalidate against, so backgroundRevalidate must never fire
// for them (routeName stored as "", which storeItem writes to the item).
// Prefetch is also disabled for synthetic entries (routeName="").
//
// Called from process.go immediately after writing the synthesised response.
func CacheSetSynth(key DNSCacheKey, msg *dns.Msg) {
if !cfg.Cache.Enabled || !cacheSynthFlag || msg == nil {
return
}
now := time.Now().UnixNano()
expireNano := now + int64(syntheticTTL)*int64(time.Second)
bufp := smallBufPool.Get().(*[]byte)
packed, err := msg.PackBuffer(*bufp)
if err != nil {
smallBufPool.Put(bufp)
log.Printf("[CACHE] CacheSetSynth: pack failed for %q: %v", key.Name, err)
return
}
stored := make([]byte, len(packed))
copy(stored, packed)
smallBufPool.Put(bufp)
storeItem(key, &cacheItem{
packed: stored,
expireNano: expireNano,
staleNano: expireNano, // no stale window — nothing to revalidate
routeName: "", // synthetic; backgroundRevalidate must not fire
})
}
// atomic import guard — keeps the import live when all usages are via struct
// field types rather than direct package-level function calls.
var _ atomic.Uint32