diff --git a/.gitignore b/.gitignore index 259113f73..ea59400a3 100644 --- a/.gitignore +++ b/.gitignore @@ -32,4 +32,4 @@ build/ !**/src/test/**/build/ ### VS Code ### -.vscode/ +.vscode/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index cc6f2e042..de31b9fe7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,9 +6,8 @@ RUN mvn dependency:go-offline COPY src src RUN mvn package -FROM openjdk:17-jdk-slim +FROM eclipse-temurin:17-alpine-3.22 COPY --from=build /app/target/*.jar /high-load-course.jar CMD ["java", "-jar", "/high-load-course.jar"] - diff --git a/README.md b/README.md index 56d36970f..bc19c4280 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,9 @@ # Template for the HighLoad course + This project is based on [Tiny Event Sourcing library](https://github.com/andrsuh/tiny-event-sourcing) ### Run PostgreSql + This example uses Postgres as an implementation of the Event store. You can see it in `pom.xml`: ``` @@ -12,7 +14,8 @@ This example uses Postgres as an implementation of the Event store. You can see ``` -Thus, you have to run Postgres in order to test this example. Postgres service is included in `docker-compose` file that we have in the root of the project. +Thus, you have to run Postgres in order to test this example. Postgres service is included in `docker-compose` file +that we have in the root of the project. # More comprehensive information about the course, project, how to run tests is here: @@ -21,19 +24,25 @@ https://andrsuh.notion.site/2595d535059281d8a815c2cb3875c376?source=copy_link https://andrsuh.notion.site/2625d5350592801aaf88c7c95302d10c?source=copy_link ### Run the infrastructure + Set of the services you need to start developing and testing process is following: -- Bombardier - service that is in charge of emulation the store's clients activity (creates the incoming load). Also serves as a third-party payment system. + +- Bombardier - service that is in charge of emulation the store's clients activity (creates the incoming load). Also + serves as a third-party payment system. - Postgres DBMS - Prometheus + Grafana - metrics collection and visualization services You can run all beforementioned services by the following command: + ``` docker compose -f docker-compose.yml up ``` ### Run the application -To make the application run you can start the main class `OnlineShopApplication`. It is not being launched as a docker contained to simplify and speed up the devevopment process as it is easier for you to refactor the application and re-run it immediately in the IDE. +To make the application run you can start the main class `OnlineShopApplication`. It is not being launched as a docker +contained to simplify and speed up the devevopment process as it is easier for you to refactor the application and +re-run it immediately in the IDE. ### If you want to pull changes from the main repository into your fork diff --git a/grafana/provisioning/dashboards/ServicesStatistic.json b/grafana/provisioning/dashboards/ServicesStatistic.json index 684b97269..e30ee04a3 100644 --- a/grafana/provisioning/dashboards/ServicesStatistic.json +++ b/grafana/provisioning/dashboards/ServicesStatistic.json @@ -1345,7 +1345,6 @@ "uid": "PBFA97CFB590B2093" }, "editorMode": "code", - "exemplar": true, "expr": "rate(stage_duration_ok_sum{service=~\"$service\", result=\"success\"}[1m]) / rate(stage_duration_ok_count{service=~\"$service\", result=\"success\"}[1m])", "interval": "", "legendFormat": "{{stage}}", @@ -1785,7 +1784,7 @@ "exemplar": true, "expr": "rate(external_sys_duration_count{service=~\"$service\"}[30s])", "hide": false, - "interval": "", + "instant": false, "legendFormat": "{{accountName}} - {{outcome}}", "range": true, "refId": "C" @@ -1923,7 +1922,7 @@ "exemplar": true, "expr": "rate(external_sys_duration_sum{service=~\"$service\"}[1m]) / rate(external_sys_duration_count{service=~\"$service\"}[1m])", "hide": false, - "interval": "", + "instant": false, "legendFormat": "{{accountName}} - {{outcome}}", "range": true, "refId": "C" @@ -2024,37 +2023,13 @@ } } ] - }, - { - "__systemRef": "hideSeriesFrom", - "matcher": { - "id": "byNames", - "options": { - "mode": "exclude", - "names": [ - "payOrder - " - ], - "prefix": "All except:", - "readOnly": true - } - }, - "properties": [ - { - "id": "custom.hideFrom", - "value": { - "legend": false, - "tooltip": false, - "viz": true - } - } - ] } ] }, "gridPos": { "h": 8, "w": 12, - "x": 12, + "x": 0, "y": 108 }, "id": 43, @@ -2065,9 +2040,7 @@ ], "displayMode": "table", "placement": "right", - "showLegend": true, - "sortBy": "Max", - "sortDesc": true + "showLegend": true }, "tooltip": { "mode": "single", @@ -2082,7 +2055,7 @@ "uid": "PBFA97CFB590B2093" }, "editorMode": "code", - "expr": "rate(http_request_latent_sum{service=~\"$service\"}[15s]) / rate(http_request_latent_count{service=~\"$service\"}[15s])", + "expr": "sum(rate(http_request_latent_sum{service=~\"$service\"}[15s]) / rate(http_request_latent_count{service=~\"$service\"}[15s]))", "hide": false, "instant": false, "legendFormat": "{{method}} - {{result}}", @@ -2390,7 +2363,6 @@ "uid": "PBFA97CFB590B2093" }, "editorMode": "code", - "exemplar": true, "expr": "sum by (name) (executor_active_threads{})", "interval": "", "legendFormat": "{{name}}", @@ -2665,210 +2637,6 @@ }, "overrides": [] }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 38 - }, - "id": 95, - "options": { - "legend": { - "calcs": [ - "max" - ], - "displayMode": "table", - "placement": "right", - "showLegend": true - }, - "tooltip": { - "mode": "multi", - "sort": "none" - } - }, - "pluginVersion": "11.4.0", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "expr": "sum by (job) (system_cpu_usage{service=~\"$service\"})", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "{{job}} - System CPU Usage", - "range": true, - "refId": "A" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "expr": "sum by (job) (process_cpu_usage{service=~\"$service\"})", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "{{job}} -Process CPU Usage", - "range": true, - "refId": "B" - } - ], - "title": "CPU Usage", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 17, - "gradientMode": "opacity", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - } - ] - }, - "unit": "s" - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 12, - "y": 38 - }, - "id": 42, - "options": { - "legend": { - "calcs": [], - "displayMode": "table", - "placement": "right", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "11.4.0", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "expr": "sum(rate(jvm_gc_pause_seconds_sum{}[5m])) by (job)", - "instant": false, - "legendFormat": "{{job}}", - "range": true, - "refId": "A" - } - ], - "title": "GC delay", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 17, - "gradientMode": "opacity", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - } - ] - }, - "unit": "short" - }, - "overrides": [] - }, "gridPos": { "h": 8, "w": 12, @@ -2916,14 +2684,14 @@ "h": 1, "w": 24, "x": 0, - "y": 6 + "y": 7 }, "id": 97, "panels": [ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "fieldConfig": { "defaults": { @@ -3007,7 +2775,7 @@ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "editorMode": "code", "expr": "sum by (job) (jetty_threads_config_max{})", @@ -3025,7 +2793,7 @@ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "fieldConfig": { "defaults": { @@ -3109,7 +2877,7 @@ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "editorMode": "code", "expr": "sum by (job) (jetty_threads_config_min{service=~\"$service\"})", @@ -3127,7 +2895,7 @@ { "datasource": { "type": "prometheus", - "uid": "PBFA97CFB590B2093" + "uid": "P1C574E4B1E20B3B3" }, "fieldConfig": { "defaults": { @@ -3300,7 +3068,7 @@ "spanNulls": false, "stacking": { "group": "A", - "mode": "none" + "mode": "normal" }, "thresholdsStyle": { "mode": "off" @@ -3400,13 +3168,12 @@ "spanNulls": false, "stacking": { "group": "A", - "mode": "none" + "mode": "normal" }, "thresholdsStyle": { "mode": "off" } }, - "links": [], "mappings": [], "thresholds": { "mode": "absolute", @@ -3421,7 +3188,7 @@ } ] }, - "unit": "s" + "unit": "none" }, "overrides": [] }, @@ -3468,8 +3235,706 @@ ], "title": "Jetty Statistics", "type": "row" - } - ], + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 7 + }, + "id": 101, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "История изменения количества задач в очереди", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Задачи в очереди", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 16, + "x": 8, + "y": 8 + }, + "id": 103, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "payment_processing_planned_total - payment_processing_started_total", + "legendFormat": "{{accountName}}", + "refId": "A" + } + ], + "title": "История задач в очереди", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Скорость входа в ожидание и выхода из ожидания (requests/sec)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Запросов/сек", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "reqps" + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "Скорость увеличения очереди" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "green", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Скорость числа задач в обработке" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "blue", + "mode": "fixed" + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 16 + }, + "id": 104, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_planned_total[1m])", + "legendFormat": "Попало в очередь: {{accountName}}", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_started_total[1m])", + "legendFormat": "Приняты в обработку (вышли из очереди): {{accountName}}", + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_completed_total[1m])", + "legendFormat": "Обработаны: {{accountName}}", + "refId": "C" + } + ], + "title": "Throughput (в очереди vs в обработке vs обработаны)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Если значение > 0, задачи накапливаются быстрее, чем обрабатываются", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": true, + "axisColorMode": "text", + "axisLabel": "Δ запросов/сек", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "scheme", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "area" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 0.1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 16 + }, + "id": 105, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "expr": "rate(payment_processing_planned_total[1m]) - rate(payment_processing_started_total[1m])", + "legendFormat": "{{accountName}}", + "refId": "A" + } + ], + "title": "Скорость накопления задач (увеличения очереди)", + "type": "timeseries" + } + ], + "title": "Payment Processing - Active Tasks", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 108, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.75, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "legendFormat": "p75 {{accountName}}", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.80, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "legendFormat": "p80 {{accountName}}", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.85, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "legendFormat": "p85 {{accountName}}", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.90, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "hide": false, + "legendFormat": "p90 {{accountName}}", + "range": true, + "refId": "D" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(payment_external_system_request_latency_seconds_bucket[1m])) by (le, accountName))", + "hide": false, + "legendFormat": "p95 {{accountName}}", + "range": true, + "refId": "E" + } + ], + "title": "Payment Processing Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Количество повторных запросов к платежной системе в секунду", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Запросов/сек", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + }, + { + "color": "red", + "value": 5 + } + ] + }, + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 9 + }, + "id": 109, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "rate(payment_request_retried_total{accountName=~\".*\"}[1m])", + "legendFormat": "Повторные запросы: {{accountName}}", + "range": true, + "refId": "A" + } + ], + "title": "Payment Request Retries Rate (per second)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "description": "Общее количество повторных запросов к платежной системе", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "Количество", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 100 + }, + { + "color": "red", + "value": 500 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 9 + }, + "id": 110, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "payment_request_retried_total{accountName=~\".*\"}", + "legendFormat": "Всего повторов: {{accountName}}", + "range": true, + "refId": "A" + } + ], + "title": "Payment Request Retries Total Count", + "type": "timeseries" + }], "preload": false, "refresh": "5s", "schemaVersion": 40, @@ -3516,4 +3981,5 @@ "uid": "KVr-Vmpnz", "version": 4, "weekStart": "" -} \ No newline at end of file +} + diff --git a/pom.xml b/pom.xml index 5724b2568..e244cb6aa 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,5 @@ - 4.0.0 @@ -15,7 +15,7 @@ OnlineShop Application for resilience and highly-loaded applications course - + 2.2.0 1.9.0 4.12.0 @@ -25,7 +25,7 @@ 3.1.8 - + io.github.resilience4j resilience4j-ratelimiter @@ -67,15 +67,15 @@ ${jetty.version} - - - - + + + + - - - - + + + + com.fasterxml.jackson.module diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml index 936c7edd9..ce045546f 100644 --- a/prometheus/prometheus.yml +++ b/prometheus/prometheus.yml @@ -6,16 +6,16 @@ scrape_configs: - job_name: 'bombardier-docker-network-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['bombardier:1234'] + - targets: [ 'bombardier:1234' ] - job_name: 'bombardier-host-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:1234'] + - targets: [ 'host.docker.internal:1234' ] - job_name: 'online-store-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:8081'] + - targets: [ 'host.docker.internal:8081' ] - job_name: 'online-shop-job' metrics_path: '/actuator/prometheus' static_configs: - - targets: ['host.docker.internal:18081'] \ No newline at end of file + - targets: [ 'host.docker.internal:18081' ] \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/OnlineShopApplication.kt b/src/main/kotlin/ru/quipy/OnlineShopApplication.kt index 9ac22e094..f311c6fdf 100644 --- a/src/main/kotlin/ru/quipy/OnlineShopApplication.kt +++ b/src/main/kotlin/ru/quipy/OnlineShopApplication.kt @@ -1,10 +1,12 @@ package ru.quipy +import kotlinx.coroutines.asCoroutineDispatcher import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.runApplication import ru.quipy.common.utils.NamedThreadFactory +import java.util.concurrent.ExecutorService import java.util.concurrent.Executors @@ -13,7 +15,7 @@ class OnlineShopApplication { val log: Logger = LoggerFactory.getLogger(OnlineShopApplication::class.java) companion object { - val appExecutor = Executors.newFixedThreadPool(64, NamedThreadFactory("main-app-executor")) + val appExecutor: ExecutorService = Executors.newFixedThreadPool(64, NamedThreadFactory("main-app-executor")) } } diff --git a/src/main/kotlin/ru/quipy/apigateway/APIController.kt b/src/main/kotlin/ru/quipy/apigateway/APIController.kt index 6f23fa18d..251ee553e 100644 --- a/src/main/kotlin/ru/quipy/apigateway/APIController.kt +++ b/src/main/kotlin/ru/quipy/apigateway/APIController.kt @@ -2,23 +2,27 @@ package ru.quipy.apigateway import org.slf4j.Logger import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.web.bind.annotation.* +import ru.quipy.common.utils.LeakingBucketRateLimiter +import ru.quipy.common.utils.RateLimiter +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.orders.repository.OrderRepository import ru.quipy.payments.logic.OrderPayer +import java.time.Duration import java.util.* +import kotlin.random.Random @RestController -class APIController { +class APIController( + private val orderRepository: OrderRepository, + private val orderPayer: OrderPayer, + @field:Qualifier("parallelLimiter") + private val rateLimiter: RateLimiter = LeakingBucketRateLimiter(1100, Duration.ofSeconds(1), 3300) +) { val logger: Logger = LoggerFactory.getLogger(APIController::class.java) - @Autowired - private lateinit var orderRepository: OrderRepository - - @Autowired - private lateinit var orderPayer: OrderPayer - @PostMapping("/users") fun createUser(@RequestBody req: CreateUserRequest): User { return User(UUID.randomUUID(), req.name) @@ -56,13 +60,16 @@ class APIController { @PostMapping("/orders/{orderId}/payment") fun payOrder(@PathVariable orderId: UUID, @RequestParam deadline: Long): PaymentSubmissionDto { + if (!rateLimiter.tick()) { + val retryAfterMs = 10L + Random.nextLong(10) + throw TooManyRequestsException(retryAfterMs) + } val paymentId = UUID.randomUUID() val order = orderRepository.findById(orderId)?.let { orderRepository.save(it.copy(status = OrderStatus.PAYMENT_IN_PROGRESS)) it } ?: throw IllegalArgumentException("No such order $orderId") - val createdAt = orderPayer.processPayment(orderId, order.price, paymentId, deadline) return PaymentSubmissionDto(createdAt, paymentId) } diff --git a/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt new file mode 100644 index 000000000..7d2c38fe8 --- /dev/null +++ b/src/main/kotlin/ru/quipy/apigateway/GlobalExceptionHandler.kt @@ -0,0 +1,37 @@ +package ru.quipy.apigateway + +import org.slf4j.LoggerFactory +import org.springframework.http.HttpStatus +import org.springframework.http.ResponseEntity +import org.springframework.web.bind.annotation.ExceptionHandler +import org.springframework.web.bind.annotation.RestControllerAdvice +import ru.quipy.exceptions.TooLongRequestException +import ru.quipy.exceptions.TooManyRequestsException +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + +@RestControllerAdvice +class GlobalExceptionHandler( +) { + companion object { + val logger = LoggerFactory.getLogger(GlobalExceptionHandler::class.java) + private var currentRetryAfterSeconds = 1 + private var exp_base = 2; + private const val RESET_WINDOW_MS = 30000L + } + + private val rejectedRequestsCount = AtomicInteger(0) + private val lastRejectionTime = AtomicLong(0) + @ExceptionHandler(TooLongRequestException::class) + fun handleTooManyRequests(exception: TooLongRequestException): ResponseEntity { + return ResponseEntity.status(200).body("your request very long, i am so sorry") + } + + @ExceptionHandler(TooManyRequestsException::class) + fun handleTooManyRequestsRetriable(exception: TooManyRequestsException): ResponseEntity { + return ResponseEntity + .status(HttpStatus.TOO_MANY_REQUESTS) + .header("Retry-After", "10") + .body("too many requests") + } +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt index 6ff3092ab..4f3f64050 100644 --- a/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt +++ b/src/main/kotlin/ru/quipy/common/utils/SlidingWindowRateLimiter.kt @@ -10,8 +10,6 @@ import java.time.Duration import java.util.concurrent.Executors import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock class SlidingWindowRateLimiter( private val rate: Long, @@ -33,12 +31,29 @@ class SlidingWindowRateLimiter( } } - fun tickBlocking() { + fun tickBlockingAsync() { while (!tick()) { Thread.sleep(10) } } + fun tickBlockingWithTimeout(timeout: Long): Boolean { + val timeStarted = System.currentTimeMillis() + while (System.currentTimeMillis() - timeStarted < timeout && !tick()) { + Thread.sleep(2) + } + return System.currentTimeMillis() - timeStarted < timeout + } + + suspend fun tickBlockingAsync(timeout: Long): Boolean { + val timeStarted = System.currentTimeMillis() + while (System.currentTimeMillis() - timeStarted < timeout && !tick()) { + delay(2) + } + return System.currentTimeMillis() - timeStarted < timeout + } + + data class Measure( val value: Long, val timestamp: Long @@ -64,6 +79,7 @@ class SlidingWindowRateLimiter( queue.take() } }.invokeOnCompletion { th -> if (th != null) logger.error("Rate limiter release job completed", th) } + companion object { private val logger: Logger = LoggerFactory.getLogger(SlidingWindowRateLimiter::class.java) } diff --git a/src/main/kotlin/ru/quipy/exceptions/RateLimitWasBreached.kt b/src/main/kotlin/ru/quipy/exceptions/RateLimitWasBreached.kt new file mode 100644 index 000000000..8469ea9a3 --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/RateLimitWasBreached.kt @@ -0,0 +1,4 @@ +package ru.quipy.exceptions + +class RateLimitWasBreached : RuntimeException() { +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/exceptions/TooLongRequestEcexption.kt b/src/main/kotlin/ru/quipy/exceptions/TooLongRequestEcexption.kt new file mode 100644 index 000000000..c50d9bc48 --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/TooLongRequestEcexption.kt @@ -0,0 +1,4 @@ +package ru.quipy.exceptions + +class TooLongRequestException() : RuntimeException() { +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt new file mode 100644 index 000000000..3f063849a --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsException.kt @@ -0,0 +1,3 @@ +package ru.quipy.exceptions + +class TooManyRequestsException(val retryAfterMs: Long) : RuntimeException() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsRetriableException.kt b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsRetriableException.kt new file mode 100644 index 000000000..4c95b4b30 --- /dev/null +++ b/src/main/kotlin/ru/quipy/exceptions/TooManyRequestsRetriableException.kt @@ -0,0 +1,4 @@ +package ru.quipy.exceptions + +class TooManyRequestsRetriableException(val deadline: Long) : RuntimeException(){ +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt index 767f23b3e..0b7560804 100644 --- a/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt +++ b/src/main/kotlin/ru/quipy/orders/subscribers/PaymentSubscriber.kt @@ -1,5 +1,6 @@ package ru.quipy.orders.subscribers +import io.micrometer.core.instrument.Metrics import jakarta.annotation.PostConstruct import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -19,6 +20,7 @@ class PaymentSubscriber { val logger: Logger = LoggerFactory.getLogger(PaymentSubscriber::class.java) + val paymentSucceededCounter = Metrics.counter("succeeded.payments", "account", "acc-7") @Autowired lateinit var subscriptionsManager: AggregateSubscriptionsManager @@ -34,7 +36,7 @@ class PaymentSubscriber { retryConf = RetryConf(1, RetryFailedStrategy.SKIP_EVENT) ) { `when`(PaymentProcessedEvent::class) { event -> - appExecutor.submit { + appExecutor.run { logger.trace( "Payment results. OrderId ${event.orderId}, succeeded: ${event.success}, txId: ${event.transactionId}, reason: ${event.reason}, duration: ${ Duration.ofMillis( @@ -42,6 +44,7 @@ class PaymentSubscriber { ).toSeconds() }, spent in queue: ${event.spentInQueueDuration.toSeconds()}" ) + paymentSucceededCounter.increment() } } } diff --git a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt index eceb90cff..17814f09f 100644 --- a/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt +++ b/src/main/kotlin/ru/quipy/payments/config/PaymentAccountsConfig.kt @@ -3,17 +3,24 @@ package ru.quipy.payments.config import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.kotlin.registerKotlinModule +import io.micrometer.core.instrument.MeterRegistry import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService import ru.quipy.payments.api.PaymentAggregate -import ru.quipy.payments.logic.* +import ru.quipy.payments.logic.PaymentAccountProperties +import ru.quipy.payments.logic.PaymentAggregateState +import ru.quipy.payments.logic.PaymentExternalSystemAdapter +import ru.quipy.payments.logic.PaymentExternalSystemAdapterImpl import java.net.URI import java.net.http.HttpClient import java.net.http.HttpRequest import java.net.http.HttpResponse +import java.time.Duration import java.util.* +import java.util.concurrent.Semaphore @Configuration @@ -36,7 +43,18 @@ class PaymentAccountsConfig { lateinit var allowedAccounts: List @Bean - fun accountAdapters(paymentService: EventSourcingService): List { + fun rateLimit(): SlidingWindowRateLimiter { + return SlidingWindowRateLimiter( + rate = 5000L, + window = Duration.ofMillis(1000), + ) + } + @Bean + fun accountAdapters( + paymentService: EventSourcingService, + meterRegistry: MeterRegistry, + rateLimiter: SlidingWindowRateLimiter + ): List { val request = HttpRequest.newBuilder() .uri(URI("http://${paymentProviderHostPort}/external/accounts?serviceName=$serviceName&token=$token")) .GET() @@ -57,7 +75,10 @@ class PaymentAccountsConfig { it, paymentService, paymentProviderHostPort, - token + token, + meterRegistry, + Semaphore(it.parallelRequests), + rateLimiter ) } } diff --git a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt index a5909b85b..ac1e4bd1b 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/OrderPayer.kt @@ -1,44 +1,54 @@ package ru.quipy.payments.logic +import io.micrometer.core.instrument.MeterRegistry import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service -import ru.quipy.common.utils.CallerBlockingRejectedExecutionHandler import ru.quipy.common.utils.NamedThreadFactory +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.payments.api.PaymentAggregate import java.util.* import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit +import kotlin.random.Random @Service -class OrderPayer { +class OrderPayer(val rateLimiter : SlidingWindowRateLimiter, meterRegistry: MeterRegistry) { companion object { val logger: Logger = LoggerFactory.getLogger(OrderPayer::class.java) } + private val plannedRequests = meterRegistry.counter("payment.processing.planned", "accountName", "acc-13") + + private val paymentExecutor = ThreadPoolExecutor( + 4000, + 4000, + 100L, + TimeUnit.MILLISECONDS, + LinkedBlockingQueue(200_000), + NamedThreadFactory("payment-submission-executor") + ) + @Autowired private lateinit var paymentESService: EventSourcingService @Autowired private lateinit var paymentService: PaymentService - private val paymentExecutor = ThreadPoolExecutor( - 16, - 16, - 0L, - TimeUnit.MILLISECONDS, - LinkedBlockingQueue(8_000), - NamedThreadFactory("payment-submission-executor"), - CallerBlockingRejectedExecutionHandler() - ) fun processPayment(orderId: UUID, amount: Int, paymentId: UUID, deadline: Long): Long { val createdAt = System.currentTimeMillis() + if (!rateLimiter.tick()) { + val retryAfterMs = 10L + Random.nextLong(10) + throw TooManyRequestsException(retryAfterMs) + } paymentExecutor.submit { + plannedRequests.increment() val createdEvent = paymentESService.create { it.create( paymentId, @@ -46,7 +56,8 @@ class OrderPayer { amount ) } - logger.trace("Payment ${createdEvent.paymentId} for order $orderId created.") + + logger.trace("Payment {} for order {} created.", createdEvent.paymentId, orderId) paymentService.submitPaymentRequest(paymentId, amount, createdAt, deadline) } diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt new file mode 100644 index 000000000..87e6f4103 --- /dev/null +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentCallback.kt @@ -0,0 +1,102 @@ +package ru.quipy.payments.logic + +import io.micrometer.core.instrument.Timer +import io.micrometer.core.instrument.Counter +import kotlinx.coroutines.sync.Semaphore +import okhttp3.Call +import okhttp3.Callback +import okhttp3.OkHttpClient +import okhttp3.Request +import okhttp3.Response +import ru.quipy.core.EventSourcingService +import ru.quipy.payments.api.PaymentAggregate +import ru.quipy.payments.logic.PaymentExternalSystemAdapterImpl.Companion.logger +import ru.quipy.payments.logic.PaymentExternalSystemAdapterImpl.Companion.mapper +import java.io.InterruptedIOException +import java.net.SocketTimeoutException +import java.util.UUID +import java.util.concurrent.TimeUnit +import kotlin.math.pow + + +class PaymentCallback(val startedRequestsCounter: Counter, val semaphore: Semaphore, val accountName: String, val retryCount: Int, val paymentId: UUID, val transactionId: UUID, val paymentESService: EventSourcingService, val client: OkHttpClient, val request: Request, val timer: Timer, val deadline: Long, val timeBeforeCall: Long) : Callback { + override fun onFailure(call: Call, e: java.io.IOException) { + + logger.debug("fail in callback for payment: {}, retry count: {}, deadline: {}, in time: {}", paymentId, retryCount, deadline, now(), e) + when (e) { + is SocketTimeoutException -> { + logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "socket timeout") + } + } + + is InterruptedIOException -> { + logger.warn("[$accountName] interrupted: $paymentId", e) + + // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. + // Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "interrupted IO") + } + } + + else -> { + logger.warn("[$accountName] io error: $paymentId", e) + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "io exception") + } + } + } + + if (retryCount + 1 >= 3) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Max attempts reached") + } + startedRequestsCounter.increment() + semaphore.release() + return + } + + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + kotlin.random.Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Deadline expired") + } + startedRequestsCounter.increment() + semaphore.release() + return + } + val nCall = client.newCall(request) + nCall.enqueue(PaymentCallback(startedRequestsCounter, semaphore, accountName, retryCount + 1, paymentId, transactionId, paymentESService, client, request, timer, deadline, now())) + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + } + + override fun onResponse(call: Call, response: Response) { + startedRequestsCounter.increment() + try { + logger.warn("Free space in semaphore: {}", semaphore.availablePermits) + logger.info( + "success in callback for payment: {}, retry count: {}, deadline: {}, in time: {}", + paymentId, + retryCount, + deadline, + now() + ) + val rawBody = response.body?.string() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } + + paymentESService.update(paymentId) { + it.logProcessing(parsed.result, now(), transactionId, parsed.message) + } + } finally { + semaphore.release() + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt index 5cb12106a..89943d358 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentExternalServiceImpl.kt @@ -2,15 +2,26 @@ package ru.quipy.payments.logic import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.registerKotlinModule -import okhttp3.OkHttpClient -import okhttp3.Request -import okhttp3.RequestBody +import io.micrometer.core.instrument.MeterRegistry +import okio.EOFException import org.slf4j.LoggerFactory +import ru.quipy.common.utils.SlidingWindowRateLimiter import ru.quipy.core.EventSourcingService +import ru.quipy.exceptions.TooManyRequestsException import ru.quipy.payments.api.PaymentAggregate +import java.io.InterruptedIOException import java.net.SocketTimeoutException +import java.net.URI +import java.net.http.HttpClient +import java.net.http.HttpRequest +import java.net.http.HttpResponse import java.time.Duration import java.util.* +import java.util.concurrent.Executors +import java.util.concurrent.Semaphore +import java.util.concurrent.TimeUnit +import kotlin.math.pow +import kotlin.random.Random // Advice: always treat time as a Duration @@ -19,76 +30,89 @@ class PaymentExternalSystemAdapterImpl( private val paymentESService: EventSourcingService, private val paymentProviderHostPort: String, private val token: String, + meterRegistry: MeterRegistry, + private val parallelLimiter: Semaphore, + private val rateLimiter: SlidingWindowRateLimiter ) : PaymentExternalSystemAdapter { - companion object { val logger = LoggerFactory.getLogger(PaymentExternalSystemAdapter::class.java) - - val emptyBody = RequestBody.create(null, ByteArray(0)) val mapper = ObjectMapper().registerKotlinModule() } + private val time_95_percentile = 1_000 + + + + // 2025-11-20T20:30:35.780+03:00 INFO 56644 --- [alhost:1234/...] ru.quipy.core.EventSourcingService : Optimistic lock exception. Failed to save event records id: [7dca693e-e811-4b7f-8bce-23e13d952c04-4] + private val startedRequests = + meterRegistry.counter("payment.processing.started", "accountName", properties.accountName) + private val requestsRetried = + meterRegistry.counter("payment.request.retried", "accountName", properties.accountName) + private val timer = + meterRegistry.timer("payment.external.system.request.latency", "accountName", properties.accountName) private val serviceName = properties.serviceName private val accountName = properties.accountName private val requestAverageProcessingTime = properties.averageProcessingTime private val rateLimitPerSec = properties.rateLimitPerSec private val parallelRequests = properties.parallelRequests - private val client = OkHttpClient.Builder().build() + + + private val httpClient = HttpClient + .newBuilder() + .executor(Executors.newFixedThreadPool(parallelRequests)) + .version(HttpClient.Version.HTTP_2) + .build() override fun performPaymentAsync(paymentId: UUID, amount: Int, paymentStartedAt: Long, deadline: Long) { logger.warn("[$accountName] Submitting payment request for payment $paymentId") - val transactionId = UUID.randomUUID() // Вне зависимости от исхода оплаты важно отметить что она была отправлена. // Это требуется сделать ВО ВСЕХ СЛУЧАЯХ, поскольку эта информация используется сервисом тестирования. - paymentESService.update(paymentId) { - it.logSubmission(success = true, transactionId, now(), Duration.ofMillis(now() - paymentStartedAt)) - } - - logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") - - try { - val request = Request.Builder().run { - url("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount") - post(emptyBody) - }.build() - - client.newCall(request).execute().use { response -> - val body = try { - mapper.readValue(response.body?.string(), ExternalSysResponse::class.java) - } catch (e: Exception) { - logger.error("[$accountName] [ERROR] Payment processed for txId: $transactionId, payment: $paymentId, result code: ${response.code}, reason: ${response.body?.string()}") - ExternalSysResponse(transactionId.toString(), paymentId.toString(),false, e.message) - } - - logger.warn("[$accountName] Payment processed for txId: $transactionId, payment: $paymentId, succeeded: ${body.result}, message: ${body.message}") - - // Здесь мы обновляем состояние оплаты в зависимости от результата в базе данных оплат. - // Это требуется сделать ВО ВСЕХ ИСХОДАХ (успешная оплата / неуспешная / ошибочная ситуация) + try { paymentESService.update(paymentId) { - it.logProcessing(body.result, now(), transactionId, reason = body.message) + it.logSubmission( + success = true, + transactionId, + now(), + Duration.ofMillis(now() - paymentStartedAt) + ) } + logger.info("[$accountName] Log submission recorded for $paymentId") + } catch (e: Exception) { + logger.error("[$accountName] Failed to record log submission for $paymentId", e) } - } catch (e: Exception) { - when (e) { - is SocketTimeoutException -> { - logger.error("[$accountName] Payment timeout for txId: $transactionId, payment: $paymentId", e) - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, reason = "Request timeout.") - } - } - else -> { - logger.error("[$accountName] Payment failed for txId: $transactionId, payment: $paymentId", e) + logger.info("[$accountName] Submit: $paymentId , txId: $transactionId") - paymentESService.update(paymentId) { - it.logProcessing(false, now(), transactionId, reason = e.message) - } - } + val remaining = deadline - now() + val minRequiredTime = requestAverageProcessingTime.toMillis() + if (remaining < minRequiredTime) { + logger.warn("[$accountName] Not enough time for payment $paymentId: ${remaining}ms remaining, need ${minRequiredTime}ms") + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "not enough time") } + val retryAfterMs = minRequiredTime - remaining + Random.nextLong(100) + throw TooManyRequestsException(retryAfterMs) + } + + // Таймаут = оставшееся время до дедлайна, но не больше time_95_percentile + val requestTimeout = minOf(remaining, time_95_percentile.toLong()).coerceAtLeast(100) + + if (!rateLimiter.tick()) { + val retryAfterMs = (1000L / rateLimitPerSec * 10).coerceIn(10, 100) + Random.nextLong(10) + logger.warn("[$accountName] Rate limit exceeded for payment $paymentId, retry after ${retryAfterMs}ms") + throw TooManyRequestsException(retryAfterMs) } + + val request = HttpRequest.newBuilder() + .uri(URI("http://$paymentProviderHostPort/external/process?serviceName=$serviceName&token=$token&accountName=$accountName&transactionId=$transactionId&paymentId=$paymentId&amount=$amount")) + .timeout(Duration.ofMillis(requestTimeout)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build() + + completeAction(0, request, paymentId, transactionId, deadline) } override fun price() = properties.price @@ -97,6 +121,133 @@ class PaymentExternalSystemAdapterImpl( override fun name() = properties.accountName + fun completeAction( + retryCount: Long, + request: HttpRequest, + paymentId: UUID, + transactionId: UUID, + deadline: Long + ) { + val timeBeforeCall = now() + + if (!parallelLimiter.tryAcquire(deadline - now(), TimeUnit.MILLISECONDS)) { + val retryAfterMs = (requestAverageProcessingTime.toMillis() / parallelRequests * 10).coerceIn( + 10, 100 + ) + Random.nextLong(10) + + throw TooManyRequestsException(retryAfterMs) + } + startedRequests.increment() + httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .whenComplete { response, throwable -> + if (throwable != null) { + val e = throwable.cause + var isRetriable = true + when (throwable.cause) { + is SocketTimeoutException -> { + logger.warn("[$accountName] attempt ${retryCount + 1} timeout: $paymentId", e) + } + + is InterruptedIOException -> { + logger.warn("[$accountName] interrupted: $paymentId", e) + } + + is EOFException -> { + logger.warn("[$accountName] eof exception in: $paymentId", e) + } + + else -> { + logger.warn("[$accountName] io error: $paymentId", e) + isRetriable = false + } + } + + if (retryCount + 1 >= 3) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Max attempts reached") + } + parallelLimiter.release() + } else { + val backoff = ((2.0.pow(retryCount.toDouble()) * 25).toLong() + Random.nextLong(10)) + val capped = backoff.coerceAtMost(deadline - now() - 5) + if (capped <= 0) { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Deadline expired") + } + } else { + if (isRetriable) { + parallelLimiter.release() + scheduleRetry( + retryCount, request, paymentId, transactionId, deadline, capped + ) + } else { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Non-retriable exception") + } + parallelLimiter.release() + } + } + } + } else { + try { + logger.warn("Free space in semaphore: {}", parallelLimiter.availablePermits()) + logger.info( + "success in callback for payment: {}, retry count: {}, in time: {}", + paymentId, + retryCount, + now() + ) + val rawBody = response.body() + val parsed = try { + mapper.readValue(rawBody, ExternalSysResponse::class.java) + } catch (ex: Exception) { + ExternalSysResponse(transactionId.toString(), paymentId.toString(), false, ex.message) + } + + paymentESService.update(paymentId) { + it.logProcessing(parsed.result, now(), transactionId, parsed.message) + } + timer.record(now() - timeBeforeCall, TimeUnit.MILLISECONDS) + } catch (e: Exception) { + logger.error("[$accountName] Error processing payment $paymentId", e) + } finally { + parallelLimiter.release() + } + } + } + } + + private fun scheduleRetry( + retryCount: Long, request: HttpRequest, paymentId: UUID, + transactionId: UUID, deadline: Long, delay: Long + ) { + requestsRetried.increment() + logger.info("Completing retry. All retry count - {}", requestsRetried.count()) + parallelLimiter.release() + Thread.sleep(delay) + val remainingTime = deadline - now() + if (remainingTime < requestAverageProcessingTime.toMillis()) { + try { + paymentESService.update(paymentId) { + it.logProcessing(false, now(), transactionId, "Not enough time for retry") + } + } catch (e: Exception) { + logger.error("[$accountName] Failed to record retry failure for $paymentId", e) + } + } else { + val newRequestTimeout = remainingTime.coerceIn(100, requestAverageProcessingTime.toMillis() * 2) + val newRequest = HttpRequest.newBuilder() + .uri(request.uri()) + .timeout(Duration.ofMillis(newRequestTimeout)) + .POST(HttpRequest.BodyPublishers.noBody()) + .build() + if (!rateLimiter.tick()) { + val retryAfterMs = (1000L / rateLimitPerSec * 10).coerceIn(10, 100) + Random.nextLong(10) + throw TooManyRequestsException(retryAfterMs) + } + completeAction(retryCount + 1, newRequest, paymentId, transactionId, deadline) + } + } } -public fun now() = System.currentTimeMillis() \ No newline at end of file +fun now() = System.currentTimeMillis() \ No newline at end of file diff --git a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt index 1c24e5a72..7aad9b847 100644 --- a/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt +++ b/src/main/kotlin/ru/quipy/payments/logic/PaymentServiceImpl.kt @@ -1,16 +1,8 @@ package ru.quipy.payments.logic import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service -import ru.quipy.common.utils.NamedThreadFactory -import ru.quipy.core.EventSourcingService -import ru.quipy.payments.api.PaymentAggregate -import java.time.Duration import java.util.* -import java.util.concurrent.Executors -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock @Service diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 33d51a58b..7c2e843a0 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -2,29 +2,42 @@ server.address=0.0.0.0 server.port=8081 server.http2.enabled=true spring.main.allow-bean-definition-overriding=true - # MongoDB properties spring.data.mongodb.host=localhost spring.data.mongodb.port=27017 spring.data.mongodb.database=online-shop - # Tiny event sourcing library properties event.sourcing.auto-scan-enabled=true event.sourcing.scan-package=ru.quipy event.sourcing.snapshots-enabled=false event.sourcing.sagas-enabled=false - # Postgres event store properties spring.datasource.hikari.jdbc-url=jdbc:postgresql://${POSTGRES_ADDRESS:localhost}:${POSTGRES_PORT:65432}/postgres spring.datasource.hikari.username=tiny_es spring.datasource.hikari.password=tiny_es +logger.level.ru.quipy.payments.logic.PaymentCallback=DEBUG spring.datasource.hikari.leak-detection-threshold=2000 - management.metrics.web.server.request.autotime.percentiles=0.95 management.metrics.export.prometheus.enabled=true -management.endpoints.web.exposure.include=info,health,prometheus,metrics - +management.endpoints.web.exposure.include=prometheus,health,info +management.metrics.distribution.percentiles-histogram.http.server.requests=true +management.metrics.distribution.percentiles-histogram.payment.external.system.request.latency=true payment.service-name=${PAYMENT_SERVICE_NAME} payment.token=${PAYMENT_TOKEN} -payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} -payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} \ No newline at end of file +# payment.accounts=${PAYMENT_ACCOUNTS:acc-12,acc-20} +# payment.accounts=${PAYMENT_ACCOUNTS:acc-3} +#payment.accounts=${PAYMENT_ACCOUNTS:acc-12} +payment.accounts=${PAYMENT_ACCOUNTS:acc-13} +# payment.accounts=${PAYMENT_ACCOUNTS:acc-18} +payment.hostPort=${PAYMENT_HOST:localhost}:${PAYMENT_PORT:1234} +spring.datasource.hikari.maximum-pool-size=20_000 +spring.datasource.hikari.minimum-idle=50 +spring.datasource.hikari.connection-timeout=5000 +server.jetty.threads.min=100 +server.jetty.threads.max=20000 +server.jetty.threads.max-queue-capacity=100000 +server.jetty.threads.acceptors=16 +server.jetty.threads.selectors=32 +server.jetty.connection-idle-timeout=60000 + + diff --git a/test-local-run.http b/test-local-run.http index 7be0e4f73..dc8dbeedf 100644 --- a/test-local-run.http +++ b/test-local-run.http @@ -12,4 +12,4 @@ Content-Type: application/json ### Stop running test to save time and resources # @timeout 120 -POST http://localhost:1234/test/stop/{{serviceName}} \ No newline at end of file +POST http://localhost:4321/test/stop/"{{serviceName}}" \ No newline at end of file diff --git a/test-on-prem-run.http b/test-on-prem-run.http index 584edc0b5..dd5765987 100644 --- a/test-on-prem-run.http +++ b/test-on-prem-run.http @@ -7,13 +7,13 @@ Content-Type: application/json "serviceName": "{{serviceName}}", "token": "{{token}}", "branch": "main", - "accounts": "acc-3", - "ratePerSecond": 11, - "testCount": 1200, + "accounts": "acc-12,acc-20", + "ratePerSecond": 2, + "testCount": 10, "processingTimeMillis": 80000, "onPremises": true } ### Stop running test to save credits # @timeout 120 -POST http://77.234.215.138:31234/test/stop/{{serviceName}} \ No newline at end of file +POST http://77.234.215.138:31234/test/stop/"{{serviceName}}" \ No newline at end of file