diff --git a/.gitignore b/.gitignore index 259113f73..ea59400a3 100644 --- a/.gitignore +++ b/.gitignore @@ -32,4 +32,4 @@ build/ !**/src/test/**/build/ ### VS Code ### -.vscode/ +.vscode/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index cc6f2e042..de31b9fe7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,9 +6,8 @@ RUN mvn dependency:go-offline COPY src src RUN mvn package -FROM openjdk:17-jdk-slim +FROM eclipse-temurin:17-alpine-3.22 COPY --from=build /app/target/*.jar /high-load-course.jar CMD ["java", "-jar", "/high-load-course.jar"] - diff --git a/README.md b/README.md index 56d36970f..bc19c4280 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,9 @@ # Template for the HighLoad course + This project is based on [Tiny Event Sourcing library](https://github.com/andrsuh/tiny-event-sourcing) ### Run PostgreSql + This example uses Postgres as an implementation of the Event store. You can see it in `pom.xml`: ``` @@ -12,7 +14,8 @@ This example uses Postgres as an implementation of the Event store. You can see ``` -Thus, you have to run Postgres in order to test this example. Postgres service is included in `docker-compose` file that we have in the root of the project. +Thus, you have to run Postgres in order to test this example. Postgres service is included in `docker-compose` file +that we have in the root of the project. # More comprehensive information about the course, project, how to run tests is here: @@ -21,19 +24,25 @@ https://andrsuh.notion.site/2595d535059281d8a815c2cb3875c376?source=copy_link https://andrsuh.notion.site/2625d5350592801aaf88c7c95302d10c?source=copy_link ### Run the infrastructure + Set of the services you need to start developing and testing process is following: -- Bombardier - service that is in charge of emulation the store's clients activity (creates the incoming load). Also serves as a third-party payment system. + +- Bombardier - service that is in charge of emulation the store's clients activity (creates the incoming load). Also + serves as a third-party payment system. - Postgres DBMS - Prometheus + Grafana - metrics collection and visualization services You can run all beforementioned services by the following command: + ``` docker compose -f docker-compose.yml up ``` ### Run the application -To make the application run you can start the main class `OnlineShopApplication`. It is not being launched as a docker contained to simplify and speed up the devevopment process as it is easier for you to refactor the application and re-run it immediately in the IDE. +To make the application run you can start the main class `OnlineShopApplication`. It is not being launched as a docker +contained to simplify and speed up the devevopment process as it is easier for you to refactor the application and +re-run it immediately in the IDE. ### If you want to pull changes from the main repository into your fork diff --git a/grafana/provisioning/dashboards/ServicesStatistic.json b/grafana/provisioning/dashboards/ServicesStatistic.json index 684b97269..a77405988 100644 --- a/grafana/provisioning/dashboards/ServicesStatistic.json +++ b/grafana/provisioning/dashboards/ServicesStatistic.json @@ -38,6 +38,104 @@ }, "id": 27, "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "bars", + "fillOpacity": 23, + "gradientMode": "hue", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "normal" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "noValue": "0", + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 9 + }, + "id": 109, + "options": { + "legend": { + "calcs": [], + "displayMode": "table", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "sum(increase(payment_external_retry_count_total[30s])) by (accountName)", + "legendFormat": "{{accountName}}", + "range": true, + "refId": "A" + } + ], + "title": "Payment Retry Count (30s)", + "type": "timeseries" + }, { "datasource": { "type": "prometheus", @@ -745,6 +843,152 @@ "type": "prometheus", "uid": "PBFA97CFB590B2093" }, + "description": "Время ожидания задачи в очереди перед обработкой (перцентили)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Латентность (с)", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 24, + "x": 0, + "y": 9 + }, + "id": 130, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.50, sum(rate(bombardier_in_queue_latency_seconds_bucket[30s])) by (le))", + "legendFormat": "p50", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.75, sum(rate(bombardier_in_queue_latency_seconds_bucket[30s])) by (le))", + "legendFormat": "p75", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.90, sum(rate(bombardier_in_queue_latency_seconds_bucket[30s])) by (le))", + "legendFormat": "p90", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(bombardier_in_queue_latency_seconds_bucket[30s])) by (le))", + "legendFormat": "p95", + "range": true, + "refId": "D" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.99, sum(rate(bombardier_in_queue_latency_seconds_bucket[30s])) by (le))", + "legendFormat": "p99", + "range": true, + "refId": "E" + } + ], + "title": "Bombardier — In Queue Latency (перцентили)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, "fieldConfig": { "defaults": { "color": { @@ -1345,7 +1589,6 @@ "uid": "PBFA97CFB590B2093" }, "editorMode": "code", - "exemplar": true, "expr": "rate(stage_duration_ok_sum{service=~\"$service\", result=\"success\"}[1m]) / rate(stage_duration_ok_count{service=~\"$service\", result=\"success\"}[1m])", "interval": "", "legendFormat": "{{stage}}", @@ -1785,7 +2028,7 @@ "exemplar": true, "expr": "rate(external_sys_duration_count{service=~\"$service\"}[30s])", "hide": false, - "interval": "", + "instant": false, "legendFormat": "{{accountName}} - {{outcome}}", "range": true, "refId": "C" @@ -1923,7 +2166,7 @@ "exemplar": true, "expr": "rate(external_sys_duration_sum{service=~\"$service\"}[1m]) / rate(external_sys_duration_count{service=~\"$service\"}[1m])", "hide": false, - "interval": "", + "instant": false, "legendFormat": "{{accountName}} - {{outcome}}", "range": true, "refId": "C" @@ -2024,37 +2267,13 @@ } } ] - }, - { - "__systemRef": "hideSeriesFrom", - "matcher": { - "id": "byNames", - "options": { - "mode": "exclude", - "names": [ - "payOrder - " - ], - "prefix": "All except:", - "readOnly": true - } - }, - "properties": [ - { - "id": "custom.hideFrom", - "value": { - "legend": false, - "tooltip": false, - "viz": true - } - } - ] } ] }, "gridPos": { "h": 8, "w": 12, - "x": 12, + "x": 0, "y": 108 }, "id": 43, @@ -2065,9 +2284,7 @@ ], "displayMode": "table", "placement": "right", - "showLegend": true, - "sortBy": "Max", - "sortDesc": true + "showLegend": true }, "tooltip": { "mode": "single", @@ -2082,7 +2299,7 @@ "uid": "PBFA97CFB590B2093" }, "editorMode": "code", - "expr": "rate(http_request_latent_sum{service=~\"$service\"}[15s]) / rate(http_request_latent_count{service=~\"$service\"}[15s])", + "expr": "sum(rate(http_request_latent_sum{service=~\"$service\"}[15s]) / rate(http_request_latent_count{service=~\"$service\"}[15s]))", "hide": false, "instant": false, "legendFormat": "{{method}} - {{result}}", @@ -2390,7 +2607,6 @@ "uid": "PBFA97CFB590B2093" }, "editorMode": "code", - "exemplar": true, "expr": "sum by (name) (executor_active_threads{})", "interval": "", "legendFormat": "{{name}}", @@ -2669,20 +2885,18 @@ "h": 8, "w": 12, "x": 0, - "y": 38 + "y": 46 }, - "id": 95, + "id": 41, "options": { "legend": { - "calcs": [ - "max" - ], + "calcs": [], "displayMode": "table", "placement": "right", "showLegend": true }, "tooltip": { - "mode": "multi", + "mode": "single", "sort": "none" } }, @@ -2694,211 +2908,9 @@ "uid": "PBFA97CFB590B2093" }, "editorMode": "code", - "expr": "sum by (job) (system_cpu_usage{service=~\"$service\"})", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "{{job}} - System CPU Usage", - "range": true, - "refId": "A" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "expr": "sum by (job) (process_cpu_usage{service=~\"$service\"})", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "{{job}} -Process CPU Usage", - "range": true, - "refId": "B" - } - ], - "title": "CPU Usage", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 17, - "gradientMode": "opacity", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - } - ] - }, - "unit": "s" - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 12, - "y": 38 - }, - "id": 42, - "options": { - "legend": { - "calcs": [], - "displayMode": "table", - "placement": "right", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "11.4.0", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "expr": "sum(rate(jvm_gc_pause_seconds_sum{}[5m])) by (job)", - "instant": false, - "legendFormat": "{{job}}", - "range": true, - "refId": "A" - } - ], - "title": "GC delay", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 17, - "gradientMode": "opacity", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - } - ] - }, - "unit": "short" - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 46 - }, - "id": 41, - "options": { - "legend": { - "calcs": [], - "displayMode": "table", - "placement": "right", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "11.4.0", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "expr": "sum by (job) (process_files_open_files{})", - "instant": false, - "legendFormat": "{{job}}", + "expr": "sum by (job) (process_files_open_files{})", + "instant": false, + "legendFormat": "{{job}}", "range": true, "refId": "A" } @@ -2916,14 +2928,14 @@ "h": 1, "w": 24, "x": 0, - "y": 6 + "y": 7 }, "id": 97, "panels": [ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "fieldConfig": { "defaults": { @@ -3007,7 +3019,7 @@ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "editorMode": "code", "expr": "sum by (job) (jetty_threads_config_max{})", @@ -3025,7 +3037,7 @@ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "fieldConfig": { "defaults": { @@ -3109,7 +3121,7 @@ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "editorMode": "code", "expr": "sum by (job) (jetty_threads_config_min{service=~\"$service\"})", @@ -3127,7 +3139,7 @@ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "fieldConfig": { "defaults": { @@ -3300,7 +3312,7 @@ "spanNulls": false, "stacking": { "group": "A", - "mode": "none" + "mode": "normal" }, "thresholdsStyle": { "mode": "off" @@ -3400,13 +3412,12 @@ "spanNulls": false, "stacking": { "group": "A", - "mode": "none" + "mode": "normal" }, "thresholdsStyle": { "mode": "off" } }, - "links": [], "mappings": [], "thresholds": { "mode": "absolute", @@ -3421,7 +3432,7 @@ } ] }, - "unit": "s" + "unit": "none" }, "overrides": [] }, @@ -3468,8 +3479,1184 @@ ], "title": "Jetty Statistics", "type": "row" - } - ], + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 7 + }, + "id": 101, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "История изменения количества задач в очереди", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Задачи в очереди", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 16, + "x": 8, + "y": 8 + }, + "id": 103, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "payment_processing_planned_total - payment_processing_started_total", + "legendFormat": "{{accountName}}", + "refId": "A" + } + ], + "title": "История задач в очереди", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Скорость входа в ожидание и выхода из ожидания (requests/sec)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Запросов/сек", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "reqps" + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "Скорость увеличения очереди" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "green", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Скорость числа задач в обработке" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "blue", + "mode": "fixed" + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 16 + }, + "id": 104, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_planned_total[1m])", + "legendFormat": "Попало в очередь: {{accountName}}", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_started_total[1m])", + "legendFormat": "Приняты в обработку (вышли из очереди): {{accountName}}", + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_completed_total[1m])", + "legendFormat": "Обработаны: {{accountName}}", + "refId": "C" + } + ], + "title": "Throughput (в очереди vs в обработке vs обработаны)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Если значение > 0, задачи накапливаются быстрее, чем обрабатываются", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": true, + "axisColorMode": "text", + "axisLabel": "Δ запросов/сек", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "scheme", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "area" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 0.1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 16 + }, + "id": 105, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_planned_total[1m]) - rate(payment_processing_started_total[1m])", + "legendFormat": "{{accountName}}", + "refId": "A" + } + ], + "title": "Скорость накопления задач (увеличения очереди)", + "type": "timeseries" + } + ], + "title": "Payment Processing - Active Tasks", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 108, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.75, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "legendFormat": "p75 {{accountName}}", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.80, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "legendFormat": "p80 {{accountName}}", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.85, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "legendFormat": "p85 {{accountName}}", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.90, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "hide": false, + "legendFormat": "p90 {{accountName}}", + "range": true, + "refId": "D" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "hide": false, + "legendFormat": "p95 {{accountName}}", + "range": true, + "refId": "E" + } + ], + "title": "Payment Processing Latency", + "type": "timeseries" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 9 + }, + "id": 110, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Текущее число активных (выполняемых) задач в ThreadPoolExecutor платежей", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Активные задачи", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 20, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "line" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "orange", + "value": 80 + }, + { + "color": "red", + "value": 100 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 10 + }, + "id": 111, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "payment_executor_active_tasks", + "legendFormat": "Active tasks", + "range": true, + "refId": "A" + } + ], + "title": "Executor — Параллельные задачи (active threads)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Текущее число задач, ожидающих в очереди ThreadPoolExecutor", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Задачи в очереди", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 20, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "line" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 10000 + }, + { + "color": "red", + "value": 40000 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 10 + }, + "id": 112, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "payment_executor_queue_size", + "legendFormat": "Queue size", + "range": true, + "refId": "A" + } + ], + "title": "Executor — Задачи в очереди (queue size)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Число HTTP-запросов, отправленных на платёжный аккаунт и ожидающих ответа (in-flight)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "In-flight запросы", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 20, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 18 + }, + "id": 113, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "payment_account_inflight_requests", + "legendFormat": "{{accountName}}", + "range": true, + "refId": "A" + } + ], + "title": "In-flight запросы на платёжные аккаунты", + "type": "timeseries" + } + ], + "title": "Payment Executor & Inflight Metrics", + "type": "row" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 10 + }, + "id": 120, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Число запросов, обрабатываемых API контроллером в данный момент", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Запросы", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 20, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 11 + }, + "id": 121, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "api_gateway_controller_requests", + "legendFormat": "{{accountName}}", + "range": true, + "refId": "A" + } + ], + "title": "API Gateway — Запросы контроллера", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Число запросов, находящихся на ретрае в данный момент (по аккаунту)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Retry запросы", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 20, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "orange", + "value": 5 + }, + { + "color": "red", + "value": 20 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 11 + }, + "id": 122, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "payment_account_retry_requests", + "legendFormat": "{{accountName}}", + "range": true, + "refId": "A" + } + ], + "title": "Payment Account — Retry запросы", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Латентность выполнения задач в OrderPayer — время от создания до завершения (перцентили)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Латентность (с)", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 19 + }, + "id": 123, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.50, sum(rate(order_payer_exec_latency_seconds_bucket[1m])) by (le, accountName))", + "legendFormat": "p50 {{accountName}}", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.90, sum(rate(order_payer_exec_latency_seconds_bucket[1m])) by (le, accountName))", + "legendFormat": "p90 {{accountName}}", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(order_payer_exec_latency_seconds_bucket[1m])) by (le, accountName))", + "legendFormat": "p95 {{accountName}}", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.99, sum(rate(order_payer_exec_latency_seconds_bucket[1m])) by (le, accountName))", + "legendFormat": "p99 {{accountName}}", + "range": true, + "refId": "D" + } + ], + "title": "Order Payer — Латентность выполнения задач (перцентили)", + "type": "timeseries" + } + ], + "title": "API Gateway, Retry & Order Payer Metrics", + "type": "row" + }], "preload": false, "refresh": "5s", "schemaVersion": 40, @@ -3516,4 +4703,5 @@ "uid": "KVr-Vmpnz", "version": 4, "weekStart": "" -} \ No newline at end of file +} + diff --git a/grafana/provisioning/datasources/datasource.yml b/grafana/provisioning/datasources/datasource.yml index b37f2a42c..ea5d482ab 100644 --- a/grafana/provisioning/datasources/datasource.yml +++ b/grafana/provisioning/datasources/datasource.yml @@ -13,6 +13,8 @@ datasources: - name: Prometheus # datasource type. Required type: prometheus + # specific unique identifier to match dashboard references + uid: PBFA97CFB590B2093 # access mode. direct or proxy. Required access: proxy # org id. will default to orgId 1 if not specified diff --git a/pom.xml b/pom.xml index 5724b2568..554b0bdcd 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,5 @@ - 4.0.0 @@ -15,7 +15,7 @@ OnlineShop Application for resilience and highly-loaded applications course - + 2.2.0 1.9.0 4.12.0 @@ -25,7 +25,7 @@ 3.1.8 - + io.github.resilience4j resilience4j-ratelimiter @@ -67,15 +67,15 @@ ${jetty.version} - - - - + + + + - - - - + + + + com.fasterxml.jackson.module @@ -171,6 +171,7 @@ -Xjsr305=strict + -Xannotation-default-target=param-property spring diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml index 936c7edd9..ce045546f 100644 --- a/prometheus/prometheus.yml +++ b/prometheus/prometheus.yml @@ -6,16 +6,16 @@ scrape_configs: - job_name: 'bombardier-docker-network-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['bombardier:1234'] + - targets: [ 'bombardier:1234' ] - job_name: 'bombardier-host-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:1234'] + - targets: [ 'host.docker.internal:1234' ] - job_name: 'online-store-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:8081'] + - targets: [ 'host.docker.internal:8081' ] - job_name: 'online-shop-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:18081'] \ No newline at end of file + - targets: [ 'host.docker.internal:18081' ] \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 6f23fa18d..875f3dc00 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -1,23 +1,30 @@ package ru.quipy.apigateway +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tag import org.slf4j.Logger import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired import org.springframework.web.bind.annotation.* import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer +import ru.quipy.payments.logic.PaymentAccountProperties import java.util.* +import java.util.concurrent.atomic.AtomicInteger @RestController -class APIController { +class APIController( + private val orderRepository: OrderRepository, + private val orderPayer: OrderPayer, + meterRegistry: MeterRegistry, + accountProperties: List +) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) + val controllerRequests = AtomicInteger(0) - @Autowired - private lateinit var orderRepository: OrderRepository - - @Autowired - private lateinit var orderPayer: OrderPayer + init { + meterRegistry.gauge("api.gateway.controller.requests", listOf(Tag.of("accountName", accountProperties.joinToString { it.accountName + " " })), controllerRequests) { it.toDouble() } + } @PostMapping("/users") fun createUser(@RequestBody req: CreateUserRequest): User { @@ -56,13 +63,13 @@ class APIController { @PostMapping("/orders/{orderId}/payment") fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + controllerRequests.incrementAndGet() val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) it } ?: throw IllegalArgumentException("No such order $orderId") - val createdAt = orderPayer.processPayment(orderId, order.price, paymentId, deadline) return PaymentSubmissionDto(createdAt, paymentId) } diff --git a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt new file mode 100644 index 000000000..30c28fbca --- /dev/null +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -0,0 +1,33 @@ +package ru.quipy.apigateway + +import org.slf4j.LoggerFactory +import org.springframework.http.HttpStatus +import org.springframework.http.ResponseEntity +import org.springframework.web.bind.annotation.ExceptionHandler +import org.springframework.web.bind.annotation.RestControllerAdvice +import ru.quipy.exceptions.TooLongRequestException +import ru.quipy.exceptions.TooManyRequestsException +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + +@RestControllerAdvice +class GlobalExceptionHandler( +) { + companion object { + val logger = LoggerFactory.getLogger(GlobalExceptionHandler::class.java) + private var currentRetryAfterSeconds = 1 + private var exp_base = 2; + private const val RESET_WINDOW_MS = 30000L + } + + private val rejectedRequestsCount = AtomicInteger(0) + private val lastRejectionTime = AtomicLong(0) + @ExceptionHandler(TooLongRequestException::class) + fun handleTooManyRequests(exception: TooLongRequestException): ResponseEntity { + return ResponseEntity.status(200).body("your request very long, i am so sorry") + } + @ExceptionHandler(TooManyRequestsException::class) + fun handleTooManyRequestsRetriable(exception: TooManyRequestsException): ResponseEntity { + return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).header("Retry-After", "1000").body("to many requests") + } +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt index 383c047f7..3f4c2cf97 100644 --- a/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/LeakingBucketRateLimiter.kt @@ -6,6 +6,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.launch import org.slf4j.Logger import org.slf4j.LoggerFactory +import ru.quipy.payments.logic.now import java.time.Duration import java.util.concurrent.Executors import java.util.concurrent.LinkedBlockingQueue @@ -22,6 +23,15 @@ class LeakingBucketRateLimiter( return queue.offer(1) } + fun tickBlocking(timeout: Long): Boolean { + val startedAt = now() + var checkBlocking = tick() + while (now() - startedAt < timeout && !checkBlocking) { + checkBlocking = tick() + } + return checkBlocking + } + private val releaseJob = rateLimiterScope.launch { while (true) { delay(window.toMillis()) diff --git a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt index 6ff3092ab..c5bd36a0a 100644 --- a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt @@ -1,5 +1,7 @@ package ru.quipy.common.utils +import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.util.Deadline +import com.github.dockerjava.zerodep.shaded.org.apache.hc.core5.util.Timeout import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.delay @@ -10,8 +12,6 @@ import java.time.Duration import java.util.concurrent.Executors import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock class SlidingWindowRateLimiter( private val rate: Long, @@ -39,6 +39,15 @@ class SlidingWindowRateLimiter( } } + fun tickBlocking(timeout: Long): Boolean { + val timeStarted = System.currentTimeMillis() + while (System.currentTimeMillis()-timeStarted < timeout && !tick()) { + Thread.sleep(2) + } + return System.currentTimeMillis()-timeStarted < timeout + } + + data class Measure( val value: Long, val timestamp: Long @@ -64,6 +73,7 @@ class SlidingWindowRateLimiter( queue.take() } }.invokeOnCompletion { th -> if (th != null) logger.error("Rate limiter release job completed", th) } + companion object { private val logger: Logger = LoggerFactory.getLogger(SlidingWindowRateLimiter::class.java) } diff --git a/src/main/kotlin/ru/quipy/exceptions/RateLimitWasBreached.kt b/src/main/kotlin/ru/quipy/exceptions/RateLimitWasBreached.kt new file mode 100644 index 000000000..8469ea9a3 --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/RateLimitWasBreached.kt @@ -0,0 +1,4 @@ +package ru.quipy.exceptions + +class RateLimitWasBreached : RuntimeException() { +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/exceptions/TooLongRequestEcexption.kt b/src/main/kotlin/ru/quipy/exceptions/TooLongRequestEcexption.kt new file mode 100644 index 000000000..c50d9bc48 --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/TooLongRequestEcexption.kt @@ -0,0 +1,4 @@ +package ru.quipy.exceptions + +class TooLongRequestException() : RuntimeException() { +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt new file mode 100644 index 000000000..cacd67810 --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt @@ -0,0 +1,3 @@ +package ru.quipy.exceptions + +class TooManyRequestsException(val deadline: Long) : RuntimeException() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsRetriableException.kt b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsRetriableException.kt new file mode 100644 index 000000000..4c95b4b30 --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsRetriableException.kt @@ -0,0 +1,4 @@ +package ru.quipy.exceptions + +class TooManyRequestsRetriableException(val deadline: Long) : RuntimeException(){ +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt index 767f23b3e..a4c312eaf 100644 --- a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt +++ b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt @@ -1,5 +1,6 @@ package ru.quipy.orders.subscribers +import io.micrometer.core.instrument.Metrics import jakarta.annotation.PostConstruct import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -19,6 +20,7 @@ class PaymentSubscriber { val logger: Logger = LoggerFactory.getLogger(PaymentSubscriber::class.java) + val paymentSucceededCounter = Metrics.counter("succeeded.payments", "account", "acc-7") @Autowired lateinit var subscriptionsManager: AggregateSubscriptionsManager @@ -42,6 +44,7 @@ class PaymentSubscriber { ).toSeconds() }, spent in queue: ${event.spentInQueueDuration.toSeconds()}" ) + paymentSucceededCounter.increment() } } } diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt index eceb90cff..97b0bf95c 100644 --- a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt +++ b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt @@ -3,18 +3,31 @@ package ru.quipy.payments.config import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.kotlin.registerKotlinModule +import org.slf4j.Logger +import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler +import ru.quipy.common.utils.LeakingBucketRateLimiter +import ru.quipy.common.utils.NamedThreadFactory +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate -import ru.quipy.payments.logic.* +import ru.quipy.payments.logic.PaymentAccountProperties +import ru.quipy.payments.logic.PaymentAggregateState +import ru.quipy.payments.logic.PaymentExternalSystemAdapter +import ru.quipy.payments.logic.PaymentExternalSystemAdapterImpl import java.net.URI import java.net.http.HttpClient import java.net.http.HttpRequest import java.net.http.HttpResponse +import java.time.Duration import java.util.* - +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.Semaphore +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit @Configuration class PaymentAccountsConfig { @@ -22,6 +35,9 @@ class PaymentAccountsConfig { private val javaClient = HttpClient.newBuilder().build() private val mapper = ObjectMapper().registerKotlinModule().registerModules(JavaTimeModule()) } + val logger: Logger = LoggerFactory.getLogger(PaymentAccountsConfig::class.java) + + var rateCheckWindow: Duration = Duration.ofMillis(1000) @Value("\${payment.hostPort}") lateinit var paymentProviderHostPort: String @@ -36,28 +52,113 @@ class PaymentAccountsConfig { lateinit var allowedAccounts: List @Bean - fun accountAdapters(paymentService: EventSourcingService): List { + fun warehouseIfUnfinishedWork( + accountProperties: List, + meterRegistry: io.micrometer.core.instrument.MeterRegistry, + ): ThreadPoolExecutor { + val corePoolSize = 1000 + val maximumPoolSize = 1000 + val queueSize = 100_000 + val keepAliveTime = 0 + logger.info("Thread Pool Properties: core pool size - {}, maximum pool size - {}, queue size - {}, keepAliveTime - {}", corePoolSize, maximumPoolSize, queueSize, keepAliveTime) + val executor = ThreadPoolExecutor( + corePoolSize, + maximumPoolSize, + 0, + TimeUnit.MILLISECONDS, + LinkedBlockingQueue(100_000), + NamedThreadFactory("payment-submission-executor"), + CallerBlockingRejectedExecutionHandler() + ) + + meterRegistry.gauge("payment.executor.active.tasks", executor) { it.activeCount.toDouble() } + meterRegistry.gauge("payment.executor.queue.size", executor) { it.queue.size.toDouble() } + + return executor + } + + @Bean + fun parallelLimiter( + accountProperties: List + ): Semaphore { + val parallelRequests = accountProperties.minOf { it.parallelRequests } + logger.info("Semaphore Properties: permits count - {}", parallelRequests) + return Semaphore(parallelRequests) + } + +// @Bean +// fun burstRateLimiter( +// accountProperties: List, +// @Value("#{'\${payment.processingTimeMillis}'.split(',')}") +// processingTimeMillis: Int +// ): LeakingBucketRateLimiter { +// val bucketSize = 6000 +// val rate = accountProperties.minOf { it.rateLimitPerSec }.toLong() +// logger.info("Burst Rate Limiter Properties: bucket size - {}, rate - {}", bucketSize, rate) +// return LeakingBucketRateLimiter( +// rate = rate, +// window = rateCheckWindow, +// bucketSize = bucketSize +// ) +// } + + @Bean + fun smoothOutIncoming( + accountProperties: List, + ): SlidingWindowRateLimiter { + val rate = accountProperties.minOf { it.rateLimitPerSec }.toLong() + logger.info("Incoming Rate Limiter Properties: rate - {}", rate) + return SlidingWindowRateLimiter( + rate = rate, + window = rateCheckWindow, + ) + } + + @Bean + fun accountProperties(): List { val request = HttpRequest.newBuilder() .uri(URI("http://${paymentProviderHostPort}/external/accounts?serviceName=$serviceName&token=$token")) + .timeout(Duration.ofSeconds(10)) .GET() .build() val resp = javaClient.send(request, HttpResponse.BodyHandlers.ofString()) - println("\nPayment accounts list:") - return mapper.readValue>( + val accounts: List = mapper.readValue>( resp.body(), - mapper.typeFactory.constructCollectionType(List::class.java, PaymentAccountProperties::class.java) - ) - .filter { it.accountName in allowedAccounts } - .map { it.copy(enabled = true) } + mapper.typeFactory.constructCollectionType( + List::class.java, + PaymentAccountProperties::class.java + ) + ).filter { it.accountName in allowedAccounts } + + return accounts + } + + @Bean + fun accountAdapters( + paymentService: EventSourcingService, + meterRegistry: io.micrometer.core.instrument.MeterRegistry, + accountProperties: List + ): List { + return accountProperties + .map { + if (it.accountName == "acc-22") { + it.copy(enabled = true, hedgingEnabled = true, hedgeDelayMillis = 150L) + } else { + it.copy(enabled = true) + } + } .onEach(::println) .map { PaymentExternalSystemAdapterImpl( it, paymentService, paymentProviderHostPort, - token + token, + meterRegistry, + parallelLimiter(accountProperties), + rateLimiter = smoothOutIncoming(accountProperties) ) } } diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index a5909b85b..33df792f6 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -1,55 +1,74 @@ package ru.quipy.payments.logic +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service -import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler -import ru.quipy.common.utils.NamedThreadFactory +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate import java.util.* -import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.CompletableFuture import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit @Service -class OrderPayer { +class OrderPayer( + val rateLimiter: SlidingWindowRateLimiter, + @Qualifier("warehouseIfUnfinishedWork") + val paymentExecutor: ThreadPoolExecutor, + meterRegistry: MeterRegistry, + accountProperties: List + ) { companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) } + val inExecTimer = meterRegistry.timer("order.payer.exec.latency", "accountName", accountProperties.joinToString { it.accountName + " " }) + + private val plannedCounter: Counter = meterRegistry.counter("payment.processing.planned") + private val startedCounter: Counter = meterRegistry.counter("payment.processing.started") + private val completedCounter: Counter = meterRegistry.counter("payment.processing.completed") + @Autowired private lateinit var paymentESService: EventSourcingService @Autowired private lateinit var paymentService: PaymentService - private val paymentExecutor = ThreadPoolExecutor( - 16, - 16, - 0L, - TimeUnit.MILLISECONDS, - LinkedBlockingQueue(8_000), - NamedThreadFactory("payment-submission-executor"), - CallerBlockingRejectedExecutionHandler() - ) - fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() - paymentExecutor.submit { - val createdEvent = paymentESService.create { - it.create( - paymentId, - orderId, - amount - ) + plannedCounter.increment() + + CompletableFuture + .runAsync( + { + startedCounter.increment() + paymentESService.create { + it.create(paymentId, orderId, amount) + } + }, + paymentExecutor, + ) + .thenRunAsync( + { + inExecTimer.record(now() - createdAt, TimeUnit.MILLISECONDS) + paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) + }, + paymentExecutor, + ) + .whenComplete { _, ex -> + if (ex != null) { + logger.warn("process stopped before submit for payment: $paymentId", ex) + } + completedCounter.increment() } - logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") - paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) - } return createdAt } + } \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt index 2389e3cb3..8aafdaa33 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentAggregateCommands.kt @@ -1,5 +1,6 @@ package ru.quipy.payments.logic +import io.micrometer.core.instrument.Metrics import ru.quipy.payments.api.PaymentCreatedEvent import ru.quipy.payments.api.PaymentProcessedEvent import ru.quipy.payments.api.PaymentSubmittedEvent @@ -27,9 +28,12 @@ fun PaymentAggregateState.logProcessing( transactionId: UUID? = null, reason: String? = null ): PaymentProcessedEvent { - val submittedAt = this.submissions[transactionId ?: UUID.randomUUID()]?.timeStarted ?: 0 - val spentInQueueDuration = this.submissions[transactionId ?: UUID.randomUUID()]?.spentInQueue ?: Duration.ofMillis(0) - + val startCallInMap = now() + val submission = transactionId?.let { this.submissions[it] } + val endCallInMap = now() + val submittedAt = submission?.timeStarted ?: 0 + val spentInQueueDuration = submission?.spentInQueue ?: Duration.ofMillis(0) + Metrics.timer("bombardier.in.queue.latency").record(Duration.ofMillis(endCallInMap-startCallInMap)) return PaymentProcessedEvent( this.getId(), success, this.orderId, submittedAt, processedAt, this.amount!!, transactionId, reason, spentInQueueDuration ) diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 5cb12106a..6590e20c7 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -2,15 +2,35 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule -import okhttp3.OkHttpClient -import okhttp3.Request -import okhttp3.RequestBody +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tag +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import org.slf4j.Logger import org.slf4j.LoggerFactory +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService +import ru.quipy.exceptions.TooManyRequestsException + import ru.quipy.payments.api.PaymentAggregate +import java.io.EOFException +import java.io.IOException +import java.io.InterruptedIOException import java.net.SocketTimeoutException +import java.net.URI +import java.net.http.HttpClient +import java.net.http.HttpRequest +import java.net.http.HttpResponse import java.time.Duration import java.util.* +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.Semaphore +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import kotlin.math.pow +import kotlin.random.Random // Advice: always treat time as a Duration @@ -19,73 +39,138 @@ class PaymentExternalSystemAdapterImpl( private val paymentESService: EventSourcingService, private val paymentProviderHostPort: String, private val token: String, + meterRegistry: MeterRegistry, + private val parallelLimiter: Semaphore, + private val rateLimiter: SlidingWindowRateLimiter ) : PaymentExternalSystemAdapter { - companion object { - val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) - - val emptyBody = RequestBody.create(null, ByteArray(0)) - val mapper = ObjectMapper().registerKotlinModule() - } - private val serviceName = properties.serviceName private val accountName = properties.accountName private val requestAverageProcessingTime = properties.averageProcessingTime + private val connectTimeoutMillis = 200L private val rateLimitPerSec = properties.rateLimitPerSec private val parallelRequests = properties.parallelRequests + private val inFlightRequests = AtomicInteger(0) + private val retryRequests = AtomicInteger(0) + private val scope = CoroutineScope(Dispatchers.Default) - private val client = OkHttpClient.Builder().build() + private val hedgeEnabled = properties.hedgingEnabled + private val hedgeDelayMillis = properties.hedgeDelayMillis ?: (requestAverageProcessingTime.toMillis() / 2).coerceAtLeast(50L) - override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { - logger.warn("[$accountName] Submitting payment request for payment $paymentId") + private val timer = + meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) + private val retryCounter = + meterRegistry.counter("payment.external.retry.count", "accountName", properties.accountName) + private val retryExecutor: ScheduledExecutorService = Executors.newScheduledThreadPool(50) + private val httpClient = HttpClient + .newBuilder() + .executor(Executors.newFixedThreadPool(500)) + .version(HttpClient.Version.HTTP_2) + .connectTimeout(Duration.ofMillis(connectTimeoutMillis)) + .build() - val transactionId = UUID.randomUUID() + init { + meterRegistry.gauge( + "payment.account.inflight.requests", + listOf(Tag.of("accountName", properties.accountName)), + inFlightRequests + ) { it.toDouble() } + meterRegistry.gauge( + "payment.account.retry.requests", + listOf(Tag.of("accountName", properties.accountName)), + retryRequests + ) { it.toDouble() } - // Вне зависимости от исхода оплаты важно отметить что она была отправлена. - // Это требуется сделать ВО ВСЕХ СЛУЧАЯХ, поскольку эта информация используется сервисом тестирования. - paymentESService.update(paymentId) { - it.logSubmission(success = true, transactionId, now(), Duration.ofMillis(now() - paymentStartedAt)) + try { + val warmupUri = URI("http://$paymentProviderHostPort/external/accounts?serviceName=$serviceName&token=$token") + val warmupRequest = HttpRequest.newBuilder() + .uri(warmupUri) + .timeout(Duration.ofSeconds(5)) + .GET() + .build() + val futures = (1..10).map { + httpClient.sendAsync(warmupRequest, HttpResponse.BodyHandlers.ofString()) + } + futures.forEach { future -> + try { future.join() } catch (_: Exception) {} + } + logger.info("[$accountName] HTTP client warmup complete") + } catch (e: Exception) { + logger.warn("[$accountName] HTTP warmup failed (non-fatal)", e) } + } - logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") - - try { - val request = Request.Builder().run { - url("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount") - post(emptyBody) - }.build() - - client.newCall(request).execute().use { response -> - val body = try { - mapper.readValue(response.body?.string(), ExternalSysResponse::class.java) - } catch (e: Exception) { - logger.error("[$accountName] [ERROR] Payment processed for txId: $transactionId, payment: $paymentId, result code: ${response.code}, reason: ${response.body?.string()}") - ExternalSysResponse(transactionId.toString(), paymentId.toString(),false, e.message) - } - logger.warn("[$accountName] Payment processed for txId: $transactionId, payment: $paymentId, succeeded: ${body.result}, message: ${body.message}") + override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { + val transactionId = UUID.randomUUID() - // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. - // Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация) + val remainingTimeAtStart = deadline - now() + if (remainingTimeAtStart <= 50L) { + try { paymentESService.update(paymentId) { - it.logProcessing(body.result, now(), transactionId, reason = body.message) + it.logProcessing(false, now(), transactionId, "Deadline already expired") } + } catch (e: Exception) { + logger.error("[$accountName] Failed to record deadline expiry for $paymentId", e) } - } catch (e: Exception) { - when (e) { - is SocketTimeoutException -> { - logger.error("[$accountName] Payment timeout for txId: $transactionId, payment: $paymentId", e) - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, reason = "Request timeout.") - } - } + return + } - else -> { - logger.error("[$accountName] Payment failed for txId: $transactionId, payment: $paymentId", e) + val initialTimeout = remainingTimeAtStart - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, reason = e.message) - } + val baseUri = + "http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount" + val request = HttpRequest.newBuilder() + .uri(URI(baseUri)) + .timeout(Duration.ofMillis(initialTimeout)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build() + val completed = AtomicBoolean(false) + + if (!rateLimiter.tickBlocking(50)) { + throw TooManyRequestsException(deadline) + } + + parallelLimiter.acquire() + sendAttempt(0, request, paymentId, transactionId, deadline, completed, allowHedge = true) + + if (hedgeEnabled) { + val hedgeDelays = listOf(100L, 250L, 450L, 700L, 950L) + for (delay in hedgeDelays) { + try { + retryExecutor.schedule({ + try { + if (completed.get()) return@schedule + val remainingBeforeHedge = deadline - now() + if (remainingBeforeHedge <= 100L) return@schedule + + val hedgeTimeout = remainingBeforeHedge + val hedgeRequest = HttpRequest.newBuilder() + .uri(URI(baseUri)) + .POST(HttpRequest.BodyPublishers.noBody()) + .timeout(Duration.ofMillis(hedgeTimeout)) + .build() + + if (!rateLimiter.tickBlocking(15)) { + return@schedule + } + + parallelLimiter.acquire() + sendAttempt( + 0, + hedgeRequest, + paymentId, + transactionId, + deadline, + completed, + allowHedge = false + ) + } catch (e: Exception) { + logger.warn("[$accountName] Failed to send hedged request for $paymentId", e) + } + }, delay, TimeUnit.MILLISECONDS) + } catch (e: Exception) { + logger.warn("[$accountName] Failed to schedule hedged request", e) } } } @@ -97,6 +182,182 @@ class PaymentExternalSystemAdapterImpl( override fun name() = properties.accountName + fun sendAttempt( + retryCount: Long, + request: HttpRequest, + paymentId: UUID, + transactionId: UUID, + deadline: Long, + completed: AtomicBoolean, + allowHedge: Boolean + ) { + inFlightRequests.incrementAndGet() + val timeBeforeCall = now() + httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .whenComplete { response, throwable -> + parallelLimiter.release() + inFlightRequests.decrementAndGet() + if (retryCount > 0) { + retryRequests.decrementAndGet() + } + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + + if (completed.get()) { + return@whenComplete + } + + if (throwable != null) { + val e = throwable.cause + var isRetriable = true + when (throwable.cause) { + is SocketTimeoutException -> { + } + + is InterruptedIOException -> { + } + + is EOFException -> { + } + + is IOException -> { + } + + else -> { + logger.warn("[$accountName] io error: $paymentId", e) + isRetriable = false + } + } + + if (retryCount + 1 >= 3) { + if (completed.compareAndSet(false, true)) { + try { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Max attempts reached") + } + } catch (ex: Exception) { + logger.error("[$accountName] Failed to record max attempts for $paymentId", ex) + } + } + } else { + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + if (completed.compareAndSet(false, true)) { + try { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Deadline expired") + } + } catch (ex: Exception) { + logger.error("[$accountName] Failed to record deadline expiry for $paymentId", ex) + } + } + } else { + if (isRetriable) { + if (!completed.get()) { + retryRequests.incrementAndGet() + retryCounter.increment() + scheduleRetry( + retryCount, request, paymentId, transactionId, deadline, capped, completed + ) + } + } else { + if (completed.compareAndSet(false, true)) { + try { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Non-retriable exception") + } + } catch (ex: Exception) { + logger.error("[$accountName] Failed to record non-retriable for $paymentId", ex) + } + } + } + } + } + } else { + try { + val rawBody = response.body() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } + if (completed.compareAndSet(false, true)) { + try { + paymentESService.update(paymentId) { + it.logProcessing(parsed.result, now(), transactionId, parsed.message) + } + } catch (e: Exception) { + logger.error("[$accountName] Error updating ES for payment $paymentId", e) + } + } + } catch (e: Exception) { + logger.error("[$accountName] Error processing payment $paymentId", e) + } + } + } + } + + private fun scheduleRetry( + retryCount: Long, request: HttpRequest, paymentId: UUID, + transactionId: UUID, deadline: Long, delay: Long, completed: AtomicBoolean + ) { + retryExecutor.schedule({ + if (completed.get()) return@schedule + val remainingTime = deadline - now() + if (remainingTime < 100L) { + try { + if (completed.compareAndSet(false, true)) { + paymentESService.update(paymentId) { + it.logProcessing( + false, + now(), + transactionId, + "Not enough time for retry" + ) + } + } + } catch (e: Exception) { + logger.error("[$accountName] Failed to record retry failure for $paymentId", e) + } + } else { + val newRequestTimeout = remainingTime.coerceAtLeast(50L) + val newRequest = HttpRequest.newBuilder() + .uri(request.uri()) + .POST(HttpRequest.BodyPublishers.noBody()) + .timeout(Duration.ofMillis(newRequestTimeout)) + .build() + if (!rateLimiter.tickBlocking(15)) { +// if (completed.compareAndSet(false, true)) { +// try { +// paymentESService.update(paymentId) { +// it.logProcessing(false, now(), transactionId, "Rate limit prevented retry") +// } +// } catch (e: Exception) { +// logger.error("[$accountName] Failed to record rate-limited retry for $paymentId", e) +// } +// } + return@schedule + } + parallelLimiter.acquire() + sendAttempt( + retryCount + 1, + newRequest, + paymentId, + transactionId, + deadline, + completed, + allowHedge = false + ) + } + }, delay, TimeUnit.MILLISECONDS) + } + + + companion object { + val logger: Logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) + val mapper = ObjectMapper().registerKotlinModule() + } } -public fun now() = System.currentTimeMillis() \ No newline at end of file + +fun now() = System.currentTimeMillis() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt index 255db77dd..abe4ee41c 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentService.kt @@ -37,6 +37,8 @@ data class PaymentAccountProperties( val price: Int, val averageProcessingTime: Duration = Duration.ofSeconds(11), val enabled: Boolean, + val hedgingEnabled: Boolean = false, + val hedgeDelayMillis: Long? = null ) /** diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 33d51a58b..80e0a0369 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -2,29 +2,39 @@ server.address=0.0.0.0 server.port=8081 server.http2.enabled=true spring.main.allow-bean-definition-overriding=true - # MongoDB properties spring.data.mongodb.host=localhost spring.data.mongodb.port=27017 spring.data.mongodb.database=online-shop - # Tiny event sourcing library properties event.sourcing.auto-scan-enabled=true event.sourcing.scan-package=ru.quipy event.sourcing.snapshots-enabled=false event.sourcing.sagas-enabled=false - # Postgres event store properties spring.datasource.hikari.jdbc-url=jdbc:postgresql://${POSTGRES_ADDRESS:localhost}:${POSTGRES_PORT:65432}/postgres spring.datasource.hikari.username=tiny_es spring.datasource.hikari.password=tiny_es +server.jetty.max-connections=10000000 spring.datasource.hikari.leak-detection-threshold=2000 - management.metrics.web.server.request.autotime.percentiles=0.95 management.metrics.export.prometheus.enabled=true -management.endpoints.web.exposure.include=info,health,prometheus,metrics - +management.metrics.distribution.percentiles-histogram.bombardier.in.queue.latency=true +management.endpoints.web.exposure.include=prometheus,health,info +management.metrics.distribution.percentiles-histogram.http.server.requests=true +management.metrics.distribution.percentiles-histogram.payment.external.system.request.latency=true payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} -payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} -payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file +# payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} +# payment.accounts=${PAYMENT_ACCOUNTS:acc-3} +#payment.accounts=${PAYMENT_ACCOUNTS:acc-7} +# payment.accounts=${PAYMENT_ACCOUNTS:acc-18} +payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} + +# For 8 case +#payment.accounts=${PAYMENT_ACCOUNTS:acc-9} +payment.accounts=${PAYMENT_ACCOUNTS:acc-22} +payment.processingTimeMillis=1000 +payment.maximumPoolSize=50_000 + + diff --git a/test-local-run.http b/test-local-run.http index 7be0e4f73..7e8744fd6 100644 --- a/test-local-run.http +++ b/test-local-run.http @@ -1,15 +1,16 @@ -### Run test locally +### Test 11 run... POST http://localhost:1234/test/run Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "ratePerSecond": 1, - "testCount": 100, - "processingTimeMillis": 80000 + "ratePerSecond": 100, + "testCount": 20000, + "processingTimeMillis": 1500 } + ### Stop running test to save time and resources # @timeout 120 -POST http://localhost:1234/test/stop/{{serviceName}} \ No newline at end of file +POST http://localhost:4321/test/stop/"{{serviceName}}" diff --git a/test-on-prem-run.http b/test-on-prem-run.http index 584edc0b5..071c60802 100644 --- a/test-on-prem-run.http +++ b/test-on-prem-run.http @@ -6,11 +6,11 @@ Content-Type: application/json { "serviceName": "{{serviceName}}", "token": "{{token}}", - "branch": "main", - "accounts": "acc-3", - "ratePerSecond": 11, - "testCount": 1200, - "processingTimeMillis": 80000, + "branch": "feature/kuro-11-lab", + "accounts": "acc-22", + "ratePerSecond": 100, + "testCount": 20000, + "processingTimeMillis": 1500, "onPremises": true }