From 57237eef8e293d9f4b2ea787db4f6ca6401e38d6 Mon Sep 17 00:00:00 2001 From: Siddhartha Basu Date: Sun, 24 May 2026 16:04:44 +0000 Subject: [PATCH 1/4] feat(interpro): add concurrent-scan subcommand with semaphore-bounded worker pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Process FASTA records concurrently (default 25 in-flight) through the InterProScan submitβ†’pollβ†’downloadβ†’save pipeline using golang.org/x/sync semaphore for bounded concurrency with best-effort error collection. πŸ’˜ Generated with Crush Assisted-by: Crush:deepseek/deepseek-v4-pro --- cmd/interpro-cli/main.go | 46 ++ docs/concurrent-scan-plan.md | 948 +++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 + internal/interpro/scan_concurrent.go | 164 +++++ internal/interpro/scan_model.go | 1 + 6 files changed, 1162 insertions(+) create mode 100644 docs/concurrent-scan-plan.md create mode 100644 internal/interpro/scan_concurrent.go diff --git a/cmd/interpro-cli/main.go b/cmd/interpro-cli/main.go index 12d5b27..706d926 100644 --- a/cmd/interpro-cli/main.go +++ b/cmd/interpro-cli/main.go @@ -88,6 +88,52 @@ func main() { }, Action: interpro.Scan, }, + { + Name: "concurrent-scan", + Usage: "Submit protein sequences to InterProScan concurrently and save JSON results", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "fasta", + Aliases: []string{"f"}, + Usage: "Path to FASTA file (supports multi-FASTA)", + }, + &cli.StringFlag{ + Name: "email", + Aliases: []string{"e"}, + Sources: cli.EnvVars("EBI_EMAIL"), + Usage: "Email for EMBL-EBI Job Dispatcher (required)", + }, + &cli.StringFlag{ + Name: "output", + Aliases: []string{"o"}, + Value: ".", + Usage: "Output directory for JSON results", + }, + &cli.StringFlag{ + Name: "seq-type", + Aliases: []string{"s"}, + Value: "p", + Usage: "Sequence type: p (protein) or n (nucleotide)", + }, + &cli.DurationFlag{ + Name: "poll-interval", + Value: defaultPollInterval, + Usage: "How often to check job status", + }, + &cli.DurationFlag{ + Name: "timeout", + Value: defaultTimeout, + Usage: "Maximum time to wait for a single job", + }, + &cli.IntFlag{ + Name: "concurrency", + Aliases: []string{"c"}, + Value: 25, + Usage: "Maximum number of concurrent InterProScan jobs", + }, + }, + Action: interpro.ConcurrentScan, + }, }, } diff --git a/docs/concurrent-scan-plan.md b/docs/concurrent-scan-plan.md new file mode 100644 index 0000000..4d46c3e --- /dev/null +++ b/docs/concurrent-scan-plan.md @@ -0,0 +1,948 @@ +# Concurrent Scan Subcommand β€” Architecture & Implementation Plan + +> **Status:** Draft β€” for review and discussion +> **Target:** `scan` subcommand of `interpro-manager` +> **Goal:** Process FASTA records concurrently (max 25 in-flight) through the InterProScan submitβ†’pollβ†’downloadβ†’save pipeline + + +--- + +## 1. Current Architecture Summary + +### 1.1 The Scan Pipeline (Sequential) + +``` +CLI flags (fasta, email, output, seq-type, poll-interval, timeout) + β”‚ + β”œβ”€ extractScanRequest β†’ ScanRequest struct + β”œβ”€ validateScanRequest β†’ Either[error, ScanRequest] + β”‚ β”œβ”€ validate email (format + parse) + β”‚ β”œβ”€ validate fasta path + β”‚ └─ stat the file to confirm it exists + β”‚ + β”œβ”€ IOE.FromEither β†’ lift into IOEither + β”œβ”€ IOE.Map(makeHTTPClient) β†’ bundle http.Client + ScanRequest into SubmitArgs + β”œβ”€ IOE.ChainFirst(mkdir) β†’ ensure output dir exists + β”‚ + └─ streamFastaRecords β†’ IOEither[error, []string] + β”‚ + └─ for each Fasta record (sequential, via channel): + β”‚ + β”œβ”€ IOE.FromEither(record) β†’ lift parsed record + β”œβ”€ IOE.Map(to SubmitInput) β†’ bundle client + config + record + β”œβ”€ IOE.Chain(buildSubmit) β†’ POST /run β†’ SubmittedJob + β”œβ”€ IOE.Chain(pollJob) β†’ GET /status until FINISHED β†’ CompletedJob + β”œβ”€ IOE.Chain(downloadAndSave) β†’ GET /result + write JSON file β†’ string (path) + β”œβ”€ toEither β†’ execute the IOEither + └─ E.Fold β†’ collect path or capture error + β”‚ + └─ first error aborts the stream; returns all collected paths otherwise +``` + +### 1.2 Files involved in the scan subcommand + +| File | Purpose | +|------|---------| +| `cmd/interpro-cli/main.go:51-90` | CLI flag definitions, wires `interpro.Scan` as Action | +| `internal/interpro/scan.go` | Top-level `Scan` entry point, validation, orchestration | +| `internal/interpro/scan_model.go` | Type definitions: `ScanRequest`, `SubmittedJob`, `CompletedJob`, `SubmitArgs`, `SubmitInput` | +| `internal/interpro/scan_loop.go` | `streamFastaRecords` β€” sequential loop driver | +| `internal/interpro/scan_submit.go` | `buildSubmitRequester` β€” POST /run β†’ `SubmittedJob` | +| `internal/interpro/scan_poll.go` | `pollJob`, `tickOnce`, `buildStatusHandler`, `getJobStatus` β€” polling loop | +| `internal/interpro/scan_result.go` | `downloadAndSave`, `saveResult` β€” GET /result + file write | +| `internal/interpro/extract.go:41-46` | `extractSeqID` β€” extracts sequence ID from FASTA header | +| `internal/interpro/client.go:14-16` | `toEither` β€” executes an IOEither lazily | +| `internal/seqio/fasta.go` | `ParseFASTA` β€” lazy FASTA parser returning `SeqResult[Fasta]` channel | + +### 1.3 Key Type Definitions + +```go +// From scan_model.go +type ScanRequest struct { + FastaPath string + Email string + OutputDir string + SeqType string + BaseURL string + PollInterval time.Duration + Timeout time.Duration +} + +type SubmittedJob struct { + JobID string + SeqID string + Client ioehttp.Client + Config ScanRequest +} + +type CompletedJob struct { + JobID string + SeqID string + Client ioehttp.Client + Config ScanRequest +} + +type SubmitArgs = T.Tuple2[ioehttp.Client, ScanRequest] +type SubmitInput = T.Tuple3[ioehttp.Client, ScanRequest, seqio.Fasta] +``` + +### 1.4 The Per-Record Functional Pipeline (what we want to reuse) + +The core processing for a single record is a clean IOEither pipeline: + +``` +seqio.Fasta record + β†’ buildSubmitRequester(SubmitInput) : IOEither[error, SubmittedJob] + β†’ pollJob(SubmittedJob) : IOEither[error, CompletedJob] + β†’ downloadAndSave(CompletedJob) : IOEither[error, string] // path to saved JSON +``` + +This pipeline is **already correct, well-tested, and purely functional**. The concurrency layer should **wrap** this pipeline, not modify it. + +### 1.5 The FASTA Parser + +`ParseFASTA(path)` returns `IR.SeqResult[Fasta]` β€” a channel-based lazy iterator. Each element is `Either[error, Fasta]`. The file is lazily read and closed when iteration completes. The channel **must be consumed from a single goroutine** (Go channels are not multi-consumer). + + +--- + +## 2. Problem Statement + +The current `streamFastaRecords` processes records **one at a time, +sequentially**. For a FASTA file with N records: + +- Submission of record `i+1` waits for submission, polling, and download of record `i` to complete. +- Each record spends most of its time **waiting** β€” polling an HTTP endpoint until the server-side analysis finishes. +- Total runtime β‰ˆ N Γ— (submit_time + poll_wait_time + download_time). + +For example, if polling takes 5 minutes per job and there are 100 records: +- Sequential: ~500 minutes (8+ hours) +- Concurrent (25 at a time in 4 batches): ~20 minutes (4 Γ— 5 minutes) + +The interproscan REST API supports concurrent job submissions β€” there is no +server-side restriction preventing parallel requests. The only constraints are +client-side: API rate limits, network bandwidth, and local resources. + +### 2.1 Key Concurrency Opportunities + +| Phase | Concurrency Opportunity | Blocking? | +|-------|------------------------|-----------| +| **Submit** (`POST /run`) | High β€” each submission is an independent HTTP POST. Can send all 25 at once. | Short (seconds) | +| **Poll** (`GET /status`) | High β€” each job polls independently. Most time is waiting for server-side processing. | Long (minutes) | +| **Download** (`GET /result`) | Medium β€” each download is an independent HTTP GET. | Short (seconds) | +| **Save** (write JSON file) | Medium β€” independent file writes to separate files. | Short (milliseconds) | + + +--- + +## 3. Design Goals & Constraints + +### 3.1 Non-negotiable Constraints + +1. **Preserve the per-record functional pipeline.** `buildSubmitRequester β†’ pollJob β†’ downloadAndSave` remains untouched as a pure IOEither composition. +2. **New subcommand `concurrent-scan`.** A separate subcommand with its own flag set. Does not modify the existing `scan` subcommand. +3. **Preserve the existing error model.** Errors are propagated through the `Either[error, A]` / `IOEither[error, A]` monad stack. +4. **Max 25 concurrent in-flight jobs.** This is the API's informal guidance (or configurable). +5. **Each record saves to its own JSON file.** Same file naming convention: `{SeqID}_{JobID}.json`. +6. **fp-go everywhere.** The per-record pipeline stays in fp-go. The orchestration may use Go primitives but should present an fp-go-compatible interface. +7. **Stream the FASTA file β€” never slurp it into memory.** The parser (`ParseFASTA`) is a channel-based lazy iterator. Records must be consumed as they are parsed, not collected into a `[]seqio.Fasta` slice. FASTA files can contain tens of thousands of sequences (genome-scale); loading them into memory upfront is unacceptable. + +### 3.2 Design Goals + +1. **Minimal refactoring.** The submission, polling, download, and save functions should not change at all. +2. **Clear separation of concerns.** The concurrency orchestration lives in its own file, separate from the pure functional pipeline. +3. **Composable error handling.** Errors should compose: per-record errors should be collectable or fail-fast, depending on configuration. +4. **Deterministic file output.** Each file path depends on `SeqID + JobID`, which are inherently unique β€” no race conditions on file writes. +5. **Testable in isolation.** The concurrency orchestrator should be testable with a mock HTTP server, just like the existing tests. + +### 3.3 Open Design Questions (for you to answer) + +| # | Question | Options | +|---|----------|---------| +| **Q1** | Should concurrency limit be a CLI flag (`--concurrency`)? | Default 25, configurable, or hard-coded 25 | +| **Q2** | Error handling: fail-fast (first error cancels all) or best-effort (collect all errors, report at end)? | Fail-fast is simpler and matches current sequential behavior. Best-effort is more robust for large batches. | +| **Q3** | Result ordering: should output paths be reported in FASTA order, or is any order acceptable? | Any order is simpler (no ordering overhead). FASTA order requires sorting/buffering. | +| **Q4** | New subcommand (`concurrent-scan`) or replace existing `scan`? | New subcommand allows side-by-side comparison. Replacing `scan` is cleaner long-term. | +| **Q5** | Should an aggregated summary be printed (e.g., "25/25 succeeded", "3 failed")? | Yes, this is valuable for large batches. Current sequential version just prints each file path. | + + +--- + +## 4. Detailed Approach Analysis + +### 4.1 Approach A: `TraverseArrayPar` with Batch Chunking β€” ❌ ELIMINATED + +> **Eliminated** because `TraverseArrayPar` requires a `[]seqio.Fasta` slice β€” it cannot consume a channel-based iterator. This forces slurp-first semantics, which violates constraint #7. +> +> Additional issues: no concurrency bound (all 25 launch simultaneously within a batch), no context cancellation, and all-or-nothing batch error semantics. Even if the FASTA file is small, these deeper limitations make it unsuitable for production use. + +--- + +### 4.2 Approach B: `TraverseArrayPar` with Semaphore β€” ❌ ELIMINATED + +> **Eliminated** for the same reason as Approach A: `TraverseArrayPar` requires slurping the FASTA file into a `[]seqio.Fasta` slice. The semaphore addresses the concurrency-bounding problem but does not change the fundamental requirement that all records be loaded into memory before processing begins. + +--- + +### 4.3 Approach C: Semaphore-Bounded Worker Pool (SELECTED) + +**Core idea:** Use a `semaphore.Weighted` for bounded concurrency (max 25 in-flight) with Go's standard `sync.WaitGroup` for worker coordination. Each worker runs the functional IOEither pipeline for one record. Errors are collected per-record (best-effort mode) rather than cancelling on first failure. + +**How it works:** +1. A single goroutine reads FASTA records from the channel and dispatches tasks to workers. +2. Each dispatch acquires a semaphore token (blocks if 25 workers are in-flight), then launches a goroutine via `sync.WaitGroup`. +3. Each worker processes one record through the existing IOEither pipeline, releases the semaphore on completion. +4. Per-record errors are collected into a mutex-protected error slice (best-effort β€” processing continues after individual failures). +5. After all workers finish, results and errors are aggregated into a summary. + +**Pseudo-design (not actual code):** + +``` +// New file: internal/interpro/scan_concurrent.go + +func streamFastaRecordsConcurrent(args SubmitArgs) IOE.IOEither[error, []string] { + return IOE.TryCatchError(func() ([]string, error) { + sem := semaphore.NewWeighted(int64(args.F2.Concurrency)) + + var wg sync.WaitGroup + var mu sync.Mutex + var results []string + var errors []string + + // Reader loop: dispatch FASTA records to workers + for record := range seqio.ParseFASTA(args.F2.FastaPath) { + // Handle parse error β€” per-record, continue to next + if record is Left: + mu.Lock() + errors = append(errors, formatParseError(record.Err)) + mu.Unlock() + continue + + rec := record.Right + + // Block if N workers are already running + if err := sem.Acquire(context.Background(), 1); err != nil { + return nil, err + } + + wg.Add(1) + go func(r seqio.Fasta) { + defer sem.Release(1) + defer wg.Done() + + // Run the functional pipeline + result := F.Pipe4( + T.MakeTuple3(args.F1, args.F2, r), + buildSubmitRequester, + IOE.Chain(pollJob), + IOE.Chain(downloadAndSave), + toEither[error, string], + ) + + E.Fold( + func(err error) { + mu.Lock() + errors = append(errors, + fmt.Sprintf("seq %s: %v", extractSeqID(r), err)) + mu.Unlock() + }, + func(path string) { + mu.Lock() + results = append(results, path) + mu.Unlock() + }, + )(result) + }(rec) + } + + // Wait for all workers to finish + wg.Wait() + + // Return aggregated results + return results, fmt.Errorf(join errors if any) + }) +} +``` + +**Pros:** +- βœ“ Bounded concurrency via semaphore +- βœ“ Per-record error collection (best-effort) β€” a single failure doesn't abort other in-flight work +- βœ“ Each worker runs the exact same functional IOEither pipeline β€” zero changes to submit/poll/download/save +- βœ“ Results collected in a thread-safe way +- βœ“ Clear separation: the orchestrator (`scan_concurrent.go`) is imperative Go; the per-record processing is pure fp-go +- βœ“ Streams FASTA records (no need to slurp entire file) +- βœ“ Well-established Go pattern, easy to understand and maintain + +**Cons:** +- βœ— Mixes Go concurrency primitives with fp-go (but this is intentional and bounded) +- βœ— More imperative code than a pure fp-go approach +- βœ— Result ordering is non-deterministic (depends on which jobs finish first) +- βœ— No built-in cancellation β€” once dispatched, a worker runs to completion even if many others fail (acceptable for best-effort mode) + +**Best for:** Production use, large FASTA files, when you need robust per-record error handling. + +--- + +### 4.4 Approach D: Channel Fan-Out / Fan-In (PURE GO) + +**Core idea:** Classic Go concurrency pattern. One producer goroutine sends FASTA records into a channel. N worker goroutines consume from the channel. Results are collected via a result channel. + +**How it works:** +1. Create a buffered channel for FASTA records (acts as a work queue). +2. Launch 25 worker goroutines that read from the channel and process records. +3. Each worker runs the IOEither pipeline for its record. +4. Workers send results (path or error) to a result channel. +5. A collector goroutine aggregates results from the result channel. + +**Pseudo-design (not actual code):** + +``` +records := make(chan seqio.Fasta, 25) // work queue +results := make(chan result, 25) // result collector + +// Producer: read FASTA, feed into channel +go func() { + for r := range ParseFASTA(path) { records <- r } + close(records) +}() + +// Workers: 25 goroutines consuming from records channel +var wg sync.WaitGroup +for i := 0; i < 25; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for rec := range records { + // run pipeline, send to results channel + } + }() +} + +// Closer: wait for workers, close results channel +go func() { wg.Wait(); close(results) }() + +// Collector: aggregate results +for r := range results { ... } +``` + +**Pros:** +- βœ“ Classic, well-understood pattern +- βœ“ Natural backpressure via buffered channels +- βœ“ Streaming β€” no need to slurp entire FASTA file + +**Cons:** +- βœ— More boilerplate than errgroup +- βœ— Error handling is manual β€” no built-in cancellation on first error +- βœ— Worker lifecycle management is explicit and verbose +- βœ— No built-in context propagation +- βœ— Harder to reason about than errgroup (multiple channels, goroutines, WaitGroups) + +**Best for:** When you want maximum control over the concurrency model, or when errgroup is not available. + +--- + +### 4.5 Approach E: ReaderIOResult + TraverseArray β€” ❌ ELIMINATED + +> **Eliminated.** `RIO.TraverseArray` has the same structural requirement as `TraverseArrayPar`: it operates on `[]A` slices, not on lazy iterators. Converting the entire HTTP layer to `ReaderIOResult` is a major refactoring that still doesn't solve the streaming problem β€” it would still require slurping. The combination of high effort + no streaming makes this non-viable. +> +> **Original idea (retained for reference):** Convert the entire HTTP layer from `IOEither` to `ReaderIOResult` (which carries `context.Context`). Then use `RIO.TraverseArray` for parallel execution with context cancellation support. +> +> **Original pros/cons:** +> - βœ“ Most idiomatic fp-go approach for HTTP with context +> - βœ“ Context propagation is built into the type +> - βœ— Requires rewriting the entire HTTP layer (submit, poll, download) +> - βœ— Major refactoring β€” high risk, high effort +> - βœ— Still requires `[]A` β€” no streaming support +> +> **Best for:** (Not recommended) β€” only viable if the project plans to adopt `ReaderIOResult`/`Effect` more broadly AND the FASTA file is guaranteed small. + +--- + +### 4.7 Comparison Matrix (Viable Approaches Only) + +| Criterion | C: Semaphore Worker Pool | D: Channel Fan-out | +|---|---|:---:|:---:| +| Functional purity | β˜…β˜…β˜…β˜†β˜† | β˜…β˜…β˜†β˜†β˜† | +| Bounded concurrency | β˜…β˜…β˜…β˜…β˜… | β˜…β˜…β˜…β˜…β˜… | +| Error handling | Per-record (best-effort) | Manual | +| Streaming FASTA | βœ“ | βœ“ | +| Reuses existing pipeline | βœ“ | βœ“ | +| Code changes required | Low | Low | +| Test complexity | Medium | Medium | +| Production readiness | β˜… Best | Good | + +> **Approaches A, B, and E eliminated** β€” all three require slurping the FASTA file into memory (see Β§4.1, Β§4.2, Β§4.5). + + +--- + +## 5. Error Handling Architecture + +### 5.1 Error Taxonomy + +In the concurrent scan pipeline, errors can originate from multiple sources: + +| Error Source | Type | Example | +|-------------|------|---------| +| **FASTA parse error** | `Left(error)` from `ParseFASTA` | Malformed FASTA, stray sequence data | +| **Submission failure** | HTTP error from `buildSubmitRequester` | 500 from API, network timeout, invalid sequence | +| **Poll timeout** | Timeout error from `pollJob` | Job runs longer than `--timeout` | +| **Poll error status** | Status error from `pollJob` | API returns `FAILED`, `NOT_FOUND`, etc. | +| **Download failure** | HTTP error from `downloadAndSave` | 500 from result endpoint, network error | +| **File write failure** | IO error from `saveResult` | Disk full, permission denied | +| **Concurrency error** | Error from semaphore | Semaphore acquisition failed | + +### 5.2 Error Handling Strategies + +#### Strategy A: Best-Effort / Collect-All (Default) + +**Behavior:** All records are attempted. Per-record errors are collected. Successful results are returned alongside a list of failures. + +``` +Record 1: βœ“ saved β†’ "/out/seq1_JOB1.json" +Record 2: βœ— submit error β†’ "seq2: 500 Internal Server Error" +Record 3: βœ“ saved β†’ "/out/seq3_JOB2.json" +Record 4: βœ— poll timeout β†’ "seq4: timed out" + +Result: Right({ + Paths: ["/out/seq1_JOB1.json", "/out/seq3_JOB2.json"], + Errors: ["seq2: 500 Internal Server Error", "seq4: timed out"] +}) +``` + +**Implementation:** +- Use `sync.WaitGroup` for worker coordination (no cancellation on error). +- Each worker runs to completion, recording its own outcome. +- A mutex-protected error slice collects per-record failures. +- After all workers finish, aggregate results and errors into a summary report. +- FASTA parse errors also recorded per-record (skip the malformed record, continue). + +**When to use:** This is the default and only mode. Large batches benefit from not wasting in-flight work on a single failure. + +--- + +### 5.3 Error Message Design + +Per-record errors should be prefixed with the sequence ID for traceability: + +``` +seq tr|Q95Q25|ANMK_CAEEL: submission failed: 500 Internal Server Error +seq sp|P12345|PROT2: job timed out after 30m0s +seq tr|Q9XYZ1: API returned status FAILED +``` + +The `extractSeqID` function already exists in `extract.go:41-46` and extracts the identifier token from the FASTA header. + +### 5.4 Reporting + +The concurrent version should produce a summary at the end: + +``` +--- Scan Complete --- +Total records: 100 +Successful: 97 +Failed: 3 + seq ABC: timed out + seq DEF: 500 Internal Server Error + seq GHI: API returned status FAILED +Output files written to: ./output/ +``` + +The existing `reportScanResults` function should be upgraded to handle this summary format. + + +--- + +## 6. Mixing Go Concurrency with fp-go: Design Principles + +### 6.1 The Boundary Contract + +The concurrency layer and the functional layer meet at a well-defined boundary: + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ CONCURRENCY ORCHESTRATOR (imperative Go) β”‚ +β”‚ β”‚ +β”‚ - semaphore / WaitGroup β”‚ +β”‚ - worker lifecycle β”‚ +β”‚ - result aggregation (mutex-protected slice) β”‚ +β”‚ β”‚ +β”‚ Calls into: processOneRecord(SubmitInput) β†’ string β”‚ +β”‚ (executes the IOEither pipeline) β”‚ +β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ +β”‚ PER-RECORD PIPELINE (functional fp-go) β”‚ +β”‚ β”‚ +β”‚ - buildSubmitRequester β”‚ +β”‚ - pollJob β”‚ +β”‚ - downloadAndSave β”‚ +β”‚ β”‚ +β”‚ All pure composition, no concurrency awareness β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### 6.2 Functional Wrapper for Single-Record Processing + +Introduce a new function that wraps the existing pipeline into a single callable unit: + +```go +// processOneRecord executes the full submitβ†’pollβ†’downloadβ†’save pipeline +// for a single FASTA record. +// +// Type: SubmitInput β†’ E.Either[error, string] +// +// This is NOT a new implementation β€” it composes the existing functions. +func processOneRecord(input SubmitInput) E.Either[error, string] { + return F.Pipe4( + input, + buildSubmitRequester, // SubmitInput β†’ IOEither[error, SubmittedJob] + IOE.Chain(pollJob), // SubmittedJob β†’ IOEither[error, CompletedJob] + IOE.Chain(downloadAndSave), // CompletedJob β†’ IOEither[error, string] + toEither[error, string], // IOEither β†’ Either (execute lazily) + ) +} +``` + +This function: +- Is pure composition β€” it just pipes existing functions together. +- Is testable independently (see existing `TestProcessOneFastaIntegration`). +- Is the single entry point that the concurrency layer calls. +- Returns `Either[error, string]` β€” the concurrency layer folds over this. + +### 6.3 Polling Timeout (Unchanged) + +The current `pollJob` creates its own `context.Background()` with a per-job timeout via `context.WithTimeout`. Since best-effort mode does not cancel in-flight workers on error, there is no need for a shared parent context. Each worker's `pollJob` runs with its own independent timeout, matching the existing behavior exactly. + +**No change to `pollJob` signature required.** The existing `pollJob(job SubmittedJob) IOE.IOEither[error, CompletedJob]` is kept as-is. + +### 6.4 Imperative Residue Principle + +Following the pattern established in the `pollJob` refactoring plan (where the `select` block is acknowledged as "the minimal imperative residue"): + +> **Go has no monadic primitive for goroutine orchestration with bounded concurrency. The semaphore + WaitGroup block is the minimal imperative residue β€” all per-record logic above it is fully functional.** + +This is the same principle that justified the `select` block in `pollJob`. The imperative shell is thin, bounded, and intentional. + +### 6.5 Why Not IOEither.TraverseArrayPar? + +`TraverseArrayPar` is the most functional approach, but has three critical limitations: + +1. **Requires slurping.** `TraverseArrayPar` operates on `[]A` slices β€” it cannot consume a lazy channel-based iterator. This alone disqualifies it given the non-negotiable streaming constraint (Β§3.1, #7). +2. **No concurrency bounding.** For 25 records this is fine. For 1000 records it launches 1000 goroutines β€” potentially problematic for memory and API rate limits. +3. **All-or-nothing batch semantics.** If any record in a `TraverseArrayPar` batch fails, the entire batch returns `Left`. No partial results β€” incompatible with the best-effort error strategy. + +Approach C (semaphore + WaitGroup) solves all three while keeping the functional pipeline intact: streams the FASTA file, bounds concurrency precisely, and collects per-record errors. + +### 6.6 The Argument for Mixed Style + +Some functional programming purists may object to mixing goroutines with fp-go. The counter-argument: + +- **fp-go itself uses goroutines internally.** `ApPar` (used by `TraverseArrayPar`) launches goroutines. The `io` package's parallel operations are built on goroutines. +- **Go's concurrency model is first-class.** Goroutines and channels are Go's native concurrency primitives, just like Haskell has `par` and `async`. Using them alongside fp-go is not a violation β€” it's working with the language, not against it. +- **The project already accepts this.** `pollJob` has a `select` block and `TryCatchError` wraps an imperative `for` loop. The project's philosophy is: "keep the imperative shell thin." +- **The boundary is clear.** The imperative code is in the orchestrator; the functional code is in the per-record pipeline. They don't leak into each other. + + +--- + +## 7. Recommended Design: Approach C (Semaphore-Bounded Worker Pool) + +### 7.1 New Subcommand + +A new `concurrent-scan` subcommand is added to `cmd/interpro-cli/main.go`, separate from the existing `scan` subcommand. It shares the same scan-related flags plus one new flag: + +```go +{ + Name: "concurrent-scan", + Usage: "Submit protein sequences to InterProScan concurrently and save JSON results", + Flags: []cli.Flag{ + &cli.StringFlag{Name: "fasta", Aliases: []string{"f"}, Usage: "Path to FASTA file"}, + &cli.StringFlag{Name: "email", Aliases: []string{"e"}, Sources: cli.EnvVars("EBI_EMAIL"), Usage: "Email for EMBL-EBI Job Dispatcher (required)"}, + &cli.StringFlag{Name: "output", Aliases: []string{"o"}, Value: ".", Usage: "Output directory for JSON results"}, + &cli.StringFlag{Name: "seq-type", Aliases: []string{"s"}, Value: "p", Usage: "Sequence type: p (protein) or n (nucleotide)"}, + &cli.DurationFlag{Name: "poll-interval", Value: 15 * time.Second, Usage: "How often to check job status"}, + &cli.DurationFlag{Name: "timeout", Value: 30 * time.Minute, Usage: "Maximum time to wait for a single job"}, + &cli.IntFlag{Name: "concurrency", Aliases: []string{"c"}, Value: 25, Usage: "Maximum number of concurrent InterProScan jobs"}, + }, + Action: interpro.ConcurrentScan, +} +``` + +### 7.2 New Types + +```go +// Added to scan_model.go + +// ScanRequest extended with concurrency setting. +type ScanRequest struct { + // ... existing fields ... + Concurrency int // NEW: max concurrent in-flight jobs (default 25) +} +``` + +### 7.3 New Functions + +| Function | File | Purpose | +|----------|------|---------| +| `processOneRecord` | `scan_concurrent.go` | Composes submitβ†’pollβ†’downloadβ†’save for one record into a single `Either[error, string]` | +| `streamFastaRecordsConcurrent` | `scan_concurrent.go` | Concurrent orchestrator: reads FASTA, dispatches workers via WaitGroup, collects results | +| `ConcurrentScan` | `scan_concurrent.go` | Top-level entry point for the `concurrent-scan` subcommand | +| `reportConcurrentResults` | `scan_concurrent.go` | Reporter: prints summary with success/failure counts | + +### 7.4 Modified Functions + +| Function | Change | Reason | +|----------|--------|--------| +| `ConcurrentScan` | NEW entry point | Separate entry for concurrent-scan subcommand | +| `ScanRequest` | Add `Concurrency int` field | Store concurrency setting | +| `extractScanRequest` | Read `Concurrency` from CLI flag | Wire the flag | + +No existing pipeline functions are modified. `pollJob`, `buildSubmitRequester`, `downloadAndSave`, and all other per-record functions remain untouched. + +### 7.5 File Layout + +``` +internal/interpro/ + β”œβ”€ scan.go ← Unchanged: sequential scan subcommand + β”œβ”€ scan_model.go ← Modified: Concurrency field added to ScanRequest; new types + β”œβ”€ scan_concurrent.go ← NEW: top-level ConcurrentScan entry + orchestrator + worker + β”œβ”€ scan_loop.go ← Unchanged: sequential fallback + β”œβ”€ scan_submit.go ← Unchanged: per-record functional pipeline + β”œβ”€ scan_poll.go ← Unchanged: pollJob + β”œβ”€ scan_result.go ← Unchanged: per-record functional pipeline + β”œβ”€ extract.go ← Unchanged: extractSeqID + └─ client.go ← Unchanged: toEither +``` + +### 7.6 Top-Level Control Flow + +``` +ConcurrentScan(ctx, cmd) ← NEW entry point for concurrent-scan subcommand + β”‚ + β”œβ”€ extractScanRequest(cmd) β†’ ScanRequest (includes Concurrency) + β”œβ”€ validateScanRequest(req) β†’ Either[error, ScanRequest] + β”œβ”€ IOE.FromEither + β”œβ”€ IOE.Map(makeSubmitArgs) β†’ SubmitArgs + β”œβ”€ IOE.ChainFirst(ensureOutputDir) + β”‚ + └─ streamFastaRecordsConcurrent(args) β†’ IOEither[error, []string] (NEW) + β”‚ + └─ toEither β†’ E.Fold(wrapScanError, reportConcurrentResults) +``` + +The existing `Scan` entry point and `streamFastaRecords` are left untouched for the sequential `scan` subcommand. + +### 7.7 Concurrent Orchestrator Design + +``` +streamFastaRecordsConcurrent(args SubmitArgs) IOE.IOEither[error, []string] + +1. Parse CLI config: + - concurrency limit from args.F2.Concurrency (default 25) + - output dir, client, etc. from args + +2. Create synchronization primitives: + sem := semaphore.NewWeighted(int64(concurrency)) + var wg sync.WaitGroup + +3. Create shared state: + var mu sync.Mutex + var results []string + var errors []string + +4. Reader loop (runs in calling goroutine, not a separate one): + for record := range seqio.ParseFASTA(fastaPath): + β”‚ + β”œβ”€ If Left(err): record parse error in errors slice, continue + β”‚ + └─ If Right(rec): + sem.Acquire(context.Background(), 1) // block if N workers running + β”‚ + wg.Add(1) + go func(r seqio.Fasta) { // launch worker goroutine + defer sem.Release(1) + defer wg.Done() + + input := T.MakeTuple3(args.F1, args.F2, r) + + // Execute per-record pipeline + result := processOneRecord(input) + + E.Fold( + func(err error) { + mu.Lock() + errors = append(errors, + fmt.Sprintf("seq %s: %v", extractSeqID(r), err)) + mu.Unlock() + }, + func(path string) { + mu.Lock() + results = append(results, path) + mu.Unlock() + }, + )(result) + }(rec) + +5. Wait for all workers: + wg.Wait() + +6. Return aggregated results: + if len(errors) > 0: + return results, fmt.Errorf("\n %s", strings.Join(errors, "\n ")) + return results, nil +``` + +**Key behaviors:** +- `sem.Acquire(context.Background(), 1)` blocks the reader goroutine when N workers are in-flight. This provides natural backpressure β€” FASTA reading pauses until a worker slot opens up. +- Workers run independently to completion β€” no cancellation on individual errors (best-effort). +- Results are collected in FASTA-read order (approximately), not completion order β€” the semaphore acquisition gates the read loop. +- If `Concurrency >= len(records)`, all records are dispatched immediately (equivalent to full parallelism). +- FASTA parse errors are recorded as per-record errors, not fatal β€” the loop continues to the next record. + +### 7.9 Per-Record Pipeline (Unchanged) + +The functional pipeline for a single record is completely unchanged. No new context parameter needed: + +``` +Input: T.Tuple3[ioehttp.Client, ScanRequest, seqio.Fasta] + +F.Pipe4( + input, + buildSubmitRequester, // POST /run β†’ SubmittedJob + IOE.Chain(pollJob), // GET /status β†’ CompletedJob (poll loop) + IOE.Chain(downloadAndSave), // GET /result β†’ string (file path) + toEither[error, string], // execute β†’ Either[error, string] +) + +Output: Either[error, string] // Left = error, Right = file path +``` + +All functions are reused without modification. Each worker runs in its own goroutine with an independent `pollJob` timeout. + +### 7.10 Error Aggregation + +Each worker records its outcome (success path or error message). After all workers complete via `wg.Wait()`, the orchestrator returns the aggregated result: + +- If any errors were collected: return the successful paths alongside a multi-line error string containing all per-record failures. +- If no errors: return all paths with nil error. + +FASTA parse errors (malformed records) are treated the same as processing errors β€” recorded and skipped, not fatal. + +``` +// After wg.Wait(): +if len(errors) > 0 { + return results, fmt.Errorf( + "%d record(s) failed:\n %s", + len(errors), + strings.Join(errors, "\n "), + ) +} +return results, nil +``` + +### 7.11 Result Reporting + +Updated `reportScanResults` for concurrent mode: + +``` +--- Scan Summary --- +Records processed: 100 + Succeeded: 97 + Failed: 3 +Output directory: ./output/ + +Failures: + seq tr|Q95Q25: timed out after 30m0s + seq sp|P12345: 500 Internal Server Error + seq tr|Q9XYZ1: API returned status FAILED +``` + + +--- + +## 8. Testing Strategy + +### 8.1 Unit Tests + +All existing tests in `scan_test.go` should continue to pass unchanged: +- `TestExtractScanRequest` β€” add assertion for `Concurrency` field +- `TestValidateScanRequest` β€” unchanged +- `TestBuildSubmitRequester` β€” unchanged +- `TestSaveResult` β€” unchanged +- `TestPollJobFinished` β€” unchanged +- `TestPollJobTimeout` β€” unchanged +- `TestProcessOneFastaIntegration` β€” unchanged + +### 8.2 New Unit Tests + +| Test | What it covers | +|------|---------------| +| `TestProcessOneRecord` | Verifies `processOneRecord` composes submitβ†’pollβ†’downloadβ†’save correctly | +| `TestProcessOneRecordError` | Verifies error propagation through `processOneRecord` | + +### 8.3 Integration Tests (Concurrent) + +These are the critical new tests: + +``` +TestConcurrentScanTwoRecords + - Setup: mock server with /run, /status, /result endpoints + - Submit two records concurrently + - Verify both JSON files are created + - Verify both paths are returned + +TestConcurrentScanSomeRecordsFail + - Setup: mock server where some records fail (e.g., odd-indexed records return 500) + - Submit multiple records concurrently + - Verify successful records have their JSON files created + - Verify failed records appear in the error summary + - Verify processing continues past individual failures (best-effort) + +TestConcurrentScanAllFinish + - Setup: mock server that responds quickly + - Submit N records (N > 25 to test semaphore backpressure) + - Verify all N files are created + - Verify error list is empty + +TestConcurrentScanAllFail + - Setup: mock server that always returns 500 + - Submit multiple records + - Verify all records appear in error summary + - Verify no files were created + +TestConcurrentScanTimeout + - Setup: mock server that always returns RUNNING status + - Use short per-job timeout + - Verify timeout errors appear for each record + - Verify other records are not affected (best-effort) + +TestConcurrentScanSemaphoreLimiting + - Setup: mock server with artificial delay + - Track max concurrent in-flight requests + - Verify max never exceeds concurrency limit + +TestConcurrentScanParseErrors + - Setup: FASTA file with some malformed records (stray sequence data) + - Verify parse errors appear in error summary + - Verify valid records are processed normally +``` + +### 8.4 Test Fixtures + +Reuse the existing mock server pattern from `scan_test.go`: + +```go +// Multi-endpoint mock server (reuse from TestProcessOneFastaIntegration) +func newMockInterProServer(t *testing.T) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") + switch { + case strings.HasSuffix(r.URL.Path, "/run"): + io.WriteString(w, "JOB-TEST-"+randomID()) + case strings.Contains(r.URL.Path, "/status/"): + io.WriteString(w, "FINISHED") + case strings.Contains(r.URL.Path, "/result/"): + w.Header().Set("Content-Type", "application/json") + io.WriteString(w, `{"results":[]}`) + } + })) +} +``` + + +--- + +## 9. Implementation Roadmap + +### Phase 1: Foundation + +1. **Add `Concurrency` field to `ScanRequest`** in `scan_model.go`. +2. **Add `concurrent-scan` subcommand** in `cmd/interpro-cli/main.go` with all scan flags + `--concurrency` (default 25). +3. **Add `processOneRecord` function** to new `scan_concurrent.go` β€” composes the existing pipeline functions. +4. **Implement `ConcurrentScan` entry point** β€” validation, mkdir, dispatch to `streamFastaRecordsConcurrent`. +5. **Write `TestProcessOneRecord`** β€” verify the composed pipeline works. +6. **Run full test suite** β€” verify no regressions. + +### Phase 2: Concurrent Orchestrator + +7. **Implement `streamFastaRecordsConcurrent`** in `scan_concurrent.go` using semaphore + WaitGroup. +8. **Add dependency:** `golang.org/x/sync` to `go.mod` (provides `semaphore`). +9. **Implement `reportConcurrentResults`** β€” prints summary with success/failure counts. +10. **Write `TestConcurrentScanTwoRecords`** β€” basic concurrent test. +11. **Write `TestConcurrentScanAllFinish`** β€” test with N > 25 records to verify semaphore backpressure. +12. **Write `TestConcurrentScanSomeFail`** β€” verify best-effort: one failure doesn't block others. + +### Phase 3: Polish + +13. **Write additional edge-case tests** (empty FASTA, all records fail, parse errors in stream). +14. **Add logging** β€” `IOE.ChainFirstIOK(IO.Logf[...])` for each record's progress. +15. **Run lint + full test suite.** +16. **Benchmark:** time a 100-record FASTA with `scan` (sequential) vs. `concurrent-scan` (25 concurrency). + +### Phase 4: Optional Enhancements + +17. **Fail-fast mode** (`--fail-fast` flag) using `errgroup` for cancellation. +18. **Progress bar** β€” show "Processing 15/100 (3 failed)..." during execution. +19. **Retry logic** β€” retry failed submissions with exponential backoff. +20. **Rate limiting** β€” add `--rate-limit` flag for API rate limiting. + + +--- + +## 10. Dependencies + +### New Go dependency + +``` +golang.org/x/sync v0.x.x +``` + +Provides: +- `golang.org/x/sync/semaphore` β€” weighted semaphore for concurrency bounding + +(Note: `errgroup` is NOT needed β€” best-effort mode uses `sync.WaitGroup` from the standard library.) + +### Existing dependencies (unchanged) + +| Package | Version | Usage | +|---------|---------|-------| +| `github.com/IBM/fp-go/v2` | v2.3.11 | Functional pipeline (IOEither, Either, array, tuple, etc.) | +| `github.com/urfave/cli/v3` | v3.9.0 | CLI framework | +| `github.com/stretchr/testify` | v1.11.1 | Test assertions | + + +--- + +## 11. Risks & Mitigations + +| Risk | Likelihood | Mitigation | +|------|-----------|------------| +| **API rate limiting** β€” EBI may throttle 25 concurrent submissions | Medium | Add `--rate-limit` flag in Phase 4; test with small batches first | +| **Memory pressure** β€” 25 concurrent poll goroutines with HTTP clients | Low | Each goroutine is lightweight (~4KB stack); 25 goroutines β‰ˆ 100KB | +| **File write contention** β€” 25 concurrent file writes to same output dir | Low | Each record writes to a unique file name (`{SeqID}_{JobID}.json`); OS handles concurrent writes | +| **Test flakiness** β€” timing-dependent tests with semaphore | Low | Use short timeouts (50ms) and fast mock servers; avoid `time.Sleep` in tests | +| **Goroutine leak** β€” WaitGroup not properly waited | Low | `defer wg.Done()` pattern; `go vet` catches missing Done calls | +| **Backward compatibility** β€” existing users of sequential `scan` | Low | Sequential `scan` subcommand preserved unchanged; `concurrent-scan` is separate | + + +--- + +## 12. Decisions + +| # | Question | Decision | +|---|----------|----------| +| **Q1** | Concurrency limit | CLI flag `--concurrency` with default 25 | +| **Q2** | Error handling | Best-effort: collect per-record errors, continue processing | +| **Q3** | New subcommand or modify `scan`? | New `concurrent-scan` subcommand | +| **Q4** | Result ordering | Non-deterministic (completion order) | +| **Q5** | Summary reporting | Print summary with counts + failure details | +| **Q6** | Polling timeout | Per-job timeout (unchanged) | +| **Q7** | Approach | **C:** Semaphore-bounded worker pool with WaitGroup | + +--- + +*Document version: 2.0 β€” May 2026* +*Status: Design approved β€” ready for implementation* + +--- + +*Document version: 1.0 β€” May 2026* diff --git a/go.mod b/go.mod index d40369d..63cc974 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/IBM/fp-go/v2 v2.3.11 github.com/stretchr/testify v1.11.1 github.com/urfave/cli/v3 v3.9.0 + golang.org/x/sync v0.20.0 ) require ( diff --git a/go.sum b/go.sum index e4894d0..cd64aec 100644 --- a/go.sum +++ b/go.sum @@ -12,3 +12,5 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= diff --git a/internal/interpro/scan_concurrent.go b/internal/interpro/scan_concurrent.go new file mode 100644 index 0000000..fe0d177 --- /dev/null +++ b/internal/interpro/scan_concurrent.go @@ -0,0 +1,164 @@ +package interpro + +import ( + "context" + "fmt" + "net/http" + "strings" + "sync" + + E "github.com/IBM/fp-go/v2/either" + F "github.com/IBM/fp-go/v2/function" + IOE "github.com/IBM/fp-go/v2/ioeither" + IOEF "github.com/IBM/fp-go/v2/ioeither/file" + ioehttp "github.com/IBM/fp-go/v2/ioeither/http" + T "github.com/IBM/fp-go/v2/tuple" + "github.com/urfave/cli/v3" + "golang.org/x/sync/semaphore" + + "github.com/dictybase/interpro-manager/internal/seqio" +) + +// processOneRecord executes the full submitβ†’pollβ†’downloadβ†’save pipeline +// for a single FASTA record. Composes the existing pipeline functions. +func processOneRecord(input SubmitInput) E.Either[error, string] { + return F.Pipe4( + input, + buildSubmitRequester, + IOE.Chain(pollJob), + IOE.Chain(downloadAndSave), + toEither[error, string], + ) +} + +// ConcurrentScan is the entry point for the concurrent-scan subcommand. +func ConcurrentScan(_ context.Context, cmd *cli.Command) error { + return F.Pipe8( + cmd, + extractConcurrentScanRequest, + validateScanRequest, + IOE.FromEither, + IOE.Map[error](func(scanr ScanRequest) SubmitArgs { + return T.MakeTuple2( + ioehttp.MakeClient( + &http.Client{Timeout: scanr.Timeout}), + scanr, + ) + }), + IOE.ChainFirst(func(args SubmitArgs) IOE.IOEither[error, string] { + return IOEF.MkdirAll(T.Second(args).OutputDir, outputDirPerm) + }), + IOE.Chain(streamFastaRecordsConcurrent), + toEither[error, []string], + E.Fold(wrapScanError, reportConcurrentResults), + ) +} + +func extractConcurrentScanRequest(cmd *cli.Command) ScanRequest { + return ScanRequest{ + FastaPath: cmd.String("fasta"), + Email: cmd.String("email"), + OutputDir: cmd.String("output"), + SeqType: cmd.String("seq-type"), + BaseURL: scanBaseURL, + PollInterval: cmd.Duration("poll-interval"), + Timeout: cmd.Duration("timeout"), + Concurrency: cmd.Int("concurrency"), + } +} + +func reportConcurrentResults(paths []string) error { + for _, p := range paths { + fmt.Printf("wrote %s\n", p) + } + return nil +} + +// streamFastaRecordsConcurrent reads FASTA records and processes them +// concurrently using a semaphore-bounded worker pool with best-effort +// error handling: individual record failures are collected, not fatal. +func streamFastaRecordsConcurrent(args SubmitArgs) IOE.IOEither[error, []string] { + return IOE.TryCatchError(func() ([]string, error) { + client := args.F1 + config := args.F2 + concurrency := config.Concurrency + if concurrency <= 0 { + concurrency = 25 + } + + sem := semaphore.NewWeighted(int64(concurrency)) + + ctx := context.Background() + + var wg sync.WaitGroup + var mu sync.Mutex + var results []string + var errs []string + + for res := range seqio.ParseFASTA(config.FastaPath) { + if E.IsLeft(res) { + mu.Lock() + errs = append(errs, fmt.Sprintf( + "fasta parse error: %v", + leftVal(res), + )) + mu.Unlock() + continue + } + rec := rightVal(res) + + if err := sem.Acquire(ctx, 1); err != nil { + return nil, fmt.Errorf( + "semaphore acquire: %w", err, + ) + } + + wg.Add(1) + go func(r seqio.Fasta) { + defer sem.Release(1) + defer wg.Done() + + input := T.MakeTuple3(client, config, r) + + result := processOneRecord(input) + if E.IsLeft(result) { + mu.Lock() + errs = append(errs, fmt.Sprintf( + "seq %s: %v", + extractSeqID(r), + leftVal(result), + )) + mu.Unlock() + } else { + mu.Lock() + results = append(results, rightVal(result)) + mu.Unlock() + } + }(rec) + } + + wg.Wait() + + if len(errs) > 0 { + return results, fmt.Errorf( + "%d record(s) failed:\n %s", + len(errs), strings.Join(errs, "\n "), + ) + } + return results, nil + }) +} + +func leftVal[A, B any](e E.Either[A, B]) A { + return E.Fold[A, B](F.Identity[A], func(_ B) A { + var zero A + return zero + })(e) +} + +func rightVal[A, B any](e E.Either[A, B]) B { + return E.Fold[A, B](func(_ A) B { + var zero B + return zero + }, F.Identity[B])(e) +} diff --git a/internal/interpro/scan_model.go b/internal/interpro/scan_model.go index 5d53eeb..76aaff4 100644 --- a/internal/interpro/scan_model.go +++ b/internal/interpro/scan_model.go @@ -18,6 +18,7 @@ type ScanRequest struct { BaseURL string PollInterval time.Duration Timeout time.Duration + Concurrency int } // SubmittedJob holds state after a successful API submission. From 86c631d68112a9585560a060417dc2ce49dc2250 Mon Sep 17 00:00:00 2001 From: Siddhartha Basu Date: Sun, 24 May 2026 16:31:35 +0000 Subject: [PATCH 2/4] refactor(interpro-cli): extract subcommand builders and replace magic number with constant MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract each CLI subcommand (download, scan, concurrent-scan) into its own builder function to fix funlen lint issue in main. Replace literal 25 with defaultConcurrency constant to fix mnd lint issue. πŸ’˜ Generated with Crush Assisted-by: Crush:deepseek/deepseek-v4-flash --- cmd/interpro-cli/main.go | 235 +++++++++++++++++++++------------------ 1 file changed, 124 insertions(+), 111 deletions(-) diff --git a/cmd/interpro-cli/main.go b/cmd/interpro-cli/main.go index 706d926..f56472d 100644 --- a/cmd/interpro-cli/main.go +++ b/cmd/interpro-cli/main.go @@ -16,124 +16,137 @@ const ( defaultPageSize = 20 defaultPollInterval = 15 * time.Second defaultTimeout = 30 * time.Minute + defaultConcurrency = 25 ) +func downloadCommand() *cli.Command { + return &cli.Command{ + Name: "download", + Usage: "Fetch InterPro protein records for a taxonomy ID and save to TSV", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "taxon-id", + Aliases: []string{"t"}, + Value: defaultTaxonID, + Usage: "NCBI Taxonomy ID", + }, + &cli.StringFlag{ + Name: "output", + Aliases: []string{"o"}, + Value: defaultOutput, + Usage: "Output TSV file path", + }, + &cli.IntFlag{ + Name: "page-size", + Aliases: []string{"p"}, + Value: defaultPageSize, + Usage: "API page size", + }, + }, + Action: interpro.DownloadAndWrite, + } +} + +func scanCommand() *cli.Command { + return &cli.Command{ + Name: "scan", + Usage: "Submit protein sequences to InterProScan and save JSON results", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "fasta", + Aliases: []string{"f"}, + Usage: "Path to FASTA file (supports multi-FASTA)", + }, + &cli.StringFlag{ + Name: "email", + Aliases: []string{"e"}, + Sources: cli.EnvVars("EBI_EMAIL"), + Usage: "Email for EMBL-EBI Job Dispatcher (required)", + }, + &cli.StringFlag{ + Name: "output", + Aliases: []string{"o"}, + Value: ".", + Usage: "Output directory for JSON results", + }, + &cli.StringFlag{ + Name: "seq-type", + Aliases: []string{"s"}, + Value: "p", + Usage: "Sequence type: p (protein) or n (nucleotide)", + }, + &cli.DurationFlag{ + Name: "poll-interval", + Value: defaultPollInterval, + Usage: "How often to check job status", + }, + &cli.DurationFlag{ + Name: "timeout", + Value: defaultTimeout, + Usage: "Maximum time to wait for a single job", + }, + }, + Action: interpro.Scan, + } +} + +func concurrentScanCommand() *cli.Command { + return &cli.Command{ + Name: "concurrent-scan", + Usage: "Submit protein sequences to InterProScan concurrently and save JSON results", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "fasta", + Aliases: []string{"f"}, + Usage: "Path to FASTA file (supports multi-FASTA)", + }, + &cli.StringFlag{ + Name: "email", + Aliases: []string{"e"}, + Sources: cli.EnvVars("EBI_EMAIL"), + Usage: "Email for EMBL-EBI Job Dispatcher (required)", + }, + &cli.StringFlag{ + Name: "output", + Aliases: []string{"o"}, + Value: ".", + Usage: "Output directory for JSON results", + }, + &cli.StringFlag{ + Name: "seq-type", + Aliases: []string{"s"}, + Value: "p", + Usage: "Sequence type: p (protein) or n (nucleotide)", + }, + &cli.DurationFlag{ + Name: "poll-interval", + Value: defaultPollInterval, + Usage: "How often to check job status", + }, + &cli.DurationFlag{ + Name: "timeout", + Value: defaultTimeout, + Usage: "Maximum time to wait for a single job", + }, + &cli.IntFlag{ + Name: "concurrency", + Aliases: []string{"c"}, + Value: defaultConcurrency, + Usage: "Maximum number of concurrent InterProScan jobs", + }, + }, + Action: interpro.ConcurrentScan, + } +} + func main() { cmd := &cli.Command{ Name: "interpro-manager", Usage: "CLI for interacting with the InterPro protein database", Commands: []*cli.Command{ - { - Name: "download", - Usage: "Fetch InterPro protein records for a taxonomy ID and save to TSV", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "taxon-id", - Aliases: []string{"t"}, - Value: defaultTaxonID, - Usage: "NCBI Taxonomy ID", - }, - &cli.StringFlag{ - Name: "output", - Aliases: []string{"o"}, - Value: defaultOutput, - Usage: "Output TSV file path", - }, - &cli.IntFlag{ - Name: "page-size", - Aliases: []string{"p"}, - Value: defaultPageSize, - Usage: "API page size", - }, - }, - Action: interpro.DownloadAndWrite, - }, - { - Name: "scan", - Usage: "Submit protein sequences to InterProScan and save JSON results", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "fasta", - Aliases: []string{"f"}, - Usage: "Path to FASTA file (supports multi-FASTA)", - }, - &cli.StringFlag{ - Name: "email", - Aliases: []string{"e"}, - Sources: cli.EnvVars("EBI_EMAIL"), - Usage: "Email for EMBL-EBI Job Dispatcher (required)", - }, - &cli.StringFlag{ - Name: "output", - Aliases: []string{"o"}, - Value: ".", - Usage: "Output directory for JSON results", - }, - &cli.StringFlag{ - Name: "seq-type", - Aliases: []string{"s"}, - Value: "p", - Usage: "Sequence type: p (protein) or n (nucleotide)", - }, - &cli.DurationFlag{ - Name: "poll-interval", - Value: defaultPollInterval, - Usage: "How often to check job status", - }, - &cli.DurationFlag{ - Name: "timeout", - Value: defaultTimeout, - Usage: "Maximum time to wait for a single job", - }, - }, - Action: interpro.Scan, - }, - { - Name: "concurrent-scan", - Usage: "Submit protein sequences to InterProScan concurrently and save JSON results", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "fasta", - Aliases: []string{"f"}, - Usage: "Path to FASTA file (supports multi-FASTA)", - }, - &cli.StringFlag{ - Name: "email", - Aliases: []string{"e"}, - Sources: cli.EnvVars("EBI_EMAIL"), - Usage: "Email for EMBL-EBI Job Dispatcher (required)", - }, - &cli.StringFlag{ - Name: "output", - Aliases: []string{"o"}, - Value: ".", - Usage: "Output directory for JSON results", - }, - &cli.StringFlag{ - Name: "seq-type", - Aliases: []string{"s"}, - Value: "p", - Usage: "Sequence type: p (protein) or n (nucleotide)", - }, - &cli.DurationFlag{ - Name: "poll-interval", - Value: defaultPollInterval, - Usage: "How often to check job status", - }, - &cli.DurationFlag{ - Name: "timeout", - Value: defaultTimeout, - Usage: "Maximum time to wait for a single job", - }, - &cli.IntFlag{ - Name: "concurrency", - Aliases: []string{"c"}, - Value: 25, - Usage: "Maximum number of concurrent InterProScan jobs", - }, - }, - Action: interpro.ConcurrentScan, - }, + downloadCommand(), + scanCommand(), + concurrentScanCommand(), }, } From 3f3a64e63508b333d83e7c0fda17d9afa55a105e Mon Sep 17 00:00:00 2001 From: Siddhartha Basu Date: Sun, 24 May 2026 11:35:37 -0500 Subject: [PATCH 3/4] refactor(cli): inline command definitions in main.go Inline the `download`, `scan`, and `concurrent-scan` command definitions directly into the main application structure. Remove the now-redundant helper functions and the unused `defaultConcurrency` constant to simplify the codebase and reduce boilerplate. --- cmd/interpro-cli/main.go | 235 ++++++++++++++++++--------------------- 1 file changed, 111 insertions(+), 124 deletions(-) diff --git a/cmd/interpro-cli/main.go b/cmd/interpro-cli/main.go index f56472d..706d926 100644 --- a/cmd/interpro-cli/main.go +++ b/cmd/interpro-cli/main.go @@ -16,137 +16,124 @@ const ( defaultPageSize = 20 defaultPollInterval = 15 * time.Second defaultTimeout = 30 * time.Minute - defaultConcurrency = 25 ) -func downloadCommand() *cli.Command { - return &cli.Command{ - Name: "download", - Usage: "Fetch InterPro protein records for a taxonomy ID and save to TSV", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "taxon-id", - Aliases: []string{"t"}, - Value: defaultTaxonID, - Usage: "NCBI Taxonomy ID", - }, - &cli.StringFlag{ - Name: "output", - Aliases: []string{"o"}, - Value: defaultOutput, - Usage: "Output TSV file path", - }, - &cli.IntFlag{ - Name: "page-size", - Aliases: []string{"p"}, - Value: defaultPageSize, - Usage: "API page size", - }, - }, - Action: interpro.DownloadAndWrite, - } -} - -func scanCommand() *cli.Command { - return &cli.Command{ - Name: "scan", - Usage: "Submit protein sequences to InterProScan and save JSON results", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "fasta", - Aliases: []string{"f"}, - Usage: "Path to FASTA file (supports multi-FASTA)", - }, - &cli.StringFlag{ - Name: "email", - Aliases: []string{"e"}, - Sources: cli.EnvVars("EBI_EMAIL"), - Usage: "Email for EMBL-EBI Job Dispatcher (required)", - }, - &cli.StringFlag{ - Name: "output", - Aliases: []string{"o"}, - Value: ".", - Usage: "Output directory for JSON results", - }, - &cli.StringFlag{ - Name: "seq-type", - Aliases: []string{"s"}, - Value: "p", - Usage: "Sequence type: p (protein) or n (nucleotide)", - }, - &cli.DurationFlag{ - Name: "poll-interval", - Value: defaultPollInterval, - Usage: "How often to check job status", - }, - &cli.DurationFlag{ - Name: "timeout", - Value: defaultTimeout, - Usage: "Maximum time to wait for a single job", - }, - }, - Action: interpro.Scan, - } -} - -func concurrentScanCommand() *cli.Command { - return &cli.Command{ - Name: "concurrent-scan", - Usage: "Submit protein sequences to InterProScan concurrently and save JSON results", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "fasta", - Aliases: []string{"f"}, - Usage: "Path to FASTA file (supports multi-FASTA)", - }, - &cli.StringFlag{ - Name: "email", - Aliases: []string{"e"}, - Sources: cli.EnvVars("EBI_EMAIL"), - Usage: "Email for EMBL-EBI Job Dispatcher (required)", - }, - &cli.StringFlag{ - Name: "output", - Aliases: []string{"o"}, - Value: ".", - Usage: "Output directory for JSON results", - }, - &cli.StringFlag{ - Name: "seq-type", - Aliases: []string{"s"}, - Value: "p", - Usage: "Sequence type: p (protein) or n (nucleotide)", - }, - &cli.DurationFlag{ - Name: "poll-interval", - Value: defaultPollInterval, - Usage: "How often to check job status", - }, - &cli.DurationFlag{ - Name: "timeout", - Value: defaultTimeout, - Usage: "Maximum time to wait for a single job", - }, - &cli.IntFlag{ - Name: "concurrency", - Aliases: []string{"c"}, - Value: defaultConcurrency, - Usage: "Maximum number of concurrent InterProScan jobs", - }, - }, - Action: interpro.ConcurrentScan, - } -} - func main() { cmd := &cli.Command{ Name: "interpro-manager", Usage: "CLI for interacting with the InterPro protein database", Commands: []*cli.Command{ - downloadCommand(), - scanCommand(), - concurrentScanCommand(), + { + Name: "download", + Usage: "Fetch InterPro protein records for a taxonomy ID and save to TSV", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "taxon-id", + Aliases: []string{"t"}, + Value: defaultTaxonID, + Usage: "NCBI Taxonomy ID", + }, + &cli.StringFlag{ + Name: "output", + Aliases: []string{"o"}, + Value: defaultOutput, + Usage: "Output TSV file path", + }, + &cli.IntFlag{ + Name: "page-size", + Aliases: []string{"p"}, + Value: defaultPageSize, + Usage: "API page size", + }, + }, + Action: interpro.DownloadAndWrite, + }, + { + Name: "scan", + Usage: "Submit protein sequences to InterProScan and save JSON results", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "fasta", + Aliases: []string{"f"}, + Usage: "Path to FASTA file (supports multi-FASTA)", + }, + &cli.StringFlag{ + Name: "email", + Aliases: []string{"e"}, + Sources: cli.EnvVars("EBI_EMAIL"), + Usage: "Email for EMBL-EBI Job Dispatcher (required)", + }, + &cli.StringFlag{ + Name: "output", + Aliases: []string{"o"}, + Value: ".", + Usage: "Output directory for JSON results", + }, + &cli.StringFlag{ + Name: "seq-type", + Aliases: []string{"s"}, + Value: "p", + Usage: "Sequence type: p (protein) or n (nucleotide)", + }, + &cli.DurationFlag{ + Name: "poll-interval", + Value: defaultPollInterval, + Usage: "How often to check job status", + }, + &cli.DurationFlag{ + Name: "timeout", + Value: defaultTimeout, + Usage: "Maximum time to wait for a single job", + }, + }, + Action: interpro.Scan, + }, + { + Name: "concurrent-scan", + Usage: "Submit protein sequences to InterProScan concurrently and save JSON results", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "fasta", + Aliases: []string{"f"}, + Usage: "Path to FASTA file (supports multi-FASTA)", + }, + &cli.StringFlag{ + Name: "email", + Aliases: []string{"e"}, + Sources: cli.EnvVars("EBI_EMAIL"), + Usage: "Email for EMBL-EBI Job Dispatcher (required)", + }, + &cli.StringFlag{ + Name: "output", + Aliases: []string{"o"}, + Value: ".", + Usage: "Output directory for JSON results", + }, + &cli.StringFlag{ + Name: "seq-type", + Aliases: []string{"s"}, + Value: "p", + Usage: "Sequence type: p (protein) or n (nucleotide)", + }, + &cli.DurationFlag{ + Name: "poll-interval", + Value: defaultPollInterval, + Usage: "How often to check job status", + }, + &cli.DurationFlag{ + Name: "timeout", + Value: defaultTimeout, + Usage: "Maximum time to wait for a single job", + }, + &cli.IntFlag{ + Name: "concurrency", + Aliases: []string{"c"}, + Value: 25, + Usage: "Maximum number of concurrent InterProScan jobs", + }, + }, + Action: interpro.ConcurrentScan, + }, }, } From 9d78126a47616b8c78f21a5c8c3ac44637c5e754 Mon Sep 17 00:00:00 2001 From: Siddhartha Basu Date: Sun, 24 May 2026 11:45:51 -0500 Subject: [PATCH 4/4] refactor(cli): modularize command definitions in main.go Extract CLI command definitions into separate functions (`downloadCmd`, `scanCmd`, `concurrentScanCmd`) to improve code readability and maintainability. Add `defaultConcurrency` constant to centralize configuration. --- cmd/interpro-cli/main.go | 243 +++++++++++++++++++++------------------ 1 file changed, 129 insertions(+), 114 deletions(-) diff --git a/cmd/interpro-cli/main.go b/cmd/interpro-cli/main.go index 706d926..18aa0fb 100644 --- a/cmd/interpro-cli/main.go +++ b/cmd/interpro-cli/main.go @@ -14,131 +14,146 @@ const ( defaultTaxonID = "44689" defaultOutput = "interpro_proteins.tsv" defaultPageSize = 20 + defaultConcurrency = 25 defaultPollInterval = 15 * time.Second defaultTimeout = 30 * time.Minute ) func main() { - cmd := &cli.Command{ + if err := newCommand().Run(context.Background(), os.Args); err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } +} + +func newCommand() *cli.Command { + return &cli.Command{ Name: "interpro-manager", Usage: "CLI for interacting with the InterPro protein database", Commands: []*cli.Command{ - { - Name: "download", - Usage: "Fetch InterPro protein records for a taxonomy ID and save to TSV", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "taxon-id", - Aliases: []string{"t"}, - Value: defaultTaxonID, - Usage: "NCBI Taxonomy ID", - }, - &cli.StringFlag{ - Name: "output", - Aliases: []string{"o"}, - Value: defaultOutput, - Usage: "Output TSV file path", - }, - &cli.IntFlag{ - Name: "page-size", - Aliases: []string{"p"}, - Value: defaultPageSize, - Usage: "API page size", - }, - }, - Action: interpro.DownloadAndWrite, - }, - { - Name: "scan", - Usage: "Submit protein sequences to InterProScan and save JSON results", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "fasta", - Aliases: []string{"f"}, - Usage: "Path to FASTA file (supports multi-FASTA)", - }, - &cli.StringFlag{ - Name: "email", - Aliases: []string{"e"}, - Sources: cli.EnvVars("EBI_EMAIL"), - Usage: "Email for EMBL-EBI Job Dispatcher (required)", - }, - &cli.StringFlag{ - Name: "output", - Aliases: []string{"o"}, - Value: ".", - Usage: "Output directory for JSON results", - }, - &cli.StringFlag{ - Name: "seq-type", - Aliases: []string{"s"}, - Value: "p", - Usage: "Sequence type: p (protein) or n (nucleotide)", - }, - &cli.DurationFlag{ - Name: "poll-interval", - Value: defaultPollInterval, - Usage: "How often to check job status", - }, - &cli.DurationFlag{ - Name: "timeout", - Value: defaultTimeout, - Usage: "Maximum time to wait for a single job", - }, - }, - Action: interpro.Scan, - }, - { - Name: "concurrent-scan", - Usage: "Submit protein sequences to InterProScan concurrently and save JSON results", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "fasta", - Aliases: []string{"f"}, - Usage: "Path to FASTA file (supports multi-FASTA)", - }, - &cli.StringFlag{ - Name: "email", - Aliases: []string{"e"}, - Sources: cli.EnvVars("EBI_EMAIL"), - Usage: "Email for EMBL-EBI Job Dispatcher (required)", - }, - &cli.StringFlag{ - Name: "output", - Aliases: []string{"o"}, - Value: ".", - Usage: "Output directory for JSON results", - }, - &cli.StringFlag{ - Name: "seq-type", - Aliases: []string{"s"}, - Value: "p", - Usage: "Sequence type: p (protein) or n (nucleotide)", - }, - &cli.DurationFlag{ - Name: "poll-interval", - Value: defaultPollInterval, - Usage: "How often to check job status", - }, - &cli.DurationFlag{ - Name: "timeout", - Value: defaultTimeout, - Usage: "Maximum time to wait for a single job", - }, - &cli.IntFlag{ - Name: "concurrency", - Aliases: []string{"c"}, - Value: 25, - Usage: "Maximum number of concurrent InterProScan jobs", - }, - }, - Action: interpro.ConcurrentScan, + downloadCmd(), + scanCmd(), + concurrentScanCmd(), + }, + } +} + +func downloadCmd() *cli.Command { + return &cli.Command{ + Name: "download", + Usage: "Fetch InterPro protein records for a taxonomy ID and save to TSV", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "taxon-id", + Aliases: []string{"t"}, + Value: defaultTaxonID, + Usage: "NCBI Taxonomy ID", + }, + &cli.StringFlag{ + Name: "output", + Aliases: []string{"o"}, + Value: defaultOutput, + Usage: "Output TSV file path", + }, + &cli.IntFlag{ + Name: "page-size", + Aliases: []string{"p"}, + Value: defaultPageSize, + Usage: "API page size", }, }, + Action: interpro.DownloadAndWrite, } +} - if err := cmd.Run(context.Background(), os.Args); err != nil { - fmt.Fprintf(os.Stderr, "error: %v\n", err) - os.Exit(1) +func scanCmd() *cli.Command { + return &cli.Command{ + Name: "scan", + Usage: "Submit protein sequences to InterProScan and save JSON results", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "fasta", + Aliases: []string{"f"}, + Usage: "Path to FASTA file (supports multi-FASTA)", + }, + &cli.StringFlag{ + Name: "email", + Aliases: []string{"e"}, + Sources: cli.EnvVars("EBI_EMAIL"), + Usage: "Email for EMBL-EBI Job Dispatcher (required)", + }, + &cli.StringFlag{ + Name: "output", + Aliases: []string{"o"}, + Value: ".", + Usage: "Output directory for JSON results", + }, + &cli.StringFlag{ + Name: "seq-type", + Aliases: []string{"s"}, + Value: "p", + Usage: "Sequence type: p (protein) or n (nucleotide)", + }, + &cli.DurationFlag{ + Name: "poll-interval", + Value: defaultPollInterval, + Usage: "How often to check job status", + }, + &cli.DurationFlag{ + Name: "timeout", + Value: defaultTimeout, + Usage: "Maximum time to wait for a single job", + }, + }, + Action: interpro.Scan, + } +} + +func concurrentScanCmd() *cli.Command { + return &cli.Command{ + Name: "concurrent-scan", + Usage: "Submit protein sequences to InterProScan concurrently and save JSON results", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "fasta", + Aliases: []string{"f"}, + Usage: "Path to FASTA file (supports multi-FASTA)", + }, + &cli.StringFlag{ + Name: "email", + Aliases: []string{"e"}, + Sources: cli.EnvVars("EBI_EMAIL"), + Usage: "Email for EMBL-EBI Job Dispatcher (required)", + }, + &cli.StringFlag{ + Name: "output", + Aliases: []string{"o"}, + Value: ".", + Usage: "Output directory for JSON results", + }, + &cli.StringFlag{ + Name: "seq-type", + Aliases: []string{"s"}, + Value: "p", + Usage: "Sequence type: p (protein) or n (nucleotide)", + }, + &cli.DurationFlag{ + Name: "poll-interval", + Value: defaultPollInterval, + Usage: "How often to check job status", + }, + &cli.DurationFlag{ + Name: "timeout", + Value: defaultTimeout, + Usage: "Maximum time to wait for a single job", + }, + &cli.IntFlag{ + Name: "concurrency", + Aliases: []string{"c"}, + Value: defaultConcurrency, + Usage: "Maximum number of concurrent InterProScan jobs", + }, + }, + Action: interpro.ConcurrentScan, } }