diff --git a/.github/workflows/claude-code-review.yml b/.github/workflows/claude-code-review.yml index b5e8cfd4d..0d3503722 100644 --- a/.github/workflows/claude-code-review.yml +++ b/.github/workflows/claude-code-review.yml @@ -12,12 +12,7 @@ on: jobs: claude-review: - # Optional: Filter by PR author - # if: | - # github.event.pull_request.user.login == 'external-contributor' || - # github.event.pull_request.user.login == 'new-developer' || - # github.event.pull_request.author_association == 'FIRST_TIME_CONTRIBUTOR' - + if: github.event.sender.type != 'Bot' runs-on: ubuntu-latest permissions: contents: read diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index f08f58ff7..e1e53a0a2 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -1,120 +1,331 @@ -# This workflow will build a Java project with Maven -# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven +# PR builds: +# Unit, Integration, Performance - all in parallel with fail-fast +# (if unit tests fail at ~5 min, integration + performance are cancelled) +# +# Push builds (master): +# Full Kafka version matrix (3.1.0, 3.7.0, 3.9.1 + experimental 4.x) -# Tests disabled due to flakiness with under resourced github test machines. Confluent Jira works fine. Will fix later. name: Build and Test on: push: + branches: [ master ] pull_request: -# Cancel in-progress runs when a new commit is pushed to the same branch/PR +permissions: + contents: read + pull-requests: write + checks: write + concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.ref }} cancel-in-progress: true jobs: - # Fast build on push - uses default Kafka version from pom.xml - build: - if: github.event_name == 'push' - name: "Build (default AK)" + + # ── PR Builds ────────────────────────────────────────────────────────── + + # Pre-warm the Maven dependency cache so the three test jobs can start + # with everything local. Without this, each job races to download the + # same deps from Maven Central, and if Central is slow, they all stall. + # TODO: If this extra serial step doesn't consistently help, remove it + # and rely on per-job caching + longer timeouts instead. Trade-off is + # wall-clock time (adds ~1-2 min serial) vs reliability vs flaky networks. + prepare-deps: + if: github.event_name == 'pull_request' + name: "Prepare Maven Cache" runs-on: ubuntu-latest + timeout-minutes: 15 + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-java@v5 + with: + distribution: 'temurin' + java-version: '17' + # Explicit cache steps below instead of setup-java's built-in + # `cache: maven`. Built-in uses actions/cache under the hood, + # which does NOT save when the primary key was a hit. Once an + # early run populates a partial cache (e.g. Central timed out + # on one POM), every subsequent run restores the same gap and + # can never write the completed version back. Rotating save + # key below forces a save every run. + - name: Restore Maven cache + uses: actions/cache/restore@v4 + with: + path: ~/.m2/repository + key: setup-java-Linux-x64-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + setup-java-Linux-x64-maven- + - name: Download all dependencies + run: ./mvnw --batch-mode -Pci dependency:go-offline -Dlicense.skip -DincludeScope=test -U + - name: Save Maven cache (rotating key) + if: always() + uses: actions/cache/save@v4 + with: + path: ~/.m2/repository + key: setup-java-Linux-x64-maven-${{ hashFiles('**/pom.xml') }}-${{ github.run_id }} + # All test suites in parallel -fail-fast cancels the rest if any fails + test: + if: github.event_name == 'pull_request' + needs: prepare-deps + strategy: + fail-fast: true + matrix: + include: + - suite: unit + name: "Unit Tests" + cmd: "bin/ci-unit-test.sh" + timeout: 15 + # TODO: tighten integration/performance to 30m once we have 5+ + # consecutive successful runs consistently under 20m. Current 60m + # buffer is for Maven Central network variance we've observed. + - suite: integration + name: "Integration Tests" + cmd: "bin/ci-integration-test.sh" + timeout: 60 + - suite: performance + name: "Performance Tests" + cmd: "bin/performance-test.sh" + timeout: 60 + name: "${{ matrix.name }}" + runs-on: ubuntu-latest + timeout-minutes: ${{ matrix.timeout }} steps: - uses: actions/checkout@v6 + - uses: actions/setup-java@v5 + with: + distribution: 'temurin' + java-version: '17' + # Explicit restore (no save) so we can fall back via + # restore-keys to the rotating-key cache prepare-deps saved. + # setup-java's built-in `cache: maven` has no restore-keys + # support, so it would miss prepare-deps' `...-{run_id}` key. + - name: Restore Maven cache + uses: actions/cache/restore@v4 + with: + path: ~/.m2/repository + key: setup-java-Linux-x64-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + setup-java-Linux-x64-maven- + - name: ${{ matrix.name }} + run: ${{ matrix.cmd }} + - name: Upload coverage to Codecov + if: always() + uses: codecov/codecov-action@v5 + with: + files: '**/target/site/jacoco/jacoco.xml,**/target/site/jacoco-it/jacoco.xml' + flags: ${{ matrix.suite }} + token: ${{ secrets.CODECOV_TOKEN }} - # - name: Setup JDK 1.9 - # uses: actions/setup-java@v1 - # with: - # java-version: 1.9 + # Duplicate code detection using both PMD CPD and jscpd engines. + # See https://github.com/astubbs/duplicate-code-cross-check + duplicate-detection: + if: github.event_name == 'pull_request' + name: "Duplicate Code Check" + runs-on: ubuntu-latest + timeout-minutes: 5 + permissions: + contents: read + pull-requests: write + steps: + - uses: actions/checkout@v6 + with: + fetch-depth: 0 + - uses: astubbs/duplicate-code-cross-check@v1 + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + directories: 'parallel-consumer-core/src parallel-consumer-vertx/src parallel-consumer-reactor/src parallel-consumer-mutiny/src' + cpd-max-duplication: '5' + jscpd-max-duplication: '4' - # - name: Show java 1.9 home - # /opt/hostedtoolcache/jdk/9.0.7/x64 - # run: which java + # File similarity detection - finds files that are semantically similar overall + file-similarity: + if: github.event_name == 'pull_request' + name: "File Similarity Check" + runs-on: ubuntu-latest + timeout-minutes: 5 + steps: + - uses: actions/checkout@v6 + with: + fetch-depth: 0 + - uses: astubbs/duplicate-code-detection-tool@feat/base-vs-pr-comparison + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + directories: 'parallel-consumer-core/src,parallel-consumer-vertx/src,parallel-consumer-reactor/src,parallel-consumer-mutiny/src' + file_extensions: 'java' + ignore_below: 30 + fail_above: 80 + warn_above: 50 + one_comment: true + compare_with_base: true + max_increase: 10 - - name: Setup JDK 17 - uses: actions/setup-java@v5 + # SpotBugs static analysis -finds null derefs, concurrency issues, resource leaks + spotbugs: + if: github.event_name == 'pull_request' + name: "SpotBugs" + runs-on: ubuntu-latest + timeout-minutes: 10 + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-java@v5 with: distribution: 'temurin' java-version: '17' cache: 'maven' + - name: Compile and run SpotBugs + run: ./mvnw --batch-mode -Pci compile spotbugs:spotbugs -Dlicense.skip -pl parallel-consumer-core -am + - name: Post SpotBugs annotations on PR + if: always() + uses: jwgmeligmeyling/spotbugs-github-action@v1.2 + with: + path: '**/target/spotbugsXml.xml' + token: ${{ secrets.GITHUB_TOKEN }} - # - name: Show java version - # run: java -version - - # - name: Show mvn version - # run: mvn -version - - # - name: Build with Maven on Java 13 - # run: mvn -B package --file pom.xml - - - # done automatically now - # - name: Cache Maven packages - # uses: actions/cache@v2.1.7 - # with: - # path: ~/.m2/repository - # key: ${{ runner.os }}-m2 - # restore-keys: ${{ runner.os }}-m2 + # Dependency vulnerability scanning -GitHub's own dependency review + dependency-scan: + if: github.event_name == 'pull_request' + name: "Dependency Vulnerabilities" + runs-on: ubuntu-latest + timeout-minutes: 5 + steps: + - uses: actions/checkout@v6 + - uses: actions/dependency-review-action@v4 + with: + fail-on-severity: high + comment-summary-in-pr: always - - name: Build and Test - run: bin/ci-build.sh + # Mutation testing (PIT) - verifies test assertions are meaningful. + # Non-blocking: posts a PR comment with mutation score but doesn't + # gate merge. TODO: move to self-hosted performance runner for speed. + mutation-testing: + if: github.event_name == 'pull_request' + name: "Mutation Testing (PIT)" + runs-on: ubuntu-latest + continue-on-error: true + timeout-minutes: 300 + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-java@v5 + with: + distribution: 'temurin' + java-version: '17' + cache: 'maven' + # Restore PIT history for incremental analysis — only re-mutates + # code that changed since last run. PIT writes its history to /tmp/ + # with a filename derived from groupId.artifactId.version, ignoring + # explicit -DhistoryInputLocation CLI flags when -DwithHistory is set. + - name: Restore PIT history cache + uses: actions/cache@v4 + with: + path: /tmp/*_pitest_history.bin + key: pit-history-${{ github.base_ref }}-${{ hashFiles('**/src/main/**/*.java') }} + restore-keys: | + pit-history-${{ github.base_ref }}- + pit-history- + # -DjvmArgs=-Xmx1g gives minion JVMs 1GB heap (default too small, + # causes MEMORY_ERROR / UNKNOWN_ERROR crashes). + # -DargLine= strips Jacoco's javaagent from minions (PIT has + # its own coverage). Separate from jvmArgs. + # -DtimeoutConstant/Factor generous per-mutation timeout for CI hardware + # -Dthreads=1 I/O-bound tests; parallelism adds contention not speed + # -DwithHistory incremental: skips re-mutating unchanged code. + # PIT writes history to /tmp/{groupId}.{artifactId}... + # which we cache above via actions/cache. + # targetClasses: internal.* only — the core engine where mutations matter most. + # Broadening to io.confluent.parallelconsumer.* hit GitHub's 6h job cap (#41). + - name: Run PIT mutation testing + run: | + ./mvnw --batch-mode -Pci test-compile org.pitest:pitest-maven:mutationCoverage \ + -Dlicense.skip \ + -DtargetClasses="io.confluent.parallelconsumer.internal.*" \ + -DtargetTests="io.confluent.parallelconsumer.*" \ + -DjvmArgs=-Xmx1g \ + -DargLine= \ + -DtimeoutConstant=30000 -DtimeoutFactor=3.0 \ + -Dthreads=1 \ + -DwithHistory \ + -pl parallel-consumer-core -am + - name: Upload PIT report + if: always() + uses: actions/upload-artifact@v4 + with: + name: pit-report + path: '**/target/pit-reports/**' + - name: Post PIT summary to PR + if: always() + uses: actions/github-script@v7 + with: + script: | + const fs = require('fs'); + const { execSync } = require('child_process'); + const files = execSync('find . -path "*/pit-reports/*/mutations.csv" -type f').toString().trim().split('\n').filter(f => f); + let body; + if (files.length === 0) { + body = `## :x: Mutation Testing (PIT) Report\n\n`; + body += `**PIT did not produce a report.** Most commonly this means a test failed in the baseline (PIT runs all tests unmodified first to establish green) and PIT aborted before mutating. See the "Run PIT mutation testing" step logs for the failing test, then either fix it or add it to \`-DexcludedTestClasses\` in the workflow.\n`; + } else { + let killed = 0, survived = 0, noCov = 0, total = 0; + for (const f of files) { + const lines = fs.readFileSync(f, 'utf8').split('\n').filter(l => l.trim()); + for (const line of lines) { + total++; + if (line.includes('KILLED')) killed++; + else if (line.includes('SURVIVED')) survived++; + else if (line.includes('NO_COVERAGE')) noCov++; + } + } + const score = total > 0 ? ((killed / total) * 100).toFixed(1) : '0'; + body = `## Mutation Testing (PIT) Report\n\n`; + body += `| Metric | Value |\n|--------|-------|\n`; + body += `| Mutations generated | ${total} |\n`; + body += `| Killed (detected) | ${killed} |\n`; + body += `| Survived (missed) | ${survived} |\n`; + body += `| No coverage | ${noCov} |\n`; + body += `| **Mutation score** | **${score}%** |\n\n`; + body += `Full HTML report available as artifact: \`pit-report\`\n`; + } + const comments = await github.rest.issues.listComments({ owner: context.repo.owner, repo: context.repo.repo, issue_number: context.issue.number }); + const existing = comments.data.find(c => c.body.includes('Mutation Testing (PIT) Report')); + if (existing) { + await github.rest.issues.updateComment({ owner: context.repo.owner, repo: context.repo.repo, comment_id: existing.id, body }); + } else { + await github.rest.issues.createComment({ owner: context.repo.owner, repo: context.repo.repo, issue_number: context.issue.number, body }); + } -# - name: Archive test results -# if: ${{ always() }} -# uses: actions/upload-artifact@v2 -# with: -# name: test-reports -# path: target/**-reports/* -# retention-days: 14 -# -# - name: Archive surefire test results -# if: ${{ always() }} -# uses: actions/upload-artifact@v2 -# with: -# name: test-reports -# path: target/surefire-reports/* -# retention-days: 14 + # ── Push Builds (master) ─────────────────────────────────────────────── - # Full Kafka version matrix on pull requests - build-matrix: - if: github.event_name == 'pull_request' + # Full Kafka version matrix as safety net + build: + if: github.event_name == 'push' strategy: fail-fast: false matrix: - # Why not? because we can. - # 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1 don't work - needs zstd and some kafka client libs. - # Doesn't mean it couldn't be modified slightly to work... - #ak: [ 2.5.1, 2.6.1, 2.7.0, 2.8.1, 3.0.1, 3.1.0 ] - # 25 and 26 include a dep with a vulnerability which ossindex fails the build for - ak: [ 2.8.1, 3.1.0, 3.5.0, 3.7.0, 3.9.1 ] - #ak: [ 2.7.0 ] - #jdk: [ '-P jvm8-release -Djvm8.location=/opt/hostedtoolcache/Java_Zulu_jdk/8.0.332-9/x64', '' ] - # TG currently targets 11, so can't run the tests on 8 https://github.com/astubbs/truth-generator/issues/114 + ak: [ 3.1.0, 3.7.0, 3.9.1 ] experimental: [ false ] - name: [ "Stable AK version" ] + name: [ "Stable" ] include: - # AK 2.4 not supported - # - ak: "'[2.4.1,2.5)'" # currently failing - # experimental: true - # name: "Oldest AK breaking version 2.4.1+ (below 2.5.0) expected to fail" - - ak: "'[3.1.0,4)'" + - ak: "'[3.9.1,5)'" experimental: true - name: "Newest AK version 3.1.0+?" - + name: "Newest AK (4.x?)" continue-on-error: ${{ matrix.experimental }} name: "AK: ${{ matrix.ak }} (${{ matrix.name }})" runs-on: ubuntu-latest - + timeout-minutes: 30 steps: - uses: actions/checkout@v6 - - - name: Setup JDK 17 - uses: actions/setup-java@v5 + - uses: actions/setup-java@v5 with: distribution: 'temurin' java-version: '17' cache: 'maven' - - name: Build and Test run: bin/ci-build.sh ${{ matrix.ak }} + - name: Upload coverage to Codecov + if: always() + uses: codecov/codecov-action@v5 + with: + files: '**/target/site/jacoco/jacoco.xml,**/target/site/jacoco-it/jacoco.xml' + flags: ak-${{ matrix.ak }} + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/.github/workflows/performance.yml b/.github/workflows/performance.yml new file mode 100644 index 000000000..b33de3a14 --- /dev/null +++ b/.github/workflows/performance.yml @@ -0,0 +1,82 @@ +# Performance test suite, run on a self-hosted Windows runner with Docker Desktop. +# +# These tests are tagged @Tag("performance") and excluded from the regular CI +# build because they need substantial hardware (CPU, memory, disk). They run +# on dedicated machines where the user has labelled their runner with the +# "performance" custom label. +# +# Triggers: +# - workflow_dispatch (manual) - primary trigger +# - schedule (weekly) - automated regression check +# - NOT on PRs from forks - self-hosted runners + untrusted code = bad +# +# See docs/SELF_HOSTED_RUNNER.md for one-time runner setup instructions. + +name: Performance Tests + +on: + workflow_dispatch: + inputs: + kafka_version: + description: 'Kafka version to test against (default: project default)' + required: false + type: string + default: '' + schedule: + # Weekly on Sunday at 02:00 UTC + - cron: '0 2 * * 0' + +concurrency: + # Only run one performance test at a time per branch - they're slow and resource-heavy + group: performance-${{ github.ref }} + cancel-in-progress: false + +permissions: + contents: read + +jobs: + performance: + name: "Performance suite (self-hosted)" + # Targets a self-hosted runner labelled "performance" running Windows. + # The "self-hosted" label is automatic; "windows" and "performance" are + # added when the runner is registered. See docs/SELF_HOSTED_RUNNER.md. + runs-on: [self-hosted, windows, performance] + timeout-minutes: 180 + + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Setup JDK 17 + uses: actions/setup-java@v5 + with: + distribution: 'temurin' + java-version: '17' + # Don't cache here - self-hosted runners persist .m2 across runs already + cache: '' + + - name: Show environment + shell: cmd + run: | + java -version + docker --version + docker info + + - name: Run performance tests + shell: cmd + env: + KAFKA_VERSION: ${{ inputs.kafka_version }} + run: | + if defined KAFKA_VERSION ( + call bin\performance-test.cmd -Dkafka.version=%KAFKA_VERSION% + ) else ( + call bin\performance-test.cmd + ) + + - name: Upload test reports + if: always() + uses: actions/upload-artifact@v7 + with: + name: performance-reports-${{ github.run_number }} + path: '**/target/*-reports/*.xml' + retention-days: 30 diff --git a/.mvn/maven.config b/.mvn/maven.config new file mode 100644 index 000000000..bfe087449 --- /dev/null +++ b/.mvn/maven.config @@ -0,0 +1,6 @@ +-Dmaven.wagon.http.connectionTimeout=10000 +-Dmaven.wagon.http.readTimeout=120000 +-Dmaven.wagon.httpconnectionManager.ttlSeconds=120 +-Dmaven.wagon.http.retryHandler.count=3 +-Daether.connector.connectTimeout=10000 +-Daether.connector.requestTimeout=120000 diff --git a/AGENTS.md b/AGENTS.md index 690edccfd..6db0fb961 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -47,6 +47,7 @@ bin/ci-build.sh 3.9.1 - **Integration tests**: `mvn verify` / failsafe plugin. Source in `src/test-integration/java/`. Uses TestContainers with `confluentinc/cp-kafka` Docker image. - **Test exclusion patterns**: `**/integrationTest*/**/*.java` and `**/*IT.java` are excluded from surefire, included in failsafe. - **Kafka version matrix**: CI tests against multiple Kafka versions via `-Dkafka.version=X.Y.Z`. +- **Performance tests**: Tagged `@Tag("performance")` and excluded from regular CI by default. They run on a self-hosted runner via `.github/workflows/performance.yml` (see `docs/SELF_HOSTED_RUNNER.md`). Run locally with `bin/performance-test.sh` (or `bin/performance-test.cmd` on Windows). Override the test group selection with Maven properties: `-Dincluded.groups=performance` to run only perf, `-Dexcluded.groups=` to run everything. ## Known Issues @@ -56,10 +57,101 @@ bin/ci-build.sh 3.9.1 - **Lombok**: Used extensively (builders, getters, logging). IntelliJ Lombok plugin required. - **EditorConfig**: Enforced via `.editorconfig` - 4-space indent for Java, 120 char line length. -- **License headers**: Managed by `license-maven-plugin` (Mycila). Use `-Dlicense.skip` locally to skip checks. +- **License headers**: Managed by `license-maven-plugin` (Mycila). See "License headers" section below. - **Google Truth**: Used for test assertions alongside JUnit 5 and Mockito. +## License headers + +The Mycila `license-maven-plugin` enforces a Confluent copyright header on all source files. It uses git-derived years via `${license.git.copyrightYears}`. + +**Skipping the check** (for any Maven goal): +``` +./mvnw -Dlicense.skip +``` + +**When to skip:** +- Running builds inside a git worktree — the git-years lookup fails with `Bare Repository has neither a working tree, nor an index` +- Local iteration where you don't want years auto-bumped on touched files +- Any command other than the canonical `mvn install` flow when copyright drift would create noise in `git status` + +The default behavior on macOS dev machines is `format` mode (auto-fixes headers) via the `license-format` profile (auto-activated). The `ci` profile flips this to `check` mode (fails the build on drift). Both `bin/build.sh` and `bin/ci-build.sh` already pass `-Dlicense.skip` for this reason. + +**When NOT to skip:** +- You're intentionally running `mvn license:format` to update headers +- You want to verify CI's check would pass before pushing + +## Agent Rules + +### Git Safety +- **NEVER commit or push without explicitly asking the user first.** Wait for approval. This is the #1 rule. +- **When creating a stacked PR, include `depends on #N` in the PR description** (where `#N` is the parent PR it stacks on). This fork runs a PR dependency gating action (see `.github/workflows/check-dependencies.yml`) which blocks child PRs from merging until the parent is merged. One `depends on` line per parent. Keep the list accurate if the chain changes. +- Branch off master for upstream contributor cherry-picks so PRs show only their change. +- Never commit without tests and documentation in the same pass. +- Run tests before committing. If they fail, fix them first. +- When you fix something or finish implementing something, record what lessons you learnt. + +### Development Discipline +- **Skateboard first.** Build the simplest end-to-end thing that works. Before starting a feature, ask: "Is this blocking the next public milestone?" If not, flag it and move on. +- **Never paper over the real problem** - make the proper fix. +- Don't propose workarounds that require user action when the software can solve it. If the software has enough information to derive the right answer, it should just do it. +- If constructing data in memory that is eventually going to be saved, save it as soon as it's created. Don't delay in case the programme crashes or the user exits. + +### Code Quality +- **Be DRY.** Reuse existing functions. Don't copy code - refactor where necessary. Refactor out common patterns. +- Never weaken test assertions - classify exceptions instead of ignoring them. +- Wire components through PCModule DI - don't bypass the dependency injection system. +- Validate user input - don't let bad input cause silent failures. +- Handle errors visibly - don't swallow exceptions. +- Give things meaningful names that describe what they do. Never use random or generic names. + +### Test Discipline +- Search for existing test harnesses and utilities before creating new ones. +- Run the complete test suite periodically, not just targeted tests. +- Maintain good high-level test coverage. Only get detailed on particularly complex functions that benefit from fine-grained testing. + +### CI and Automation +- Always set up continuous integration, code coverage, and automated dependency checking. +- Make scripts for common end-user requirements with helpful, suggestive CLI interfaces. + +### Documentation +- Keep a diary of major plans and their milestones. +- **Keep a developer-facing product specification** that outlines product features, functionality, and implementation architecture - separately from end-user documentation. With agentic programming, the developer can lose sight of architecture and implementation details. This document exposes the interesting, novel, and important implementation decisions so the developer maintains a clear mental model of the system even when agents are doing most of the coding. +- Keep end-user documentation updated. +- Keep documentation tables of contents updated. + +### Communication +- Use precise terminology - if the project defines specific terms, use them consistently. Don't use ambiguous words. +- Don't write with em dash characters. + +### Rule Sync +- Keep this AGENTS.md in sync with any global CLAUDE.md rules. If you have rules in your global config that are missing here, suggest to the user that they be added. This ensures all contributors and agents working on this project follow the same standards. + +### Working Directory +- Always run commands from the project root directory. +- Use `./mvnw` or `bin/*.sh` scripts - don't cd into submodules. +- Use `-pl module-name -am` for module-specific builds. + ## CI -- **GitHub Actions**: `.github/workflows/maven.yml` - runs on push/PR to master with Kafka version matrix. -- **Semaphore** (Confluent internal): `.semaphore/semaphore.yml` - primary CI for upstream. +PR builds run these jobs in parallel (fail-fast cancels others if any fails): + +| Job | Script / Tool | Purpose | +|-----|--------------|---------| +| **Unit Tests** | `bin/ci-unit-test.sh` | Surefire tests, no Docker | +| **Integration Tests** | `bin/ci-integration-test.sh` | Failsafe tests, TestContainers | +| **Performance Tests** | `bin/performance-test.sh` | `@Tag("performance")` volume tests | +| **SpotBugs** | Maven spotbugs plugin | Static analysis for bugs | +| **Duplicate Code Check** | PMD CPD | Detect Java copy-paste blocks (base-vs-PR comparison) | +| **Dependency Vulnerabilities** | GitHub dependency-review-action | CVE scanning | +| **Mutation Testing (PIT)** | pitest-maven | Test quality verification | + +Push builds (master): Full Kafka version matrix (3.1.0, 3.7.0, 3.9.1 + experimental [3.9.1,5) for 4.x). + +- **Code coverage**: JaCoCo → [Codecov](https://app.codecov.io/gh/astubbs/parallel-consumer). PRs fail if overall coverage drops by more than 1%. +- **Semaphore** (Confluent internal): `.semaphore/semaphore.yml` — primary CI for upstream. + +### Required secrets + +| Secret | Purpose | +|--------|---------| +| `CODECOV_TOKEN` | Codecov upload token — required because branch protection is enabled. Get it from [Codecov settings](https://app.codecov.io/gh/astubbs/parallel-consumer/settings). | diff --git a/README.adoc b/README.adoc index f3c48ea86..0ed2f6c13 100644 --- a/README.adoc +++ b/README.adoc @@ -283,7 +283,7 @@ The user just has to provide a function to extract from the message the HTTP cal === Illustrative Performance Example -.(see link:./parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/VolumeTests.java[VolumeTests.java]) +.(see link:./parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/VeryLargeMessageVolumeTest.java[VeryLargeMessageVolumeTest.java]) These performance comparison results below, even though are based on real performance measurement results, are for illustrative purposes. To see how the performance of the tool is related to instance counts, partition counts, key distribution and how it would relate to the vanilla client. Actual results will vary wildly depending upon the setup being deployed into. @@ -1341,6 +1341,59 @@ Note:: See https://github.com/confluentinc/parallel-consumer/issues/162[issue #162] and this https://stackoverflow.com/questions/4786881/why-is-test-jar-dependency-required-for-mvn-compile[Stack Overflow question]. +=== Build Scripts + +Helper scripts are in the `bin/` directory: + +[qanda] +Quick local build (compile + unit tests):: +`bin/build.sh` + +Unit tests only (no Docker needed):: +`bin/ci-unit-test.sh` + +Integration tests only (requires Docker for TestContainers):: +`bin/ci-integration-test.sh` + +Full CI build with all tests (unit + integration):: +`bin/ci-build.sh` + +CI build against a specific Kafka version:: +`bin/ci-build.sh 3.9.1` + +Performance test suite (also `bin/performance-test.cmd` on Windows):: +`bin/performance-test.sh` + +All `ci-*` scripts use the `-Pci` Maven profile which enables license checking and disables parallel test execution. The GitHub Actions CI workflow uses these scripts, so running them locally reproduces the CI environment. + +=== Performance Tests + +Tests tagged `@Tag("performance")` are excluded from the regular CI build because they need substantial hardware. They run on a dedicated self-hosted runner via `.github/workflows/performance.yml` (manual trigger or weekly schedule). + +To run the performance suite locally, use `bin/performance-test.sh`. To set up your own self-hosted runner for these tests, see link:./docs/SELF_HOSTED_RUNNER.md[docs/SELF_HOSTED_RUNNER.md]. + +=== Releasing + +The `pom.xml` version is the source of truth for publishing — there is no `maven-release-plugin` step. + +On every push to `master`, `.github/workflows/publish.yml` deploys to Maven Central: + +* If the version ends in `-SNAPSHOT` → publishes a snapshot +* If the version does not end in `-SNAPSHOT` → publishes a full release, creates a `v` git tag, and creates a GitHub release + +To cut a release: + +. Open a PR removing `-SNAPSHOT` from `` in the parent `pom.xml` (e.g. `0.6.0.0-SNAPSHOT` → `0.6.0.0`) +. Merge it to master → CI publishes the release +. Open another PR bumping to the next snapshot (e.g. `0.6.0.1-SNAPSHOT`) and merge + +Required GitHub repository secrets: + +* `MAVEN_CENTRAL_USERNAME` — Sonatype Central Portal token username +* `MAVEN_CENTRAL_PASSWORD` — Sonatype Central Portal token password +* `MAVEN_GPG_PRIVATE_KEY` — Armored GPG private key for signing artifacts +* `MAVEN_GPG_PASSPHRASE` — Passphrase for the GPG key + === Testing The project has good automated test coverage, of all features. diff --git a/bin/ci-build.sh b/bin/ci-build.sh index 2d497b96d..e485214db 100755 --- a/bin/ci-build.sh +++ b/bin/ci-build.sh @@ -22,4 +22,6 @@ fi -Pci \ clean verify \ $KAFKA_VERSION_ARG \ - -Dlicense.skip + -Dlicense.skip \ + -Dexcluded.groups=performance \ + -Dsurefire.rerunFailingTestsCount=2 diff --git a/bin/ci-integration-test.sh b/bin/ci-integration-test.sh new file mode 100755 index 000000000..03e996cae --- /dev/null +++ b/bin/ci-integration-test.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +# +# Copyright (C) 2020-2026 Confluent, Inc. +# + +# Run integration tests only (failsafe, requires Docker for TestContainers). +# Skips unit tests to avoid duplicate work. +# Usage: bin/ci-integration-test.sh + +set -euo pipefail + +./mvnw --batch-mode \ + -Pci \ + clean verify \ + -DskipUTs=true \ + -Dlicense.skip \ + -Dexcluded.groups=performance \ + -Dsurefire.rerunFailingTestsCount=2 diff --git a/bin/ci-unit-test.sh b/bin/ci-unit-test.sh new file mode 100755 index 000000000..90ff19e8c --- /dev/null +++ b/bin/ci-unit-test.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +# +# Copyright (C) 2020-2026 Confluent, Inc. +# + +# Run unit tests only (surefire, no Docker/TestContainers needed). +# Usage: bin/ci-unit-test.sh + +set -euo pipefail + +./mvnw --batch-mode \ + -Pci \ + clean test \ + -Dlicense.skip \ + -Dexcluded.groups=performance \ + -Dsurefire.rerunFailingTestsCount=2 diff --git a/bin/performance-test.cmd b/bin/performance-test.cmd new file mode 100644 index 000000000..803604baf --- /dev/null +++ b/bin/performance-test.cmd @@ -0,0 +1,18 @@ +@REM Copyright (C) 2020-2026 Confluent, Inc. +@REM +@REM Run only the performance test suite (tests tagged @Tag("performance")). +@REM These are excluded from the regular CI build because they take a long time +@REM and need substantial hardware. Used by the self-hosted Windows runner. +@REM +@REM Usage: bin\performance-test.cmd [extra-maven-args...] + +@echo off +setlocal + +call mvnw.cmd --batch-mode ^ + -Pci ^ + clean verify ^ + -Dincluded.groups=performance ^ + -Dexcluded.groups= ^ + -Dlicense.skip ^ + %* diff --git a/bin/performance-test.sh b/bin/performance-test.sh new file mode 100755 index 000000000..87ceda19d --- /dev/null +++ b/bin/performance-test.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +# +# Copyright (C) 2020-2026 Confluent, Inc. +# + +# Run only the performance test suite (tests tagged @Tag("performance")). +# These are excluded from the regular CI build because they take a long time +# and need substantial hardware. The self-hosted runner workflow +# (.github/workflows/performance.yml) calls this script. +# +# Usage: bin/performance-test.sh [extra-maven-args...] + +set -euo pipefail + +./mvnw --batch-mode \ + -Pci \ + clean verify \ + -Dincluded.groups=performance \ + -Dexcluded.groups= \ + -Dlicense.skip \ + "$@" diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 000000000..0b0bba041 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,11 @@ +coverage: + status: + project: + default: + # Fail if overall coverage drops from the base branch + target: auto + threshold: 1% + patch: + default: + # Don't enforce a minimum on new code — just track it + informational: true diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMap.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMap.java index 1ad4f6aa0..0b482ce8b 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMap.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMap.java @@ -7,6 +7,7 @@ import io.confluent.parallelconsumer.state.PartitionStateManager; import lombok.NonNull; import lombok.Value; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; @@ -18,6 +19,7 @@ * * @see BrokerPollSystem#partitionAssignmentEpoch */ +@Slf4j @Value public class EpochAndRecordsMap { @@ -27,6 +29,15 @@ public EpochAndRecordsMap(ConsumerRecords poll, PartitionStateManager { var records = poll.records(partition); Long epochOfPartition = pm.getEpochOfPartition(partition); + if (epochOfPartition == null) { + // Race: poll() returned records for a partition before onPartitionsAssigned() + // has fired. This is more likely with Kafka 2.x's eager rebalance protocol. + // Safe to skip — these records haven't been committed, so Kafka will re-deliver + // them on the next poll after the assignment callback completes. + log.warn("Skipping {} records for partition {} — no epoch assigned yet. " + + "Records will be re-delivered on next poll after assignment completes.", records.size(), partition); + return; + } RecordsAndEpoch entry = new RecordsAndEpoch(partition, epochOfPartition, records); recordMap.put(partition, entry); }); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java index fd47dd552..c2c55cb5d 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java @@ -248,12 +248,13 @@ private void resetOffsetMapAndRemoveWork(Collection allRemovedPa } /** - * @return the current epoch of the partition + * @return the current epoch of the partition, or null if not yet assigned */ public Long getEpochOfPartition(TopicPartition partition) { return partitionsAssignmentEpochs.get(partition); } + private void incrementPartitionAssignmentEpoch(final Collection partitions) { for (final TopicPartition partition : partitions) { Long epoch = partitionsAssignmentEpochs.getOrDefault(partition, PartitionState.KAFKA_OFFSET_ABSENCE); diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerIntegrationTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerIntegrationTest.java index 57275a8b3..f244e6e57 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerIntegrationTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerIntegrationTest.java @@ -55,8 +55,33 @@ public abstract class BrokerIntegrationTest { */ public static KafkaContainer kafkaContainer = createKafkaContainer(null); + /** + * Derives the Confluent Platform version from the Apache Kafka client version so that + * the broker under test matches the client. The CI matrix overrides {@code kafka.version} + * via {@code -Dkafka.version=X.Y.Z}, so we read it at runtime from the client jar. + *

