From b8e75fa29e36ba72683b8dbc81f66d62c9aa8cb5 Mon Sep 17 00:00:00 2001 From: jhjang Date: Wed, 13 Aug 2025 06:00:39 +0900 Subject: [PATCH 1/8] feat: add round-robin retries via Retry Rounds/Retry Delay --- runner/options.go | 8 +++ runner/runner.go | 110 +++++++++++++++++++++++++++++++++++++----- runner/runner_test.go | 78 ++++++++++++++++++++++++++++-- runner/types.go | 11 +++++ 4 files changed, 192 insertions(+), 15 deletions(-) diff --git a/runner/options.go b/runner/options.go index 2fa8d601..8edfccff 100644 --- a/runner/options.go +++ b/runner/options.go @@ -274,6 +274,8 @@ type Options struct { RateLimitMinute int Probe bool Resume bool + RetryRounds int + RetryDelay int resumeCfg *ResumeCfg Exclude goflags.StringSlice HostMaxErrors int @@ -530,6 +532,8 @@ func ParseOptions() *Options { flagSet.DurationVar(&options.Delay, "delay", -1, "duration between each http request (eg: 200ms, 1s)"), flagSet.IntVarP(&options.MaxResponseBodySizeToSave, "response-size-to-save", "rsts", math.MaxInt32, "max response size to save in bytes"), flagSet.IntVarP(&options.MaxResponseBodySizeToRead, "response-size-to-read", "rstr", math.MaxInt32, "max response size to read in bytes"), + flagSet.IntVarP(&options.RetryRounds, "retry-rounds", "rr", 0, "number of retry rounds for HTTP 429 responses (Too Many Requests)"), + flagSet.IntVarP(&options.RetryDelay, "retry-delay", "rd", 500, "delay between retry rounds for HTTP 429 responses (e.g. 5ms, 30ms)"), ) flagSet.CreateGroup("cloud", "Cloud", @@ -757,6 +761,10 @@ func (options *Options) ValidateOptions() error { options.Threads = defaultThreads } + if options.RetryRounds > 0 && options.RetryDelay <= 0 { + return errors.New(fmt.Sprintf("invalid retry-delay: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryDelay)) + } + return nil } diff --git a/runner/runner.go b/runner/runner.go index f6ec504d..aaf848a1 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -1257,6 +1257,9 @@ func (r *Runner) RunEnumeration() { }(nextStep) wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads)) + retryCh := make(chan retryJob) + + retryCancel, retryWait := r.retryLoop(context.Background(), retryCh, output, r.analyze) processItem := func(k string) error { if r.options.resumeCfg != nil { @@ -1279,10 +1282,10 @@ func (r *Runner) RunEnumeration() { for _, p := range r.options.requestURIs { scanopts := r.scanopts.Clone() scanopts.RequestURI = p - r.process(k, wg, r.hp, protocol, scanopts, output) + r.process(k, wg, r.hp, protocol, scanopts, output, retryCh) } } else { - r.process(k, wg, r.hp, protocol, &r.scanopts, output) + r.process(k, wg, r.hp, protocol, &r.scanopts, output, retryCh) } return nil @@ -1300,8 +1303,11 @@ func (r *Runner) RunEnumeration() { wg.Wait() - close(output) + retryWait() + retryCancel() + close(retryCh) + close(output) wgoutput.Wait() if r.scanopts.StoreVisionReconClusters { @@ -1323,6 +1329,62 @@ func (r *Runner) RunEnumeration() { } } +type analyzeFunc func(*httpx.HTTPX, string, httpx.Target, string, string, *ScanOptions) Result + +func (r *Runner) retryLoop( + parent context.Context, + ch chan retryJob, + output chan<- Result, + analyze analyzeFunc, +) (func(), func()) { + ctx, cancel := context.WithCancel(parent) + var jobWG sync.WaitGroup + + go func() { + for { + select { + case <-ctx.Done(): + return + case job := <-ch: + jobWG.Add(1) + + go func(j retryJob) { + defer jobWG.Done() + + if wait := time.Until(j.when); wait > 0 { + timer := time.NewTimer(wait) + select { + case <-ctx.Done(): + timer.Stop() + return + case <-timer.C: + } + } + + res := analyze(j.hp, j.protocol, j.target, j.method, j.origInput, j.scanopts) + output <- res + + if res.StatusCode == http.StatusTooManyRequests && + j.attempt < r.options.RetryRounds { + + j.attempt++ + j.when = time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond) + + select { + case <-ctx.Done(): + case ch <- j: + } + } + }(job) + } + } + }() + + stop := func() { cancel() } + wait := func() { jobWG.Wait() } + return stop, wait +} + func logFilteredErrorPage(fileName, url string) { dir := filepath.Dir(fileName) if !fileutil.FolderExists(dir) { @@ -1380,11 +1442,11 @@ func (r *Runner) GetScanOpts() ScanOptions { return r.scanopts } -func (r *Runner) Process(t string, wg *syncutil.AdaptiveWaitGroup, protocol string, scanopts *ScanOptions, output chan Result) { - r.process(t, wg, r.hp, protocol, scanopts, output) +func (r *Runner) Process(t string, wg *syncutil.AdaptiveWaitGroup, protocol string, scanopts *ScanOptions, output chan Result, retryCh chan retryJob) { + r.process(t, wg, r.hp, protocol, scanopts, output, retryCh) } -func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTTPX, protocol string, scanopts *ScanOptions, output chan Result) { +func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTTPX, protocol string, scanopts *ScanOptions, output chan Result, retryCh chan retryJob) { // attempts to set the workpool size to the number of threads if r.options.Threads > 0 && wg.Size != r.options.Threads { if err := wg.Resize(context.Background(), r.options.Threads); err != nil { @@ -1409,15 +1471,28 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT defer wg.Done() result := r.analyze(hp, protocol, target, method, t, scanopts) output <- result + if result.StatusCode == http.StatusTooManyRequests && + r.options.RetryRounds > 0 { + retryCh <- retryJob{ + hp: hp, + protocol: protocol, + target: target, + method: method, + origInput: t, + scanopts: scanopts.Clone(), + attempt: 1, + when: time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond), + } + } if scanopts.TLSProbe && result.TLSData != nil { for _, tt := range result.TLSData.SubjectAN { if !r.testAndSet(tt) { continue } - r.process(tt, wg, hp, protocol, scanopts, output) + r.process(tt, wg, hp, protocol, scanopts, output, retryCh) } if r.testAndSet(result.TLSData.SubjectCN) { - r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output) + r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh) } } if scanopts.CSPProbe && result.CSPData != nil { @@ -1428,7 +1503,7 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT if !r.testAndSet(tt) { continue } - r.process(tt, wg, hp, protocol, scanopts, output) + r.process(tt, wg, hp, protocol, scanopts, output, retryCh) } } }(target, method, prot) @@ -1463,15 +1538,28 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT } result := r.analyze(hp, protocol, target, method, t, scanopts) output <- result + if result.StatusCode == http.StatusTooManyRequests && + r.options.RetryRounds > 0 { + retryCh <- retryJob{ + hp: hp, + protocol: protocol, + target: target, + method: method, + origInput: t, + scanopts: scanopts.Clone(), + attempt: 1, + when: time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond), + } + } if scanopts.TLSProbe && result.TLSData != nil { for _, tt := range result.TLSData.SubjectAN { if !r.testAndSet(tt) { continue } - r.process(tt, wg, hp, protocol, scanopts, output) + r.process(tt, wg, hp, protocol, scanopts, output, retryCh) } if r.testAndSet(result.TLSData.SubjectCN) { - r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output) + r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh) } } }(port, target, method, wantedProtocol) diff --git a/runner/runner_test.go b/runner/runner_test.go index 850566b8..3395a61f 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -1,7 +1,9 @@ package runner import ( + "context" "fmt" + "net/http" "os" "strings" "testing" @@ -227,10 +229,10 @@ func TestCreateNetworkpolicyInstance_AllowDenyFlags(t *testing.T) { runner := &Runner{} tests := []struct { - name string - allow []string - deny []string - testCases []struct { + name string + allow []string + deny []string + testCases []struct { ip string expected bool reason string @@ -312,3 +314,71 @@ func TestCreateNetworkpolicyInstance_AllowDenyFlags(t *testing.T) { }) } } + +func TestRunner_RetryLoop(t *testing.T) { + retryCh := make(chan retryJob) + out := make(chan Result) + + r, err := New(&Options{ + RetryDelay: 500, + RetryRounds: 3, + }) + require.Nil(t, err, "could not create httpx runner") + + var calls = map[string]int{} + analyze := func(hp *httpx.HTTPX, + protocol string, + target httpx.Target, + method, origInput string, + scanopts *ScanOptions) Result { + calls[method]++ + if strings.HasPrefix(method, "retry-") && calls[method] == 1 { + return Result{StatusCode: http.StatusTooManyRequests} + } + return Result{StatusCode: http.StatusOK} + } + + cancel, wait := r.retryLoop(context.Background(), retryCh, out, analyze) + + seed := []retryJob{ + {method: "ok-a", when: time.Now().Add(-time.Millisecond), attempt: 1}, + {method: "retry-a", when: time.Now().Add(-time.Millisecond), attempt: 1}, + {method: "ok-b", when: time.Now().Add(-time.Millisecond), attempt: 1}, + {method: "retry-b", when: time.Now().Add(-time.Millisecond), attempt: 1}, + } + for _, j := range seed { + retryCh <- j + } + + want := 6 + got := make([]Result, 0, want) + deadline := time.After(2 * time.Second) + + for len(got) < want { + select { + case r := <-out: + got = append(got, r) + case <-deadline: + t.Errorf("timed out waiting results: got=%d want=%d", len(got), want) + } + } + + wait() + cancel() + close(retryCh) + + close(out) + + var n429, n200 int + for _, r := range got { + switch r.StatusCode { + case http.StatusTooManyRequests: + n429++ + case http.StatusOK: + n200++ + } + } + + require.GreaterOrEqual(t, n429, 2) + require.Equal(t, 4, n200) +} diff --git a/runner/types.go b/runner/types.go index 724e8697..5f4367c3 100644 --- a/runner/types.go +++ b/runner/types.go @@ -120,6 +120,17 @@ type Trace struct { WroteRequest time.Time `json:"wrote_request,omitempty"` } +type retryJob struct { + hp *httpx.HTTPX + protocol string + target httpx.Target + method string + origInput string + scanopts *ScanOptions + attempt int + when time.Time +} + // function to get dsl variables from result struct func dslVariables() ([]string, error) { fakeResult := Result{} From 8c2f4ffef97b2b9488eb5e93b92ad6a44f30ba36 Mon Sep 17 00:00:00 2001 From: jhjang Date: Wed, 13 Aug 2025 18:42:06 +0900 Subject: [PATCH 2/8] fix: remove short aliases (-rr, -rd) to avoid redefinition --- runner/options.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runner/options.go b/runner/options.go index 8edfccff..44bda3b0 100644 --- a/runner/options.go +++ b/runner/options.go @@ -532,8 +532,8 @@ func ParseOptions() *Options { flagSet.DurationVar(&options.Delay, "delay", -1, "duration between each http request (eg: 200ms, 1s)"), flagSet.IntVarP(&options.MaxResponseBodySizeToSave, "response-size-to-save", "rsts", math.MaxInt32, "max response size to save in bytes"), flagSet.IntVarP(&options.MaxResponseBodySizeToRead, "response-size-to-read", "rstr", math.MaxInt32, "max response size to read in bytes"), - flagSet.IntVarP(&options.RetryRounds, "retry-rounds", "rr", 0, "number of retry rounds for HTTP 429 responses (Too Many Requests)"), - flagSet.IntVarP(&options.RetryDelay, "retry-delay", "rd", 500, "delay between retry rounds for HTTP 429 responses (e.g. 5ms, 30ms)"), + flagSet.IntVar(&options.RetryRounds, "retry-rounds", 0, "number of retry rounds for HTTP 429 responses (Too Many Requests)"), + flagSet.IntVar(&options.RetryDelay, "retry-delay", 500, "delay between retry rounds for HTTP 429 responses (e.g. 5ms, 30ms)"), ) flagSet.CreateGroup("cloud", "Cloud", From 6147653d9821a2807fe2f5c68d849729ad145960 Mon Sep 17 00:00:00 2001 From: jhjang Date: Wed, 13 Aug 2025 18:42:34 +0900 Subject: [PATCH 3/8] fix: add max retries reached --- out.txt | 1 + runner/runner.go | 7 ++- runner/runner_test.go | 129 ++++++++++++++++++++++++++---------------- 3 files changed, 85 insertions(+), 52 deletions(-) create mode 100644 out.txt diff --git a/out.txt b/out.txt new file mode 100644 index 00000000..df20805c --- /dev/null +++ b/out.txt @@ -0,0 +1 @@ +http://localhost:8080/once-429 diff --git a/runner/runner.go b/runner/runner.go index aaf848a1..a54d292d 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -1364,14 +1364,17 @@ func (r *Runner) retryLoop( res := analyze(j.hp, j.protocol, j.target, j.method, j.origInput, j.scanopts) output <- res - if res.StatusCode == http.StatusTooManyRequests && - j.attempt < r.options.RetryRounds { + if res.StatusCode == http.StatusTooManyRequests { + if j.attempt >= r.options.RetryRounds { + return + } j.attempt++ j.when = time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond) select { case <-ctx.Done(): + return case ch <- j: } } diff --git a/runner/runner_test.go b/runner/runner_test.go index 3395a61f..79bb1cbf 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -3,9 +3,13 @@ package runner import ( "context" "fmt" + "log" "net/http" + "net/http/httptest" "os" "strings" + "sync" + "sync/atomic" "testing" "time" @@ -13,6 +17,7 @@ import ( "github.com/projectdiscovery/httpx/common/httpx" "github.com/projectdiscovery/mapcidr/asn" stringsutil "github.com/projectdiscovery/utils/strings" + syncutil "github.com/projectdiscovery/utils/sync" "github.com/stretchr/testify/require" ) @@ -315,70 +320,94 @@ func TestCreateNetworkpolicyInstance_AllowDenyFlags(t *testing.T) { } } -func TestRunner_RetryLoop(t *testing.T) { - retryCh := make(chan retryJob) - out := make(chan Result) +func TestRunner_Process_And_RetryLoop(t *testing.T) { + var hits1, hits2 int32 + srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if atomic.AddInt32(&hits1, 1) != 4 { + log.Println("serv1 429") + w.WriteHeader(http.StatusTooManyRequests) + return + } + log.Println("serv1 200") + w.WriteHeader(http.StatusOK) + })) + defer srv1.Close() + + srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if atomic.AddInt32(&hits2, 1) != 3 { + log.Println("serv2 429") + w.WriteHeader(http.StatusTooManyRequests) + return + } + log.Println("serv2 200") + w.WriteHeader(http.StatusOK) + })) + defer srv2.Close() r, err := New(&Options{ - RetryDelay: 500, + Threads: 1, + Delay: 0, RetryRounds: 3, + RetryDelay: 200, // Duration 권장 + Timeout: 2, }) - require.Nil(t, err, "could not create httpx runner") + require.NoError(t, err) - var calls = map[string]int{} - analyze := func(hp *httpx.HTTPX, - protocol string, - target httpx.Target, - method, origInput string, - scanopts *ScanOptions) Result { - calls[method]++ - if strings.HasPrefix(method, "retry-") && calls[method] == 1 { - return Result{StatusCode: http.StatusTooManyRequests} - } - return Result{StatusCode: http.StatusOK} - } + output := make(chan Result) + retryCh := make(chan retryJob) - cancel, wait := r.retryLoop(context.Background(), retryCh, out, analyze) + // ctx, timeout := context.WithTimeout(context.Background(), time.Duration(r.options.Timeout)) + // defer timeout() + cancel, wait := r.retryLoop(context.Background(), retryCh, output, r.analyze) - seed := []retryJob{ - {method: "ok-a", when: time.Now().Add(-time.Millisecond), attempt: 1}, - {method: "retry-a", when: time.Now().Add(-time.Millisecond), attempt: 1}, - {method: "ok-b", when: time.Now().Add(-time.Millisecond), attempt: 1}, - {method: "retry-b", when: time.Now().Add(-time.Millisecond), attempt: 1}, - } - for _, j := range seed { - retryCh <- j - } + wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads)) + so := r.scanopts.Clone() + so.Methods = []string{"GET"} + so.TLSProbe = false + so.CSPProbe = false - want := 6 - got := make([]Result, 0, want) - deadline := time.After(2 * time.Second) + seed := map[string]string{ + "srv1": srv1.URL, + "srv2": srv2.URL, + } - for len(got) < want { - select { - case r := <-out: - got = append(got, r) - case <-deadline: - t.Errorf("timed out waiting results: got=%d want=%d", len(got), want) + var drainWG sync.WaitGroup + drainWG.Add(1) + var s1n429, s1n200, s2n429, s2n200 int + go func(output chan Result) { + defer drainWG.Done() + for res := range output { + switch res.StatusCode { + case http.StatusTooManyRequests: + if res.URL == srv1.URL { + s1n429++ + } else { + s2n429++ + } + case http.StatusOK: + if res.URL == srv1.URL { + s1n200++ + } else { + s2n200++ + } + } } + }(output) + + for _, url := range seed { + r.process(url, wg, r.hp, httpx.HTTP, so, output, retryCh) } + wg.Wait() wait() cancel() - close(retryCh) - close(out) - - var n429, n200 int - for _, r := range got { - switch r.StatusCode { - case http.StatusTooManyRequests: - n429++ - case http.StatusOK: - n200++ - } - } + close(retryCh) + close(output) + drainWG.Wait() - require.GreaterOrEqual(t, n429, 2) - require.Equal(t, 4, n200) + require.Equal(t, 3, s1n429) + require.Equal(t, 1, s1n200) + require.Equal(t, 2, s2n429) + require.Equal(t, 1, s2n200) } From ae4b2e900d23efd6a52714a6369aad0b7441cb84 Mon Sep 17 00:00:00 2001 From: jhjang Date: Wed, 13 Aug 2025 06:00:39 +0900 Subject: [PATCH 4/8] feat: add round-robin retries via Retry Rounds/Retry Delay --- runner/options.go | 4 ++++ runner/runner.go | 48 +++++++++++++++++++++++-------------------- runner/runner_test.go | 23 ++++++--------------- 3 files changed, 36 insertions(+), 39 deletions(-) diff --git a/runner/options.go b/runner/options.go index 44bda3b0..e1cb3041 100644 --- a/runner/options.go +++ b/runner/options.go @@ -765,6 +765,10 @@ func (options *Options) ValidateOptions() error { return errors.New(fmt.Sprintf("invalid retry-delay: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryDelay)) } + if options.RetryRounds > 0 && options.RetryDelay <= 0 { + return errors.New(fmt.Sprintf("invalid retry-delay: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryDelay)) + } + return nil } diff --git a/runner/runner.go b/runner/runner.go index a54d292d..9f003544 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -23,6 +23,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "golang.org/x/exp/maps" @@ -1259,7 +1260,7 @@ func (r *Runner) RunEnumeration() { wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads)) retryCh := make(chan retryJob) - retryCancel, retryWait := r.retryLoop(context.Background(), retryCh, output, r.analyze) + _, drainedCh := r.retryLoop(context.Background(), retryCh, output, r.analyze) processItem := func(k string) error { if r.options.resumeCfg != nil { @@ -1302,11 +1303,9 @@ func (r *Runner) RunEnumeration() { } wg.Wait() - - retryWait() - retryCancel() - close(retryCh) - + if r.options.RetryRounds > 0 { + <-drainedCh + } close(output) wgoutput.Wait() @@ -1333,24 +1332,30 @@ type analyzeFunc func(*httpx.HTTPX, string, httpx.Target, string, string, *ScanO func (r *Runner) retryLoop( parent context.Context, - ch chan retryJob, + retryCh chan retryJob, output chan<- Result, analyze analyzeFunc, -) (func(), func()) { +) (stop func(), drained <-chan struct{}) { + var remaining atomic.Int64 ctx, cancel := context.WithCancel(parent) - var jobWG sync.WaitGroup + drainedCh := make(chan struct{}) go func() { + defer close(retryCh) + for { select { case <-ctx.Done(): return - case job := <-ch: - jobWG.Add(1) + case job, ok := <-retryCh: + if !ok { + return + } + if job.attempt == 1 { + remaining.Add(1) + } go func(j retryJob) { - defer jobWG.Done() - if wait := time.Until(j.when); wait > 0 { timer := time.NewTimer(wait) select { @@ -1364,28 +1369,27 @@ func (r *Runner) retryLoop( res := analyze(j.hp, j.protocol, j.target, j.method, j.origInput, j.scanopts) output <- res - if res.StatusCode == http.StatusTooManyRequests { - if j.attempt >= r.options.RetryRounds { - return - } - + if res.StatusCode == http.StatusTooManyRequests && j.attempt < r.options.RetryRounds { j.attempt++ j.when = time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond) select { case <-ctx.Done(): return - case ch <- j: + case retryCh <- j: + return } } + + if remaining.Add(-1) == 0 { + close(drainedCh) + } }(job) } } }() - stop := func() { cancel() } - wait := func() { jobWG.Wait() } - return stop, wait + return func() { cancel() }, drainedCh } func logFilteredErrorPage(fileName, url string) { diff --git a/runner/runner_test.go b/runner/runner_test.go index 79bb1cbf..950b97d7 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -3,7 +3,6 @@ package runner import ( "context" "fmt" - "log" "net/http" "net/http/httptest" "os" @@ -324,41 +323,34 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { var hits1, hits2 int32 srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if atomic.AddInt32(&hits1, 1) != 4 { - log.Println("serv1 429") w.WriteHeader(http.StatusTooManyRequests) return } - log.Println("serv1 200") w.WriteHeader(http.StatusOK) })) defer srv1.Close() srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if atomic.AddInt32(&hits2, 1) != 3 { - log.Println("serv2 429") w.WriteHeader(http.StatusTooManyRequests) return } - log.Println("serv2 200") w.WriteHeader(http.StatusOK) })) defer srv2.Close() r, err := New(&Options{ Threads: 1, - Delay: 0, - RetryRounds: 3, - RetryDelay: 200, // Duration 권장 - Timeout: 2, + RetryRounds: 2, + RetryDelay: 5, + Timeout: 3, }) require.NoError(t, err) output := make(chan Result) retryCh := make(chan retryJob) - // ctx, timeout := context.WithTimeout(context.Background(), time.Duration(r.options.Timeout)) - // defer timeout() - cancel, wait := r.retryLoop(context.Background(), retryCh, output, r.analyze) + _, drainedCh := r.retryLoop(context.Background(), retryCh, output, r.analyze) wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads)) so := r.scanopts.Clone() @@ -399,15 +391,12 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { } wg.Wait() - wait() - cancel() - - close(retryCh) + <-drainedCh close(output) drainWG.Wait() require.Equal(t, 3, s1n429) - require.Equal(t, 1, s1n200) + require.Equal(t, 0, s1n200) require.Equal(t, 2, s2n429) require.Equal(t, 1, s2n200) } From 7fe6a48eebf7148130038d8de27d044d5477fbf6 Mon Sep 17 00:00:00 2001 From: jhjang Date: Thu, 21 Aug 2025 11:32:30 +0900 Subject: [PATCH 5/8] refactor: retryLoop with atomic counter and drained channel --- out.txt | 1 - runner/options.go | 4 ---- runner/runner_test.go | 11 +++++++++-- 3 files changed, 9 insertions(+), 7 deletions(-) delete mode 100644 out.txt diff --git a/out.txt b/out.txt deleted file mode 100644 index df20805c..00000000 --- a/out.txt +++ /dev/null @@ -1 +0,0 @@ -http://localhost:8080/once-429 diff --git a/runner/options.go b/runner/options.go index e1cb3041..44bda3b0 100644 --- a/runner/options.go +++ b/runner/options.go @@ -765,10 +765,6 @@ func (options *Options) ValidateOptions() error { return errors.New(fmt.Sprintf("invalid retry-delay: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryDelay)) } - if options.RetryRounds > 0 && options.RetryDelay <= 0 { - return errors.New(fmt.Sprintf("invalid retry-delay: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryDelay)) - } - return nil } diff --git a/runner/runner_test.go b/runner/runner_test.go index 950b97d7..a566356d 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -321,6 +321,8 @@ func TestCreateNetworkpolicyInstance_AllowDenyFlags(t *testing.T) { func TestRunner_Process_And_RetryLoop(t *testing.T) { var hits1, hits2 int32 + + // srv1: returns 429 for the first 3 requests, and 200 on the 4th request srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if atomic.AddInt32(&hits1, 1) != 4 { w.WriteHeader(http.StatusTooManyRequests) @@ -330,6 +332,7 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { })) defer srv1.Close() + // srv2: returns 429 for the first 2 requests, and 200 on the 3rd request srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if atomic.AddInt32(&hits2, 1) != 3 { w.WriteHeader(http.StatusTooManyRequests) @@ -366,7 +369,7 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { var drainWG sync.WaitGroup drainWG.Add(1) var s1n429, s1n200, s2n429, s2n200 int - go func(output chan Result) { + go func() { defer drainWG.Done() for res := range output { switch res.StatusCode { @@ -384,7 +387,7 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { } } } - }(output) + }() for _, url := range seed { r.process(url, wg, r.hp, httpx.HTTP, so, output, retryCh) @@ -395,8 +398,12 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { close(output) drainWG.Wait() + // Verify expected results + // srv1: should have 3x 429 responses and no 200 (never succeeds within retries) require.Equal(t, 3, s1n429) require.Equal(t, 0, s1n200) + + // srv2: should have 2x 429 responses and 1x 200 (succeeds on 3rd attempt) require.Equal(t, 2, s2n429) require.Equal(t, 1, s2n200) } From 7b1182569778629831c5fe85e83d7f058bef4e8e Mon Sep 17 00:00:00 2001 From: Mzack9999 Date: Sun, 8 Mar 2026 00:21:16 +0100 Subject: [PATCH 6/8] adding max cap + tests --- README.md | 4 + cmd/integration-test/http.go | 120 +++++++++++++++++++++++++ runner/options.go | 13 ++- runner/runner.go | 144 +++++++++++++++++------------- runner/runner_test.go | 166 +++++++++++++++++++++++++++++++++-- 5 files changed, 379 insertions(+), 68 deletions(-) diff --git a/README.md b/README.md index 5dddd737..4ee61c90 100644 --- a/README.md +++ b/README.md @@ -267,6 +267,9 @@ OPTIMIZATIONS: -delay value duration between each http request (eg: 200ms, 1s) (default -1ns) -rsts, -response-size-to-save int max response size to save in bytes (default 512000000) -rstr, -response-size-to-read int max response size to read in bytes (default 512000000) + -retry-rounds int number of retry rounds for HTTP 429 responses (Too Many Requests) + -retry-delay int fallback delay in ms when Retry-After header is absent (HTTP 429) (default 500) + -retry-timeout int max total time in seconds for retry rounds (HTTP 429) (default 30) CLOUD: -auth configure projectdiscovery cloud (pdcp) api key (default true) @@ -310,6 +313,7 @@ For details about running httpx, see https://docs.projectdiscovery.io/tools/http username: admin password: secret ``` +- The `-retry-rounds` flag enables automatic retries for HTTP 429 (Too Many Requests) responses. When a server returns a `Retry-After` header, httpx respects it to determine the wait time before retrying. If the header is absent, the `-retry-delay` value is used as a fallback. The `-retry-timeout` flag sets a hard upper bound on total retry time to prevent indefinite blocking. - The following flags should be used for specific use cases instead of running them as default with other probes: - `-ports` - `-path` diff --git a/cmd/integration-test/http.go b/cmd/integration-test/http.go index 10179973..85b6d65a 100644 --- a/cmd/integration-test/http.go +++ b/cmd/integration-test/http.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "os" "strings" + "sync/atomic" "github.com/julienschmidt/httprouter" "github.com/projectdiscovery/httpx/internal/testutils" @@ -33,6 +34,9 @@ var httpTestcases = map[string]testutils.TestCase{ "Output Match Condition": &outputMatchCondition{inputData: []string{"-silent", "-mdc", "\"status_code == 200\""}}, "Output Filter Condition": &outputFilterCondition{inputData: []string{"-silent", "-fdc", "\"status_code == 400\""}}, "Output All": &outputAll{}, + "Retry 429 with Retry-After header": &retry429WithHeader{}, + "Retry 429 with fallback delay": &retry429FallbackDelay{}, + "Retry 429 respects timeout": &retry429Timeout{}, } type standardHttpGet struct { @@ -419,3 +423,119 @@ func (h *outputAll) Execute() error { return nil } + +type retry429WithHeader struct{} + +func (h *retry429WithHeader) Execute() error { + var hits int32 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&hits, 1) + if n < 3 { + w.Header().Set("Retry-After", "1") + w.WriteHeader(http.StatusTooManyRequests) + return + } + w.WriteHeader(http.StatusOK) + _, _ = fmt.Fprint(w, "OK") + })) + defer ts.Close() + + results, err := testutils.RunHttpxAndGetResults(ts.URL, debug, + "-retry-rounds", "3", + "-retry-delay", "100", + "-retry-timeout", "15", + "-status-code", "-no-color", + ) + if err != nil { + return err + } + + var has200 bool + for _, line := range results { + if strings.Contains(line, "[200]") { + has200 = true + } + } + if !has200 { + return fmt.Errorf("expected 200 after retrying 429, got results: %v", results) + } + + totalHits := atomic.LoadInt32(&hits) + if totalHits < 3 { + return fmt.Errorf("expected at least 3 server hits (429->429->200), got %d", totalHits) + } + return nil +} + +type retry429FallbackDelay struct{} + +func (h *retry429FallbackDelay) Execute() error { + var hits int32 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&hits, 1) + if n < 2 { + w.WriteHeader(http.StatusTooManyRequests) + return + } + w.WriteHeader(http.StatusOK) + _, _ = fmt.Fprint(w, "OK") + })) + defer ts.Close() + + results, err := testutils.RunHttpxAndGetResults(ts.URL, debug, + "-retry-rounds", "3", + "-retry-delay", "200", + "-retry-timeout", "15", + "-status-code", "-no-color", + ) + if err != nil { + return err + } + + var has200 bool + for _, line := range results { + if strings.Contains(line, "[200]") { + has200 = true + } + } + if !has200 { + return fmt.Errorf("expected 200 after retrying 429 with fallback delay, got results: %v", results) + } + + totalHits := atomic.LoadInt32(&hits) + if totalHits < 2 { + return fmt.Errorf("expected at least 2 server hits (429->200), got %d", totalHits) + } + return nil +} + +type retry429Timeout struct{} + +func (h *retry429Timeout) Execute() error { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Retry-After", "60") + w.WriteHeader(http.StatusTooManyRequests) + })) + defer ts.Close() + + results, err := testutils.RunHttpxAndGetResults(ts.URL, debug, + "-retry-rounds", "5", + "-retry-delay", "60000", + "-retry-timeout", "3", + "-status-code", "-no-color", + ) + if err != nil { + return err + } + + var has429 bool + for _, line := range results { + if strings.Contains(line, "[429]") { + has429 = true + } + } + if !has429 { + return fmt.Errorf("expected 429 in output, got results: %v", results) + } + return nil +} diff --git a/runner/options.go b/runner/options.go index 4c8b7a37..ea1424a3 100644 --- a/runner/options.go +++ b/runner/options.go @@ -289,6 +289,7 @@ type Options struct { Resume bool RetryRounds int RetryDelay int + RetryTimeout int resumeCfg *ResumeCfg Exclude goflags.StringSlice HostMaxErrors int @@ -581,7 +582,8 @@ func ParseOptions() *Options { flagSet.IntVarP(&options.MaxResponseBodySizeToSave, "response-size-to-save", "rsts", int(httpxcommon.DefaultMaxResponseBodySize), "max response size to save in bytes"), flagSet.IntVarP(&options.MaxResponseBodySizeToRead, "response-size-to-read", "rstr", int(httpxcommon.DefaultMaxResponseBodySize), "max response size to read in bytes"), flagSet.IntVar(&options.RetryRounds, "retry-rounds", 0, "number of retry rounds for HTTP 429 responses (Too Many Requests)"), - flagSet.IntVar(&options.RetryDelay, "retry-delay", 500, "delay between retry rounds for HTTP 429 responses (e.g. 5ms, 30ms)"), + flagSet.IntVar(&options.RetryDelay, "retry-delay", 500, "fallback delay in ms when Retry-After header is absent (HTTP 429)"), + flagSet.IntVar(&options.RetryTimeout, "retry-timeout", 30, "max total time in seconds for retry rounds (HTTP 429)"), ) flagSet.CreateGroup("cloud", "Cloud", @@ -844,8 +846,13 @@ func (options *Options) ValidateOptions() error { options.Threads = defaultThreads } - if options.RetryRounds > 0 && options.RetryDelay <= 0 { - return errors.New(fmt.Sprintf("invalid retry-delay: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryDelay)) + if options.RetryRounds > 0 { + if options.RetryDelay <= 0 { + return errors.New(fmt.Sprintf("invalid retry-delay: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryDelay)) + } + if options.RetryTimeout <= 0 { + return errors.New(fmt.Sprintf("invalid retry-timeout: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryTimeout)) + } } return nil diff --git a/runner/runner.go b/runner/runner.go index 3d8738ba..b4307a5c 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -1504,7 +1504,13 @@ func (r *Runner) RunEnumeration() { wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads)) retryCh := make(chan retryJob) - _, drainedCh := r.retryLoop(context.Background(), retryCh, output, r.analyze) + retryCtx := context.Background() + var retryCancel context.CancelFunc + if r.options.RetryRounds > 0 && r.options.RetryTimeout > 0 { + retryCtx, retryCancel = context.WithTimeout(retryCtx, time.Duration(r.options.RetryTimeout)*time.Second) + defer retryCancel() + } + _, drainedCh := r.retryLoop(retryCtx, retryCh, output, r.analyze) processItem := func(k string) error { select { @@ -1599,6 +1605,22 @@ func (r *Runner) RunEnumeration() { type analyzeFunc func(*httpx.HTTPX, string, httpx.Target, string, string, *ScanOptions) Result +func retryDelay(res Result, fallbackMs int) time.Duration { + if res.Response != nil { + if ra, ok := res.Response.Headers["Retry-After"]; ok && len(ra) > 0 { + if seconds, err := strconv.Atoi(ra[0]); err == nil && seconds > 0 { + return time.Duration(seconds) * time.Second + } + if t, err := http.ParseTime(ra[0]); err == nil { + if d := time.Until(t); d > 0 { + return d + } + } + } + } + return time.Duration(fallbackMs) * time.Millisecond +} + func (r *Runner) retryLoop( parent context.Context, retryCh chan retryJob, @@ -1630,6 +1652,9 @@ func (r *Runner) retryLoop( select { case <-ctx.Done(): timer.Stop() + if remaining.Add(-1) == 0 { + close(drainedCh) + } return case <-timer.C: } @@ -1640,11 +1665,10 @@ func (r *Runner) retryLoop( if res.StatusCode == http.StatusTooManyRequests && j.attempt < r.options.RetryRounds { j.attempt++ - j.when = time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond) + j.when = time.Now().Add(retryDelay(res, r.options.RetryDelay)) select { case <-ctx.Done(): - return case retryCh <- j: return } @@ -1754,44 +1778,44 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT wg.Add() go func(target httpx.Target, method, protocol string) { defer wg.Done() - result := r.analyze(hp, protocol, target, method, t, scanopts) - output <- result - if result.StatusCode == http.StatusTooManyRequests && - r.options.RetryRounds > 0 { - retryCh <- retryJob{ - hp: hp, - protocol: protocol, - target: target, - method: method, - origInput: t, - scanopts: scanopts.Clone(), - attempt: 1, - when: time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond), - } + result := r.analyze(hp, protocol, target, method, t, scanopts) + output <- result + if result.StatusCode == http.StatusTooManyRequests && + r.options.RetryRounds > 0 { + retryCh <- retryJob{ + hp: hp, + protocol: protocol, + target: target, + method: method, + origInput: t, + scanopts: scanopts.Clone(), + attempt: 1, + when: time.Now().Add(retryDelay(result, r.options.RetryDelay)), } - if scanopts.TLSProbe && result.TLSData != nil { - for _, tt := range result.TLSData.SubjectAN { - if !r.testAndSet(tt) { - continue - } - r.process(tt, wg, hp, protocol, scanopts, output, retryCh) - } - if r.testAndSet(result.TLSData.SubjectCN) { - r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh) + } + if scanopts.TLSProbe && result.TLSData != nil { + for _, tt := range result.TLSData.SubjectAN { + if !r.testAndSet(tt) { + continue } + r.process(tt, wg, hp, protocol, scanopts, output, retryCh) + } + if r.testAndSet(result.TLSData.SubjectCN) { + r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh) } - if scanopts.CSPProbe && result.CSPData != nil { - scanopts.CSPProbe = false - domains := result.CSPData.Domains - domains = append(domains, result.CSPData.Fqdns...) - for _, tt := range domains { - if !r.testAndSet(tt) { - continue - } - r.process(tt, wg, hp, protocol, scanopts, output, retryCh) + } + if scanopts.CSPProbe && result.CSPData != nil { + scanopts.CSPProbe = false + domains := result.CSPData.Domains + domains = append(domains, result.CSPData.Fqdns...) + for _, tt := range domains { + if !r.testAndSet(tt) { + continue } + r.process(tt, wg, hp, protocol, scanopts, output, retryCh) } - }(target, method, prot) + } + }(target, method, prot) } } } @@ -1821,33 +1845,33 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT urlx.UpdatePort(fmt.Sprint(port)) target.Host = urlx.String() } - result := r.analyze(hp, protocol, target, method, t, scanopts) - output <- result - if result.StatusCode == http.StatusTooManyRequests && - r.options.RetryRounds > 0 { - retryCh <- retryJob{ - hp: hp, - protocol: protocol, - target: target, - method: method, - origInput: t, - scanopts: scanopts.Clone(), - attempt: 1, - when: time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond), - } + result := r.analyze(hp, protocol, target, method, t, scanopts) + output <- result + if result.StatusCode == http.StatusTooManyRequests && + r.options.RetryRounds > 0 { + retryCh <- retryJob{ + hp: hp, + protocol: protocol, + target: target, + method: method, + origInput: t, + scanopts: scanopts.Clone(), + attempt: 1, + when: time.Now().Add(retryDelay(result, r.options.RetryDelay)), } - if scanopts.TLSProbe && result.TLSData != nil { - for _, tt := range result.TLSData.SubjectAN { - if !r.testAndSet(tt) { - continue - } - r.process(tt, wg, hp, protocol, scanopts, output, retryCh) - } - if r.testAndSet(result.TLSData.SubjectCN) { - r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh) + } + if scanopts.TLSProbe && result.TLSData != nil { + for _, tt := range result.TLSData.SubjectAN { + if !r.testAndSet(tt) { + continue } + r.process(tt, wg, hp, protocol, scanopts, output, retryCh) } - }(port, target, method, wantedProtocol) + if r.testAndSet(result.TLSData.SubjectCN) { + r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh) + } + } + }(port, target, method, wantedProtocol) } } } diff --git a/runner/runner_test.go b/runner/runner_test.go index 92c6df6d..fe171a79 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -444,10 +444,11 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { defer srv2.Close() r, err := New(&Options{ - Threads: 1, - RetryRounds: 2, - RetryDelay: 5, - Timeout: 3, + Threads: 1, + RetryRounds: 2, + RetryDelay: 5, + RetryTimeout: 30, + Timeout: 3, }) require.NoError(t, err) @@ -499,7 +500,6 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { close(output) drainWG.Wait() - // Verify expected results // srv1: should have 3x 429 responses and no 200 (never succeeds within retries) require.Equal(t, 3, s1n429) require.Equal(t, 0, s1n200) @@ -508,3 +508,159 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { require.Equal(t, 2, s2n429) require.Equal(t, 1, s2n200) } + +func TestRetryDelay_RetryAfterHeader(t *testing.T) { + fallbackMs := 500 + + t.Run("uses Retry-After seconds header", func(t *testing.T) { + res := Result{ + Response: &httpx.Response{ + Headers: map[string][]string{ + "Retry-After": {"3"}, + }, + }, + } + d := retryDelay(res, fallbackMs) + require.Equal(t, 3*time.Second, d) + }) + + t.Run("uses Retry-After HTTP-date header", func(t *testing.T) { + future := time.Now().Add(10 * time.Second) + res := Result{ + Response: &httpx.Response{ + Headers: map[string][]string{ + "Retry-After": {future.UTC().Format(http.TimeFormat)}, + }, + }, + } + d := retryDelay(res, fallbackMs) + require.InDelta(t, 10*time.Second, d, float64(2*time.Second)) + }) + + t.Run("falls back when no header", func(t *testing.T) { + res := Result{ + Response: &httpx.Response{ + Headers: map[string][]string{}, + }, + } + d := retryDelay(res, fallbackMs) + require.Equal(t, 500*time.Millisecond, d) + }) + + t.Run("falls back when response is nil", func(t *testing.T) { + res := Result{} + d := retryDelay(res, fallbackMs) + require.Equal(t, 500*time.Millisecond, d) + }) + + t.Run("falls back when Retry-After is unparseable", func(t *testing.T) { + res := Result{ + Response: &httpx.Response{ + Headers: map[string][]string{ + "Retry-After": {"not-a-number"}, + }, + }, + } + d := retryDelay(res, fallbackMs) + require.Equal(t, 500*time.Millisecond, d) + }) +} + +func TestRetryLoop_RespectsRetryAfterHeader(t *testing.T) { + var hits int32 + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&hits, 1) + if n < 3 { + w.Header().Set("Retry-After", "1") + w.WriteHeader(http.StatusTooManyRequests) + return + } + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + rn, err := New(&Options{ + Threads: 1, + RetryRounds: 3, + RetryDelay: 5, + RetryTimeout: 30, + Timeout: 3, + }) + require.NoError(t, err) + + output := make(chan Result, 10) + retryCh := make(chan retryJob) + + _, drainedCh := rn.retryLoop(context.Background(), retryCh, output, rn.analyze) + + wg, _ := syncutil.New(syncutil.WithSize(1)) + so := rn.scanopts.Clone() + so.Methods = []string{"GET"} + so.TLSProbe = false + so.CSPProbe = false + + rn.process(srv.URL, wg, rn.hp, httpx.HTTP, so, output, retryCh) + wg.Wait() + <-drainedCh + close(output) + + var n429, n200 int + for res := range output { + if res.StatusCode == http.StatusTooManyRequests { + n429++ + } else if res.StatusCode == http.StatusOK { + n200++ + } + } + require.Equal(t, 2, n429) + require.Equal(t, 1, n200) +} + +func TestRetryLoop_RespectsTimeout(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Retry-After", "60") + w.WriteHeader(http.StatusTooManyRequests) + })) + defer srv.Close() + + rn, err := New(&Options{ + Threads: 1, + RetryRounds: 10, + RetryDelay: 60000, + RetryTimeout: 2, + Timeout: 3, + }) + require.NoError(t, err) + + output := make(chan Result, 100) + retryCh := make(chan retryJob) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + _, drainedCh := rn.retryLoop(ctx, retryCh, output, rn.analyze) + + wg, _ := syncutil.New(syncutil.WithSize(1)) + so := rn.scanopts.Clone() + so.Methods = []string{"GET"} + so.TLSProbe = false + so.CSPProbe = false + + start := time.Now() + rn.process(srv.URL, wg, rn.hp, httpx.HTTP, so, output, retryCh) + wg.Wait() + <-drainedCh + close(output) + elapsed := time.Since(start) + + require.Less(t, elapsed, 10*time.Second, "retry loop should have been cancelled by timeout") + + var n429 int + for res := range output { + if res.StatusCode == http.StatusTooManyRequests { + n429++ + } + } + require.GreaterOrEqual(t, n429, 1, "should have received at least the initial 429") +} From 8d424bea47cc62355bea949d1f8390112349962f Mon Sep 17 00:00:00 2001 From: Mzack9999 Date: Sun, 8 Mar 2026 00:23:09 +0100 Subject: [PATCH 7/8] lint --- runner/runner_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/runner/runner_test.go b/runner/runner_test.go index fe171a79..ed8cb415 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -607,9 +607,10 @@ func TestRetryLoop_RespectsRetryAfterHeader(t *testing.T) { var n429, n200 int for res := range output { - if res.StatusCode == http.StatusTooManyRequests { + switch res.StatusCode { + case http.StatusTooManyRequests: n429++ - } else if res.StatusCode == http.StatusOK { + case http.StatusOK: n200++ } } From c4d5a294b085065c6be33736a5f9f941cf64e211 Mon Sep 17 00:00:00 2001 From: Mzack9999 Date: Sun, 8 Mar 2026 00:54:56 +0100 Subject: [PATCH 8/8] use batches --- runner/runner.go | 181 ++++++++++++++++++------------------------ runner/runner_test.go | 135 +++++++++++++++++++------------ runner/types.go | 11 --- 3 files changed, 163 insertions(+), 164 deletions(-) diff --git a/runner/runner.go b/runner/runner.go index b4307a5c..f5f23503 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -23,7 +23,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "golang.org/x/exp/maps" @@ -1502,15 +1501,7 @@ func (r *Runner) RunEnumeration() { }(nextStep) wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads)) - retryCh := make(chan retryJob) - - retryCtx := context.Background() - var retryCancel context.CancelFunc - if r.options.RetryRounds > 0 && r.options.RetryTimeout > 0 { - retryCtx, retryCancel = context.WithTimeout(retryCtx, time.Duration(r.options.RetryTimeout)*time.Second) - defer retryCancel() - } - _, drainedCh := r.retryLoop(retryCtx, retryCh, output, r.analyze) + var rq retryQueue processItem := func(k string) error { select { @@ -1541,10 +1532,10 @@ func (r *Runner) RunEnumeration() { for _, p := range r.options.requestURIs { scanopts := r.scanopts.Clone() scanopts.RequestURI = p - r.process(k, wg, r.hp, protocol, scanopts, output, retryCh) + r.process(k, wg, r.hp, protocol, scanopts, output, &rq) } } else { - r.process(k, wg, r.hp, protocol, &r.scanopts, output, retryCh) + r.process(k, wg, r.hp, protocol, &r.scanopts, output, &rq) } } } @@ -1576,9 +1567,17 @@ func (r *Runner) RunEnumeration() { } wg.Wait() + if r.options.RetryRounds > 0 { - <-drainedCh + retryCtx := context.Background() + if r.options.RetryTimeout > 0 { + var cancel context.CancelFunc + retryCtx, cancel = context.WithTimeout(retryCtx, time.Duration(r.options.RetryTimeout)*time.Second) + defer cancel() + } + r.processRetries(retryCtx, &rq, output) } + close(output) wgoutput.Wait() @@ -1603,8 +1602,6 @@ func (r *Runner) RunEnumeration() { } } -type analyzeFunc func(*httpx.HTTPX, string, httpx.Target, string, string, *ScanOptions) Result - func retryDelay(res Result, fallbackMs int) time.Duration { if res.Response != nil { if ra, ok := res.Response.Headers["Retry-After"]; ok && len(ra) > 0 { @@ -1621,68 +1618,60 @@ func retryDelay(res Result, fallbackMs int) time.Duration { return time.Duration(fallbackMs) * time.Millisecond } -func (r *Runner) retryLoop( - parent context.Context, - retryCh chan retryJob, - output chan<- Result, - analyze analyzeFunc, -) (stop func(), drained <-chan struct{}) { - var remaining atomic.Int64 - ctx, cancel := context.WithCancel(parent) - drainedCh := make(chan struct{}) +type retryItem struct { + hp *httpx.HTTPX + protocol string + target httpx.Target + method string + input string + scanopts *ScanOptions + delay time.Duration +} - go func() { - defer close(retryCh) +type retryQueue struct { + mu sync.Mutex + items []retryItem +} - for { +func (q *retryQueue) push(item retryItem) { + q.mu.Lock() + q.items = append(q.items, item) + q.mu.Unlock() +} + +func (q *retryQueue) drain() []retryItem { + q.mu.Lock() + items := q.items + q.items = nil + q.mu.Unlock() + return items +} + +func (r *Runner) processRetries(ctx context.Context, rq *retryQueue, output chan Result) { + for round := 0; round < r.options.RetryRounds; round++ { + items := rq.drain() + if len(items) == 0 { + return + } + for _, item := range items { + timer := time.NewTimer(item.delay) select { case <-ctx.Done(): + timer.Stop() return - case job, ok := <-retryCh: - if !ok { - return - } - if job.attempt == 1 { - remaining.Add(1) - } - - go func(j retryJob) { - if wait := time.Until(j.when); wait > 0 { - timer := time.NewTimer(wait) - select { - case <-ctx.Done(): - timer.Stop() - if remaining.Add(-1) == 0 { - close(drainedCh) - } - return - case <-timer.C: - } - } - - res := analyze(j.hp, j.protocol, j.target, j.method, j.origInput, j.scanopts) - output <- res - - if res.StatusCode == http.StatusTooManyRequests && j.attempt < r.options.RetryRounds { - j.attempt++ - j.when = time.Now().Add(retryDelay(res, r.options.RetryDelay)) - - select { - case <-ctx.Done(): - case retryCh <- j: - return - } - } - - if remaining.Add(-1) == 0 { - close(drainedCh) - } - }(job) + case <-timer.C: + } + result := r.analyze(item.hp, item.protocol, item.target, item.method, item.input, item.scanopts) + output <- result + if result.StatusCode == http.StatusTooManyRequests { + rq.push(retryItem{ + hp: item.hp, protocol: item.protocol, target: item.target, + method: item.method, input: item.input, scanopts: item.scanopts, + delay: retryDelay(result, r.options.RetryDelay), + }) } } - }() - - return func() { cancel() }, drainedCh + } } func handleStripAnsiCharacters(data string, skip bool) string { @@ -1751,11 +1740,11 @@ func (r *Runner) GetScanOpts() ScanOptions { return r.scanopts } -func (r *Runner) Process(t string, wg *syncutil.AdaptiveWaitGroup, protocol string, scanopts *ScanOptions, output chan Result, retryCh chan retryJob) { - r.process(t, wg, r.hp, protocol, scanopts, output, retryCh) +func (r *Runner) Process(t string, wg *syncutil.AdaptiveWaitGroup, protocol string, scanopts *ScanOptions, output chan Result, rq *retryQueue) { + r.process(t, wg, r.hp, protocol, scanopts, output, rq) } -func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTTPX, protocol string, scanopts *ScanOptions, output chan Result, retryCh chan retryJob) { +func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTTPX, protocol string, scanopts *ScanOptions, output chan Result, rq *retryQueue) { // attempts to set the workpool size to the number of threads if r.options.Threads > 0 && wg.Size != r.options.Threads { if err := wg.Resize(context.Background(), r.options.Threads); err != nil { @@ -1780,28 +1769,22 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT defer wg.Done() result := r.analyze(hp, protocol, target, method, t, scanopts) output <- result - if result.StatusCode == http.StatusTooManyRequests && - r.options.RetryRounds > 0 { - retryCh <- retryJob{ - hp: hp, - protocol: protocol, - target: target, - method: method, - origInput: t, - scanopts: scanopts.Clone(), - attempt: 1, - when: time.Now().Add(retryDelay(result, r.options.RetryDelay)), - } + if result.StatusCode == http.StatusTooManyRequests && r.options.RetryRounds > 0 && rq != nil { + rq.push(retryItem{ + hp: hp, protocol: protocol, target: target, + method: method, input: t, scanopts: scanopts.Clone(), + delay: retryDelay(result, r.options.RetryDelay), + }) } if scanopts.TLSProbe && result.TLSData != nil { for _, tt := range result.TLSData.SubjectAN { if !r.testAndSet(tt) { continue } - r.process(tt, wg, hp, protocol, scanopts, output, retryCh) + r.process(tt, wg, hp, protocol, scanopts, output, rq) } if r.testAndSet(result.TLSData.SubjectCN) { - r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh) + r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, rq) } } if scanopts.CSPProbe && result.CSPData != nil { @@ -1812,7 +1795,7 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT if !r.testAndSet(tt) { continue } - r.process(tt, wg, hp, protocol, scanopts, output, retryCh) + r.process(tt, wg, hp, protocol, scanopts, output, rq) } } }(target, method, prot) @@ -1847,28 +1830,22 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT } result := r.analyze(hp, protocol, target, method, t, scanopts) output <- result - if result.StatusCode == http.StatusTooManyRequests && - r.options.RetryRounds > 0 { - retryCh <- retryJob{ - hp: hp, - protocol: protocol, - target: target, - method: method, - origInput: t, - scanopts: scanopts.Clone(), - attempt: 1, - when: time.Now().Add(retryDelay(result, r.options.RetryDelay)), - } + if result.StatusCode == http.StatusTooManyRequests && r.options.RetryRounds > 0 && rq != nil { + rq.push(retryItem{ + hp: hp, protocol: protocol, target: target, + method: method, input: t, scanopts: scanopts.Clone(), + delay: retryDelay(result, r.options.RetryDelay), + }) } if scanopts.TLSProbe && result.TLSData != nil { for _, tt := range result.TLSData.SubjectAN { if !r.testAndSet(tt) { continue } - r.process(tt, wg, hp, protocol, scanopts, output, retryCh) + r.process(tt, wg, hp, protocol, scanopts, output, rq) } if r.testAndSet(result.TLSData.SubjectCN) { - r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh) + r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, rq) } } }(port, target, method, wantedProtocol) diff --git a/runner/runner_test.go b/runner/runner_test.go index ed8cb415..8bf6e4c1 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -7,7 +7,6 @@ import ( "net/http/httptest" "os" "strings" - "sync" "sync/atomic" "testing" "time" @@ -451,11 +450,10 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { Timeout: 3, }) require.NoError(t, err) + defer r.Close() - output := make(chan Result) - retryCh := make(chan retryJob) - - _, drainedCh := r.retryLoop(context.Background(), retryCh, output, r.analyze) + output := make(chan Result, 20) + var rq retryQueue wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads)) so := r.scanopts.Clone() @@ -468,43 +466,35 @@ func TestRunner_Process_And_RetryLoop(t *testing.T) { "srv2": srv2.URL, } - var drainWG sync.WaitGroup - drainWG.Add(1) - var s1n429, s1n200, s2n429, s2n200 int - go func() { - defer drainWG.Done() - for res := range output { - switch res.StatusCode { - case http.StatusTooManyRequests: - if res.URL == srv1.URL { - s1n429++ - } else { - s2n429++ - } - case http.StatusOK: - if res.URL == srv1.URL { - s1n200++ - } else { - s2n200++ - } - } - } - }() - for _, url := range seed { - r.process(url, wg, r.hp, httpx.HTTP, so, output, retryCh) + r.process(url, wg, r.hp, httpx.HTTP, so, output, &rq) } wg.Wait() - <-drainedCh + r.processRetries(context.Background(), &rq, output) close(output) - drainWG.Wait() - // srv1: should have 3x 429 responses and no 200 (never succeeds within retries) + var s1n429, s1n200, s2n429, s2n200 int + for res := range output { + switch res.StatusCode { + case http.StatusTooManyRequests: + if res.URL == srv1.URL { + s1n429++ + } else { + s2n429++ + } + case http.StatusOK: + if res.URL == srv1.URL { + s1n200++ + } else { + s2n200++ + } + } + } + require.Equal(t, 3, s1n429) require.Equal(t, 0, s1n200) - // srv2: should have 2x 429 responses and 1x 200 (succeeds on 3rd attempt) require.Equal(t, 2, s2n429) require.Equal(t, 1, s2n200) } @@ -566,7 +556,7 @@ func TestRetryDelay_RetryAfterHeader(t *testing.T) { }) } -func TestRetryLoop_RespectsRetryAfterHeader(t *testing.T) { +func TestRetryRespectsRetryAfterHeader(t *testing.T) { var hits int32 srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -588,11 +578,10 @@ func TestRetryLoop_RespectsRetryAfterHeader(t *testing.T) { Timeout: 3, }) require.NoError(t, err) + defer rn.Close() output := make(chan Result, 10) - retryCh := make(chan retryJob) - - _, drainedCh := rn.retryLoop(context.Background(), retryCh, output, rn.analyze) + var rq retryQueue wg, _ := syncutil.New(syncutil.WithSize(1)) so := rn.scanopts.Clone() @@ -600,9 +589,12 @@ func TestRetryLoop_RespectsRetryAfterHeader(t *testing.T) { so.TLSProbe = false so.CSPProbe = false - rn.process(srv.URL, wg, rn.hp, httpx.HTTP, so, output, retryCh) + rn.process(srv.URL, wg, rn.hp, httpx.HTTP, so, output, &rq) wg.Wait() - <-drainedCh + + start := time.Now() + rn.processRetries(context.Background(), &rq, output) + elapsed := time.Since(start) close(output) var n429, n200 int @@ -616,9 +608,10 @@ func TestRetryLoop_RespectsRetryAfterHeader(t *testing.T) { } require.Equal(t, 2, n429) require.Equal(t, 1, n200) + require.GreaterOrEqual(t, elapsed, 2*time.Second, "should have waited ~1s per Retry-After") } -func TestRetryLoop_RespectsTimeout(t *testing.T) { +func TestRetryRespectsTimeout(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Retry-After", "60") w.WriteHeader(http.StatusTooManyRequests) @@ -633,14 +626,10 @@ func TestRetryLoop_RespectsTimeout(t *testing.T) { Timeout: 3, }) require.NoError(t, err) + defer rn.Close() output := make(chan Result, 100) - retryCh := make(chan retryJob) - - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - _, drainedCh := rn.retryLoop(ctx, retryCh, output, rn.analyze) + var rq retryQueue wg, _ := syncutil.New(syncutil.WithSize(1)) so := rn.scanopts.Clone() @@ -648,14 +637,18 @@ func TestRetryLoop_RespectsTimeout(t *testing.T) { so.TLSProbe = false so.CSPProbe = false - start := time.Now() - rn.process(srv.URL, wg, rn.hp, httpx.HTTP, so, output, retryCh) + rn.process(srv.URL, wg, rn.hp, httpx.HTTP, so, output, &rq) wg.Wait() - <-drainedCh - close(output) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + start := time.Now() + rn.processRetries(ctx, &rq, output) elapsed := time.Since(start) + close(output) - require.Less(t, elapsed, 10*time.Second, "retry loop should have been cancelled by timeout") + require.Less(t, elapsed, 10*time.Second, "retry should have been cancelled by context timeout") var n429 int for res := range output { @@ -665,3 +658,43 @@ func TestRetryLoop_RespectsTimeout(t *testing.T) { } require.GreaterOrEqual(t, n429, 1, "should have received at least the initial 429") } + +func TestRetryNo429_CompletesNormally(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + rn, err := New(&Options{ + Threads: 1, + RetryRounds: 3, + RetryDelay: 500, + RetryTimeout: 30, + Timeout: 3, + }) + require.NoError(t, err) + defer rn.Close() + + output := make(chan Result, 10) + var rq retryQueue + + wg, _ := syncutil.New(syncutil.WithSize(1)) + so := rn.scanopts.Clone() + so.Methods = []string{"GET"} + so.TLSProbe = false + so.CSPProbe = false + + rn.process(srv.URL, wg, rn.hp, httpx.HTTP, so, output, &rq) + wg.Wait() + rn.processRetries(context.Background(), &rq, output) + close(output) + + var n200 int + for res := range output { + if res.StatusCode == http.StatusOK { + n200++ + } + } + require.Equal(t, 1, n200) + require.Empty(t, rq.items, "no retries should have been queued") +} diff --git a/runner/types.go b/runner/types.go index 855c19fa..1eaa3608 100644 --- a/runner/types.go +++ b/runner/types.go @@ -125,17 +125,6 @@ type Trace struct { WroteRequest time.Time `json:"wrote_request,omitempty"` } -type retryJob struct { - hp *httpx.HTTPX - protocol string - target httpx.Target - method string - origInput string - scanopts *ScanOptions - attempt int - when time.Time -} - // function to get dsl variables from result struct func dslVariables() ([]string, error) { fakeResult := Result{}