+ * Mapping: CP major = AK major + 4 (e.g., AK 2.8 → CP 6.2, AK 3.9 → CP 7.9). + */ + static String deriveCpKafkaImage() { + String akVersion = org.apache.kafka.common.utils.AppInfoParser.getVersion(); + log.info("Kafka client version detected: {}", akVersion); + + // Parse major.minor from the AK version + String[] parts = akVersion.split("\\."); + int akMajor = Integer.parseInt(parts[0]); + int akMinor = Integer.parseInt(parts[1]); + + // CP major = AK major + 4, CP minor = AK minor + int cpMajor = akMajor + 4; + int cpMinor = akMinor; + + String cpImage = "confluentinc/cp-kafka:" + cpMajor + "." + cpMinor + ".0"; + log.info("Using CP Kafka image: {} (derived from AK {})", cpImage, akVersion); + return cpImage; + } + public static KafkaContainer createKafkaContainer(String logSegmentSize) { - KafkaContainer base = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0")) + KafkaContainer base = new KafkaContainer(DockerImageName.parse(deriveCpKafkaImage())) .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") //transaction.state.log.replication.factor .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") //transaction.state.log.min.isr .withEnv("KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS", "1") //transaction.state.log.num.partitions diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java index 2e507eb6d..877434a04 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.assertj.core.util.Lists; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import pl.tlinkowski.unij.api.UniLists; @@ -44,6 +45,7 @@ * Mocked out comparative volume tests */ @Slf4j +@Tag("performance") class LargeVolumeInMemoryTests extends ParallelEoSStreamProcessorTestBase { @SneakyThrows diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceHighVolumeTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceHighVolumeTest.java index 56efc1e4f..5db19804e 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceHighVolumeTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceHighVolumeTest.java @@ -17,6 +17,7 @@ import org.assertj.core.api.Assertions; import org.assertj.core.api.SoftAssertions; import org.awaitility.core.ConditionTimeoutException; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -34,6 +35,7 @@ import static pl.tlinkowski.unij.api.UniLists.of; @Slf4j +@Tag("performance") class MultiInstanceHighVolumeTest extends BrokerIntegrationTest { public List consumedKeys = Collections.synchronizedList(new ArrayList<>()); diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/PartitionOrderProcessingTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/PartitionOrderProcessingTest.java index 91a5d8706..9a9660f2d 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/PartitionOrderProcessingTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/PartitionOrderProcessingTest.java @@ -17,6 +17,7 @@ import org.junit.jupiter.api.Test; import pl.tlinkowski.unij.api.UniSets; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -96,8 +97,18 @@ void allPartitionsAreProcessedInParallel() { partitionCounts.get(recordContexts.getSingleConsumerRecord().partition()).getAndIncrement(); ThreadUtils.sleepQuietly(10); // introduce a bit of processing delay - to make sure polling backpressure kicks in. }); - await().until(() -> partitionCounts.values().stream().mapToInt(AtomicInteger::get).sum() > 500); // wait until we process some messages to get the counts in. - Assertions.assertTrue(partitionCounts.values().stream().allMatch(v -> v.get() > 0), "Expect all partitions to have some messages processed, actual partitionCounts:" + partitionCounts); + // Wait for BOTH conditions: enough total messages AND all partitions represented. + // Previously the await only checked total > 500, then the assertion checked all + // partitions — a race, because Kafka may deliver from one partition first. + // Moving the partition check into the await lets Awaitility retry until + // all partitions have been reached. + await().atMost(Duration.ofSeconds(120)).untilAsserted(() -> { + int total = partitionCounts.values().stream().mapToInt(AtomicInteger::get).sum(); + Assertions.assertTrue(total > 500, + "Expect > 500 total messages processed, actual: " + total); + Assertions.assertTrue(partitionCounts.values().stream().allMatch(v -> v.get() > 0), + "Expect all partitions to have some messages processed, actual partitionCounts:" + partitionCounts); + }); } @@ -129,7 +140,8 @@ void allPartitionsAreNotProcessedInParallel() { partitionCounts.get(recordContexts.getSingleConsumerRecord().partition()).getAndIncrement(); ThreadUtils.sleepQuietly(10); // introduce a bit of processing delay - to make sure polling backpressure kicks in. }); - await().until(() -> partitionCounts.values().stream().mapToInt(AtomicInteger::get).sum() > 500); // wait until we process some messages to get the counts in. + // 120s explicit timeout — bare await() used shaded Awaitility's 10s default, too tight for CI. + await().atMost(Duration.ofSeconds(120)).until(() -> partitionCounts.values().stream().mapToInt(AtomicInteger::get).sum() > 500); Assertions.assertFalse(partitionCounts.values().stream().allMatch(v -> v.get() > 0), "Expect some processing thread starving and not all partition counts to have some messages processed, actual partitionCounts:" + partitionCounts); } diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/VeryLargeMessageVolumeTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/VeryLargeMessageVolumeTest.java index 6c32c8c09..f27922288 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/VeryLargeMessageVolumeTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/VeryLargeMessageVolumeTest.java @@ -25,6 +25,7 @@ import org.assertj.core.api.Assertions; import org.assertj.core.api.SoftAssertions; import org.awaitility.core.ConditionTimeoutException; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.util.*; @@ -51,6 +52,7 @@ * RuntimeException when running with very high options in 0.2.0.0 (Bitset too long to encode) #35 */ @Slf4j +@Tag("performance") public class VeryLargeMessageVolumeTest extends BrokerIntegrationTest { int HIGH_MAX_POLL_RECORDS_CONFIG = 10_000; diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java index 918eb0657..652cf88ec 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serdes; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import pl.tlinkowski.unij.api.UniLists; @@ -144,6 +145,12 @@ protected ParallelConsumerOptions.ParallelConsumerOptionsBuilder @AfterEach public void close() { + // Reset Awaitility's global thread-local timeout state so per-test overrides + // (e.g. setDefaultTimeout) don't leak into other tests under non-deterministic + // test order (PIT baseline/mutations surface this; surefire's default ordering + // happens to mask it). Runs even if the test body threw. + Awaitility.reset(); + // don't try to close if error'd (at least one test purposefully creates an error to tests error handling) - we // don't want to bubble up an error here that we expect from here. if (!parentParallelConsumer.isClosedOrFailed()) { diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithCommitTimeoutException.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithCommitTimeoutException.java index 7593375b7..2c443eae8 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithCommitTimeoutException.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithCommitTimeoutException.java @@ -87,39 +87,47 @@ public synchronized void commitSync(Map offse parallelConsumer.onPartitionsAssigned(of(tp)); mockConsumer.updateBeginningOffsets(startOffsets); - // - new Thread() { - public void run() { - addRecords(mockConsumer); - } - }.start(); - - // - ConcurrentLinkedQueue> records = new ConcurrentLinkedQueue<>(); - parallelConsumer.poll(recordContexts -> { - recordContexts.forEach(recordContext -> { - log.warn("Processing: {}", recordContext); - records.add(recordContext); + // Daemon thread: must NOT survive past this test method, or when it wakes + // from sleep it'll addRecord() on a closed mockConsumer and throw an + // uncaught exception that PIT attributes to whatever test is running next + // in the same minion JVM. We also interrupt it and close PC in the finally + // block. + Thread recordAdder = new Thread(() -> addRecords(mockConsumer), "commit-timeout-record-adder"); + recordAdder.setDaemon(true); + recordAdder.start(); + + try { + // + ConcurrentLinkedQueue> records = new ConcurrentLinkedQueue<>(); + parallelConsumer.poll(recordContexts -> { + recordContexts.forEach(recordContext -> { + log.warn("Processing: {}", recordContext); + records.add(recordContext); + }); }); - }); - // temporarily set the wait timeout - Awaitility.setDefaultTimeout(Duration.ofSeconds(50)); - // - Awaitility.await().untilAsserted(() -> { - assertThat(records).hasSize(10); - }); - - Awaitility.reset(); + // Scope the timeout locally (don't mutate Awaitility's global default — + // that was leaking across tests if the assertion below throws before reset()). + Awaitility.await().atMost(Duration.ofSeconds(50)).untilAsserted(() -> { + assertThat(records).hasSize(10); + }); + } finally { + recordAdder.interrupt(); + parallelConsumer.close(); + } } private void addRecords(MockConsumer mockConsumer) { - for(int i = 0; i < 10; i++) { - mockConsumer.addRecord(new org.apache.kafka.clients.consumer.ConsumerRecord<>(topic, 0, i, "key", "value")); + for (int i = 0; i < 10; i++) { try { + mockConsumer.addRecord(new org.apache.kafka.clients.consumer.ConsumerRecord<>(topic, 0, i, "key", "value")); Thread.sleep(1000L); + } catch (IllegalStateException e) { + // mockConsumer was closed - test has ended, stop quietly + return; } catch (InterruptedException e) { - throw new RuntimeException(e); + Thread.currentThread().interrupt(); + return; } } } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithEarlyClose.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithEarlyClose.java index e12480913..9813296df 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithEarlyClose.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithEarlyClose.java @@ -85,39 +85,49 @@ public synchronized void commitSync(Map offse parallelConsumer.onPartitionsAssigned(of(tp)); mockConsumer.updateBeginningOffsets(startOffsets); - // - new Thread() { - public void run() { - addRecords(mockConsumer); - } - }.start(); + // Daemon thread: must NOT survive past this test method, or when it wakes + // from sleep it'll addRecord() on a closed mockConsumer and throw an + // uncaught exception that PIT attributes to whatever test is running next + // in the same minion JVM. We also interrupt it explicitly in the finally + // block to stop the loop promptly. + Thread recordAdder = new Thread(() -> addRecords(mockConsumer), "early-close-record-adder"); + recordAdder.setDaemon(true); + recordAdder.start(); - // - ConcurrentLinkedQueue> records = new ConcurrentLinkedQueue<>(); - parallelConsumer.poll(recordContexts -> { - recordContexts.forEach(recordContext -> { - log.warn("Processing: {}", recordContext); - records.add(recordContext); - }); - }); try { - Thread.sleep(5000L); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + // + ConcurrentLinkedQueue> records = new ConcurrentLinkedQueue<>(); + parallelConsumer.poll(recordContexts -> { + recordContexts.forEach(recordContext -> { + log.warn("Processing: {}", recordContext); + records.add(recordContext); + }); + }); + try { + Thread.sleep(5000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - log.info("Trying to close..."); - parallelConsumer.close(); // request close after 5 seconds - log.info("Close successful!"); + log.info("Trying to close..."); + parallelConsumer.close(); // request close after 5 seconds + log.info("Close successful!"); + } finally { + recordAdder.interrupt(); + } } private void addRecords(MockConsumer mockConsumer) { - for(int i = 0; i < 100000; i++) { - mockConsumer.addRecord(new org.apache.kafka.clients.consumer.ConsumerRecord<>(topic, 0, i, "key", "value")); + for (int i = 0; i < 100000; i++) { try { + mockConsumer.addRecord(new org.apache.kafka.clients.consumer.ConsumerRecord<>(topic, 0, i, "key", "value")); Thread.sleep(1000L); + } catch (IllegalStateException e) { + // mockConsumer was closed - test has ended, stop quietly + return; } catch (InterruptedException e) { - throw new RuntimeException(e); + Thread.currentThread().interrupt(); + return; } } } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithSaslAuthenticationException.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithSaslAuthenticationException.java index c324119f1..5825bab89 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithSaslAuthenticationException.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithSaslAuthenticationException.java @@ -12,6 +12,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SaslAuthenticationException; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.testcontainers.shaded.org.awaitility.Awaitility; @@ -27,14 +28,16 @@ import static pl.tlinkowski.unij.api.UniLists.of; /** - * Test that PC can survive for a temporary SaslAuthenticationException. + * Test that PC can survive a temporary SaslAuthenticationException. * - * In this test, MockConsumer starts to throw SaslAuthenticationException from the beginning until 20 seconds later. + * In this test, MockConsumer throws SaslAuthenticationException from the beginning for 8 seconds, then + * goes back to normal. * - * After that MockConsumer will back to normal. - * - * The saslAuthenticationRetryTimeout is set to 25 seconds. It is expected to resume normal after 20 seconds and will - * be able to consume all produced messages. + * The saslAuthenticationRetryTimeout is set to 30 seconds (generous margin over the 8s outage window) so + * PC has room to recover. The whole test fits comfortably inside PIT's per-test baseline coverage budget + * (which caps around ~80s). The earlier 20s outage + 25s retry version intermittently failed PIT's baseline + * because the total runtime scraped that cap. Test still verifies the same property: PC recovers if the + * retry budget exceeds the outage window. * @author Shilin Wu */ @Slf4j @@ -43,12 +46,25 @@ class MockConsumerTestWithSaslAuthenticationException { private final String topic = MockConsumerTestWithSaslAuthenticationException.class.getSimpleName(); + // Field so @AfterEach can close it. This class doesn't extend + // AbstractParallelEoSStreamProcessorTestBase, so no base-class cleanup runs. + private ParallelEoSStreamProcessor parallelConsumer; + + @AfterEach + void close() { + if (parallelConsumer != null && !parallelConsumer.isClosedOrFailed()) { + parallelConsumer.close(); + } + } + /** * Test that the mock consumer works as expected */ @Test void mockConsumer() { - final AtomicLong failUntil = new AtomicLong(System.currentTimeMillis() + 20000L); + // 8s mock-failure window (was 20s) — keeps total test runtime well within PIT's baseline + // per-test budget while still triggering PC's SASL retry path meaningfully. + final AtomicLong failUntil = new AtomicLong(System.currentTimeMillis() + 8000L); var mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override public synchronized ConsumerRecords poll(Duration timeout) { @@ -74,9 +90,11 @@ public synchronized void commitSync(Map offse // var options = ParallelConsumerOptions.builder() .consumer(mockConsumer) - .saslAuthenticationRetryTimeout(Duration.ofSeconds(25L)) // set retry to 25 seconds. + // 30s retry budget over an 8s mock-failure window — generous margin (22s) for + // PC's recovery poll even under PIT's slower JVM. + .saslAuthenticationRetryTimeout(Duration.ofSeconds(30L)) .build(); - var parallelConsumer = new ParallelEoSStreamProcessor(options); + parallelConsumer = new ParallelEoSStreamProcessor<>(options); parallelConsumer.subscribe(of(topic)); // MockConsumer is not a correct implementation of the Consumer contract - must manually rebalance++ - or use LongPollingMockConsumer @@ -96,14 +114,13 @@ public synchronized void commitSync(Map offse }); }); - // temporarily set the wait timeout - Awaitility.setDefaultTimeout(Duration.ofSeconds(50)); - // - Awaitility.await().untilAsserted(() -> { + // Scope the timeout locally (don't mutate Awaitility's global default — that was leaking + // across tests under PIT's different ordering, since this class doesn't have base-class + // Awaitility.reset() cleanup). + // 45s: 8s mock-failure window + retry + PIT's JVM slowdown, with headroom. + Awaitility.await().atMost(Duration.ofSeconds(45)).untilAsserted(() -> { assertThat(records).hasSize(3); }); - - Awaitility.reset(); } private void addRecords(MockConsumer mockConsumer) { diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/PCMetricsTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/PCMetricsTest.java index c85cafcef..a93d0c5ff 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/PCMetricsTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/PCMetricsTest.java @@ -91,7 +91,9 @@ void metricsRegisterBinding() { }); // metrics show processing is complete - await().untilAsserted(() -> { + // 120s budget (was default 10s) - matches the atMost budgets elsewhere in this method, + // and gives headroom under PIT's instrumented JVM processing 1500 records. + await().atMost(Duration.ofSeconds(120)).untilAsserted(() -> { log.info("counterP0: {}, counterP1: {}", counterP0.get(), counterP1.get()); log.info(registry.getMetersAsString()); assertThat(registeredGaugeValueFor(PCMetricsDef.NUM_PAUSED_PARTITIONS)).isEqualTo(2); @@ -177,7 +179,7 @@ void metricsRegisterBinding() { numberToBlockAt.set(5000); latchPartition0.countDown(); latchPartition1.countDown(); - await().untilAsserted(() -> { + await().atMost(Duration.ofSeconds(120)).untilAsserted(() -> { assertThat(counterP0.get()).isEqualTo(quantityP0); }); diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSSStreamProcessorRebalancedTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSSStreamProcessorRebalancedTest.java index 224549f12..2c358e078 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSSStreamProcessorRebalancedTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSSStreamProcessorRebalancedTest.java @@ -9,7 +9,6 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.TopicPartition; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -30,10 +29,6 @@ public void setupAsyncConsumerTestBase() { setupClients(); } - @AfterEach() - public void close() { - } - @ParameterizedTest @EnumSource(CommitMode.class) @SneakyThrows diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMapRaceTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMapRaceTest.java new file mode 100644 index 000000000..e329e6015 --- /dev/null +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMapRaceTest.java @@ -0,0 +1,139 @@ +package io.confluent.parallelconsumer.internal; + +/*- + * Copyright (C) 2020-2026 Confluent, Inc. and contributors + */ + +import io.confluent.parallelconsumer.state.ModelUtils; +import io.confluent.parallelconsumer.state.PartitionStateManager; +import io.confluent.parallelconsumer.state.ShardManager; +import io.confluent.parallelconsumer.state.WorkContainer; +import io.confluent.parallelconsumer.state.WorkManager; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import pl.tlinkowski.unij.api.UniLists; +import pl.tlinkowski.unij.api.UniMaps; + +import java.util.List; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; + +/** + * Verifies that the epoch initialization race is handled safely: + *

    + *
  1. poll() returns records for a partition before onPartitionsAssigned() fires
  2. + *
  3. Records are safely skipped (no NPE crash)
  4. + *
  5. onPartitionsAssigned() fires, establishing the epoch and partition state
  6. + *
  7. Next poll creates valid work at the correct epoch
  8. + *
+ * This race is more likely with Kafka 2.x's eager rebalance protocol. + */ +@Slf4j +class EpochAndRecordsMapRaceTest { + + ModelUtils mu = new ModelUtils(); + WorkManager wm; + ShardManager sm; + PartitionStateManager pm; + + String topic = "topic"; + TopicPartition tp = new TopicPartition(topic, 0); + + @BeforeEach + void setup() { + PCModuleTestEnv module = mu.getModule(); + wm = module.workManager(); + sm = wm.getSm(); + pm = wm.getPm(); + // Deliberately NOT calling onPartitionsAssigned — simulating the race + } + + /** + * Core race scenario: poll returns records before onPartitionsAssigned fires. + * Records should be safely skipped (no NPE), and the map should be empty. + */ + @Test + void pollBeforeAssignmentShouldSkipRecordsNotCrash() { + // No onPartitionsAssigned called — epoch map is empty + assertThat(pm.getEpochOfPartition(tp)).isNull(); + + // poll() returns records for the unassigned partition + ConsumerRecords poll = new ConsumerRecords<>(UniMaps.of(tp, UniLists.of( + new ConsumerRecord<>(topic, 0, 0, "key", "value"), + new ConsumerRecord<>(topic, 0, 1, "key", "value") + ))); + + // This should NOT throw NPE — records are skipped + EpochAndRecordsMap recordsMap = new EpochAndRecordsMap<>(poll, pm); + + // Records should NOT be in the map (skipped due to missing epoch) + assertThat(recordsMap.count()).isEqualTo(0); + assertThat(recordsMap.partitions()).isEmpty(); + } + + /** + * Full lifecycle: poll before assignment (skipped) → assignment fires → re-poll succeeds. + * Proves records are recovered after the assignment callback completes. + */ + @Test + void fullLifecycleRecordsRecoveredAfterAssignment() { + // Step 1: poll returns records before onPartitionsAssigned — safely skipped + ConsumerRecords firstPoll = new ConsumerRecords<>(UniMaps.of(tp, UniLists.of( + new ConsumerRecord<>(topic, 0, 0, "key-0", "value"), + new ConsumerRecord<>(topic, 0, 1, "key-1", "value") + ))); + EpochAndRecordsMap firstRecords = new EpochAndRecordsMap<>(firstPoll, pm); + + // Records were skipped — nothing to register + assertThat(firstRecords.count()).isEqualTo(0); + + // Step 2: onPartitionsAssigned fires (late) — epoch and partition state established + wm.onPartitionsAssigned(UniLists.of(tp)); + long epoch = pm.getEpochOfPartition(tp); + assertThat(epoch).isEqualTo(0L); + + // Step 3: Re-poll — Kafka re-delivers the same records (they were never committed) + ConsumerRecords secondPoll = new ConsumerRecords<>(UniMaps.of(tp, UniLists.of( + new ConsumerRecord<>(topic, 0, 0, "key-0", "value"), + new ConsumerRecord<>(topic, 0, 1, "key-1", "value") + ))); + EpochAndRecordsMap secondRecords = new EpochAndRecordsMap<>(secondPoll, pm); + + // Records should now be accepted with the correct epoch + assertThat(secondRecords.count()).isEqualTo(2); + assertThat(secondRecords.records(tp).getEpochOfPartitionAtPoll()).isEqualTo(0L); + + // Step 4: Register and verify work is created + wm.registerWork(secondRecords); + List> work = sm.getWorkIfAvailable(10); + assertWithMessage("Work should be available after assignment + re-poll") + .that(work).hasSize(2); + for (var wc : work) { + assertThat(wc.getEpoch()).isEqualTo(0L); + } + } + + /** + * When epoch is already present (normal case), records are processed normally. + */ + @Test + void normalCaseWithPreExistingEpochIsUnaffected() { + // Normal flow: onPartitionsAssigned first + wm.onPartitionsAssigned(UniLists.of(tp)); + assertThat(pm.getEpochOfPartition(tp)).isEqualTo(0L); + + // poll returns records — should use existing epoch + ConsumerRecords poll = new ConsumerRecords<>(UniMaps.of(tp, UniLists.of( + new ConsumerRecord<>(topic, 0, 0, "key", "value") + ))); + EpochAndRecordsMap recordsMap = new EpochAndRecordsMap<>(poll, pm); + + assertThat(recordsMap.count()).isEqualTo(1); + assertThat(recordsMap.records(tp).getEpochOfPartitionAtPoll()).isEqualTo(0L); + } +} diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/ProducerManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/ProducerManagerTest.java index f716f151a..120167277 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/ProducerManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/ProducerManagerTest.java @@ -19,6 +19,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -76,7 +78,17 @@ class ProducerManagerTest { void setup() { setup(ParallelConsumerOptions.builder() .commitMode(PERIODIC_TRANSACTIONAL_PRODUCER) - .commitLockAcquisitionTimeout(ofSeconds(2))); + // 10s (was 2s): 2s is too tight on a CI JVM under PIT instrumentation. + .commitLockAcquisitionTimeout(ofSeconds(10))); + } + + // This class doesn't extend AbstractParallelEoSStreamProcessorTestBase, so + // nothing else resets Awaitility between tests. Not closing pc here on purpose: + // buildModule() overrides close() as a no-op so each test manages its own pc + // lifecycle explicitly (by design, to inspect mid-commit state). + @AfterEach + void tearDown() { + Awaitility.reset(); } private void setup(ParallelConsumerOptions.ParallelConsumerOptionsBuilder optionsBuilder) { @@ -318,7 +330,9 @@ void producedRecordsCantBeInTransactionWithoutItsOffsetDirect() { { var msg = "wait for first record to finish"; log.debug(msg); - await(msg).untilAsserted(() -> assertThat(pc.getWorkMailBox()).hasSize(1)); + // 20s (was default 10s): tight under PIT's instrumented JVM + await(msg).atMost(ofSeconds(20)) + .untilAsserted(() -> assertThat(pc.getWorkMailBox()).hasSize(1)); } // send another record, register the work @@ -338,7 +352,7 @@ void producedRecordsCantBeInTransactionWithoutItsOffsetDirect() { // blocks, as offset 1 is blocked sending and so cannot acquire commit lock var msg = "Ensure expected produce lock is now held by blocked worker thread"; log.debug(msg); - await(msg).untilTrue(blockedOn1); + await(msg).atMost(ofSeconds(20)).untilTrue(blockedOn1); var commitBlocks = new BlockedThreadAsserter(); @@ -354,12 +368,13 @@ void producedRecordsCantBeInTransactionWithoutItsOffsetDirect() { }, () -> { log.debug("Unblocking offset processing offset1Mutex..."); offset1Mutex.countDown(); - }, ofSeconds(10)); + }, ofSeconds(20)); // was 10s; too tight under PIT // - await().untilAsserted(() -> Truth.assertWithMessage("commit should now have unlocked and returned") - .that(commitBlocks.functionHasCompleted()) - .isTrue()); + await().atMost(ofSeconds(20)) + .untilAsserted(() -> Truth.assertWithMessage("commit should now have unlocked and returned") + .that(commitBlocks.functionHasCompleted()) + .isTrue()); final int nextExpectedOffset = 2; // as only first of two work completed diff --git a/parallel-consumer-vertx/src/test-integration/java/io/confluent/parallelconsumer/vertx/integrationTests/VertxConcurrencyIT.java b/parallel-consumer-vertx/src/test-integration/java/io/confluent/parallelconsumer/vertx/integrationTests/VertxConcurrencyIT.java index 31838f9b8..836e4f64f 100644 --- a/parallel-consumer-vertx/src/test-integration/java/io/confluent/parallelconsumer/vertx/integrationTests/VertxConcurrencyIT.java +++ b/parallel-consumer-vertx/src/test-integration/java/io/confluent/parallelconsumer/vertx/integrationTests/VertxConcurrencyIT.java @@ -32,6 +32,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.parallel.Isolated; import org.testcontainers.junit.jupiter.Testcontainers; import pl.tlinkowski.unij.api.UniMaps; @@ -40,6 +41,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static com.github.tomakehurst.wiremock.client.WireMock.*; @@ -134,6 +136,7 @@ static void close() { * would expect. */ @Test + @Timeout(value = 5, unit = TimeUnit.MINUTES) @SneakyThrows void testVertxConcurrency() { var commitMode = PERIODIC_CONSUMER_ASYNCHRONOUS; @@ -208,7 +211,7 @@ void testVertxConcurrency() { var failureMessage = msg("Mock server receives {} requests in parallel from vertx engine", expectedMessageCount / 2); try { - waitAtMost(ofSeconds(20)) + waitAtMost(ofSeconds(120)) .pollInterval(ofMillis(200)) .alias(failureMessage) .untilAsserted(() -> { @@ -217,11 +220,12 @@ void testVertxConcurrency() { }); } catch (ConditionTimeoutException e) { fail(failureMessage + "\n" + e.getMessage()); + } finally { + // Always release the latch — if the test fails, this prevents WireMock threads + // from hanging for 30s each on the unreleased latch + log.info("{} requests received by server, releasing server response lock.", requestsReceivedOnServer.size()); + LatchTestUtils.release(responseLock); } - log.info("{} requests received in parallel by server, releasing server response lock.", requestsReceivedOnServer.size()); - - // all requests were received in parallel, so unlock the server to respond to all of them - LatchTestUtils.release(responseLock); // assertNumberOfThreads(); diff --git a/pom.xml b/pom.xml index 78a81326a..eb5f50a70 100644 --- a/pom.xml +++ b/pom.xml @@ -81,12 +81,23 @@ ${skipTests} ${skipTests} + + + performance + 5.0.0 1.18.28 1.1.1 3.2.5 + 4.8.6 + 4.8.6.6 + 1.17.4 + 1.2.2 2.0.13 @@ -706,6 +717,11 @@ ${skipTests} ${skipITs} methods + + ${included.groups} + ${excluded.groups} @@ -725,8 +741,50 @@ report + + prepare-agent-integration + + prepare-agent-integration + + + + report-integration + post-integration-test + + report-integration + + + + com.github.spotbugs + spotbugs-maven-plugin + ${spotbugs-maven-plugin.version} + + Max + Medium + true + + + + com.github.spotbugs + spotbugs + ${spotbugs.version} + + + + + org.pitest + pitest-maven + ${pitest.version} + + + org.pitest + pitest-junit5-plugin + ${pitest-junit5.version} + + + org.apache.maven.plugins maven-enforcer-plugin @@ -950,18 +1008,21 @@ + - - confluent - https://packages.confluent.io/maven/ - central https://repo1.maven.org/maven2/ - jitpack.io - https://jitpack.io + confluent + https://packages.confluent.io/maven/ astubbs-truth-generator diff --git a/src/docs/README_TEMPLATE.adoc b/src/docs/README_TEMPLATE.adoc index c1db195ab..301fe0119 100644 --- a/src/docs/README_TEMPLATE.adoc +++ b/src/docs/README_TEMPLATE.adoc @@ -281,7 +281,7 @@ The user just has to provide a function to extract from the message the HTTP cal === Illustrative Performance Example -.(see link:./parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/VolumeTests.java[VolumeTests.java]) +.(see link:./parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/VeryLargeMessageVolumeTest.java[VeryLargeMessageVolumeTest.java]) These performance comparison results below, even though are based on real performance measurement results, are for illustrative purposes. To see how the performance of the tool is related to instance counts, partition counts, key distribution and how it would relate to the vanilla client. Actual results will vary wildly depending upon the setup being deployed into. @@ -1100,13 +1100,50 @@ Helper scripts are in the `bin/` directory: Quick local build (compile + unit tests):: `bin/build.sh` +Unit tests only (no Docker needed):: +`bin/ci-unit-test.sh` + +Integration tests only (requires Docker for TestContainers):: +`bin/ci-integration-test.sh` + Full CI build with all tests (unit + integration):: `bin/ci-build.sh` CI build against a specific Kafka version:: `bin/ci-build.sh 3.9.1` -The GitHub Actions CI workflow uses `bin/ci-build.sh`, so running it locally reproduces the CI environment. +Performance test suite (also `bin/performance-test.cmd` on Windows):: +`bin/performance-test.sh` + +All `ci-*` scripts use the `-Pci` Maven profile which enables license checking and disables parallel test execution. The GitHub Actions CI workflow uses these scripts, so running them locally reproduces the CI environment. + +=== Performance Tests + +Tests tagged `@Tag("performance")` are excluded from the regular CI build because they need substantial hardware. They run on a dedicated self-hosted runner via `.github/workflows/performance.yml` (manual trigger or weekly schedule). + +To run the performance suite locally, use `bin/performance-test.sh`. To set up your own self-hosted runner for these tests, see link:./docs/SELF_HOSTED_RUNNER.md[docs/SELF_HOSTED_RUNNER.md]. + +=== Releasing + +The `pom.xml` version is the source of truth for publishing — there is no `maven-release-plugin` step. + +On every push to `master`, `.github/workflows/publish.yml` deploys to Maven Central: + +* If the version ends in `-SNAPSHOT` → publishes a snapshot +* If the version does not end in `-SNAPSHOT` → publishes a full release, creates a `v` git tag, and creates a GitHub release + +To cut a release: + +. Open a PR removing `-SNAPSHOT` from `` in the parent `pom.xml` (e.g. `0.6.0.0-SNAPSHOT` → `0.6.0.0`) +. Merge it to master → CI publishes the release +. Open another PR bumping to the next snapshot (e.g. `0.6.0.1-SNAPSHOT`) and merge + +Required GitHub repository secrets: + +* `MAVEN_CENTRAL_USERNAME` — Sonatype Central Portal token username +* `MAVEN_CENTRAL_PASSWORD` — Sonatype Central Portal token password +* `MAVEN_GPG_PRIVATE_KEY` — Armored GPG private key for signing artifacts +* `MAVEN_GPG_PASSPHRASE` — Passphrase for the GPG key === Testing