Skip to content
Open
100 changes: 100 additions & 0 deletions orchestrator/internal/service/github/fork.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package github

import (
"context"
"fmt"
"log"
"net/http"
"os/exec"
"strings"
"time"
)

// ForkResponse — минимальные поля форка из GitHub API.
type ForkResponse struct {
FullName string `json:"full_name"`
CloneURL string `json:"clone_url"`
HTMLURL string `json:"html_url"`
DefaultBranch string `json:"default_branch"`
Owner struct {
Login string `json:"login"`
} `json:"owner"`
}

// IsPermissionError — распознаёт ошибку отсутствия прав на запись в репозиторий
// (push возвращает HTTP 403 / "Permission to ... denied"). Используется, чтобы
// переключиться на создание PR из форка (issue #85).
func IsPermissionError(err error) bool {
if err == nil {
return false
}
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "403") ||
strings.Contains(msg, "permission to") ||
strings.Contains(msg, "permission denied") ||
strings.Contains(msg, "denied to") ||
strings.Contains(msg, "write access to repository not granted")
}

// ForkRepository — создаёт форк upstream-репозитория в аккаунт бота.
// GitHub отвечает 202 Accepted; форк создаётся асинхронно (см. WaitForRepository).
func (c *Client) ForkRepository(ctx context.Context, owner, repo string) (*ForkResponse, error) {
var fork ForkResponse
path := fmt.Sprintf("/repos/%s/%s/forks", owner, repo)
if err := c.doJSON(ctx, "POST", path, struct{}{}, &fork, http.StatusAccepted); err != nil {
return nil, fmt.Errorf("failed to fork repository: %w", err)
}
log.Printf("Forked %s/%s -> %s", owner, repo, fork.FullName)
return &fork, nil
}

// WaitForRepository — ждёт, пока репозиторий (форк) станет доступен через API.
// Форк создаётся асинхронно, поэтому push сразу после fork может упасть.
func (c *Client) WaitForRepository(ctx context.Context, owner, repo string, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for attempt := 0; ; attempt++ {
if _, err := c.GetRepository(ctx, owner, repo); err == nil {
return nil
}
if time.Now().After(deadline) {
return fmt.Errorf("repository %s/%s not ready after %s", owner, repo, timeout)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(2 * time.Second):
}
}
}

// PushBranchToRemote — пушит ветку в произвольный remote URL (например, форк),
// используя временный remote. Токен встраивается в URL для авторизации.
func (c *Client) PushBranchToRemote(ctx context.Context, dir, branch, remoteURL, remoteName string) error {
if branch == "" {
return fmt.Errorf("branch is required")
}
if remoteName == "" {
remoteName = "octra-fork"
}
// Снимаем старый remote с тем же именем, если остался от прошлого прогона.
rm := exec.CommandContext(ctx, "git", "remote", "remove", remoteName)
rm.Dir = dir
rm.Run() // ошибку игнорируем — remote мог не существовать

auth := c.authenticatedGitURL(remoteURL)
add := exec.CommandContext(ctx, "git", "remote", "add", remoteName, auth)
add.Dir = dir
if out, err := add.CombinedOutput(); err != nil {
return fmt.Errorf("git remote add failed: %w - %s", err, c.sanitize(string(out)))
}

push := exec.CommandContext(ctx, "git", "push", "-u", remoteName, branch, "--force")
push.Dir = dir
out, err := push.CombinedOutput()
if err != nil {
log.Printf("git push to fork output: %s", c.sanitize(string(out)))
return fmt.Errorf("git push to fork failed: %w - %s", err, c.sanitize(string(out)))
}
log.Printf("git push to fork output: %s", c.sanitize(string(out)))
return nil
}
27 changes: 27 additions & 0 deletions orchestrator/internal/service/github/fork_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package github

import (
"errors"
"testing"
)

func TestIsPermissionError(t *testing.T) {
cases := []struct {
name string
err error
want bool
}{
{"nil", nil, false},
{"403 status", errors.New("git push branch failed: exit status 128 - The requested URL returned error: 403"), true},
{"permission denied phrase", errors.New("remote: Permission to Payel-git-ol/Octra.git denied to Octra-git."), true},
{"write access not granted", errors.New("remote: Write access to repository not granted."), true},
{"unrelated error", errors.New("fatal: could not resolve host"), false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got := IsPermissionError(tc.err); got != tc.want {
t.Fatalf("IsPermissionError(%v) = %v, want %v", tc.err, got, tc.want)
}
})
}
}
24 changes: 24 additions & 0 deletions orchestrator/internal/service/groupchat/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type Orchestrator struct {
terminateFn TerminationCondition
mu sync.RWMutex
eventCh chan Event

errsMu sync.Mutex
errs []error
}

func NewOrchestrator(maxRounds int) *Orchestrator {
Expand All @@ -29,6 +32,25 @@ func NewOrchestrator(maxRounds int) *Orchestrator {
}
}

// recordError — сохраняет ошибку агента, чтобы вызывающий код мог понять,
// что воркер реально провалился (раньше ошибки только эмитились в events и
// молча терялись, из-за чего менеджер репортил success при пустом результате — issue #85).
func (o *Orchestrator) recordError(err error) {
if err == nil {
return
}
o.errsMu.Lock()
o.errs = append(o.errs, err)
o.errsMu.Unlock()
}

// Errors — возвращает ошибки агентов, накопленные за время прогона.
func (o *Orchestrator) Errors() []error {
o.errsMu.Lock()
defer o.errsMu.Unlock()
return append([]error(nil), o.errs...)
}

func (o *Orchestrator) SetSelector(s SpeakerSelector) {
o.selector = s
}
Expand Down Expand Up @@ -197,6 +219,7 @@ func (o *Orchestrator) runSingle(ctx context.Context, agentID string, round int)
conv := o.SnapshotConversation()
messages, err := agent.Process(ctx, conv)
if err != nil {
o.recordError(fmt.Errorf("agent %s: %w", agentID, err))
o.setAgentStatus(agentID, AgentError)
o.emit(Event{Type: EventError, AgentID: agentID, Error: err.Error(), Round: round})
return
Expand Down Expand Up @@ -237,6 +260,7 @@ func (o *Orchestrator) runConcurrentRound(ctx context.Context, round int) {

messages, err := a.Process(ctx, conv)
if err != nil {
o.recordError(fmt.Errorf("agent %s: %w", agentID, err))
o.setAgentStatus(agentID, AgentError)
o.emit(Event{Type: EventError, AgentID: agentID, Error: err.Error(), Round: round})
return
Expand Down
46 changes: 42 additions & 4 deletions orchestrator/internal/service/rules/boss/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/exec"
"strings"
"time"

"orchestrator/internal/service/git"
gh "orchestrator/internal/service/github"
Expand Down Expand Up @@ -95,16 +96,28 @@ func (s *Service) createPullRequest(ctx context.Context, task *models.Task, proj
log.Printf("Failed to checkout pull request branch: %v", err)
return "", fmt.Sprintf("Failed to prepare pull request branch %q: %v", target.BranchName, err)
}
headRef := target.BranchName
if err := s.githubClient.PushBranch(ctx, projectPath, target.BranchName); err != nil {
log.Printf("Failed to push pull request branch: %v", err)
return "", fmt.Sprintf("Failed to push pull request branch %q: %v", target.BranchName, err)
// 403 → нет прав на запись в upstream. Раньше пайплайн просто падал и PR
// не создавался (issue #85). Теперь форкаем репозиторий и пушим в форк.
if gh.IsPermissionError(err) {
log.Printf("Push denied (no write access to %s/%s) — falling back to fork-based pull request", target.Owner, target.Repo)
forkHead, forkErr := s.pushBranchToFork(ctx, projectPath, target)
if forkHead == "" {
return "", forkErr
}
headRef = forkHead
} else {
log.Printf("Failed to push pull request branch: %v", err)
return "", fmt.Sprintf("Failed to push pull request branch %q: %v", target.BranchName, err)
}
}
prTitle := pullRequestTitle(task, target)
pr, err := s.githubClient.CreatePullRequest(ctx, gh.PullRequestRequest{
Owner: target.Owner,
Repo: target.Repo,
Title: prTitle,
Head: target.BranchName,
Head: headRef,
Base: firstNonEmpty(target.BaseBranch, "main"),
Body: pullRequestBody(task, target),
})
Expand Down Expand Up @@ -132,6 +145,32 @@ func (s *Service) createPullRequest(ctx context.Context, task *models.Task, proj
return pr.HTMLURL, ""
}

// pushBranchToFork — форкает upstream-репозиторий и пушит ветку в форк.
// Возвращает head-ссылку вида "forkOwner:branch" для кросс-репозиторного PR
// и пустую причину при успехе, либо ("", reason) при ошибке. Вызывается, когда
// у бота нет прав на запись в upstream.
func (s *Service) pushBranchToFork(ctx context.Context, projectPath string, target *gh.IssueTarget) (string, string) {
fork, err := s.githubClient.ForkRepository(ctx, target.Owner, target.Repo)
if err != nil {
log.Printf("Fork fallback failed: %v", err)
return "", fmt.Sprintf("Failed to fork %s/%s for pull request: %v", target.Owner, target.Repo, err)
}
if err := s.githubClient.WaitForRepository(ctx, fork.Owner.Login, target.Repo, 60*time.Second); err != nil {
log.Printf("Fork not ready: %v", err)
return "", fmt.Sprintf("Fork %s/%s was not ready in time: %v", fork.Owner.Login, target.Repo, err)
}
forkURL := fork.CloneURL
if forkURL == "" {
forkURL = fmt.Sprintf("https://github.com/%s/%s.git", fork.Owner.Login, target.Repo)
}
if err := s.githubClient.PushBranchToRemote(ctx, projectPath, target.BranchName, forkURL, "octra-fork"); err != nil {
log.Printf("Failed to push branch to fork: %v", err)
return "", fmt.Sprintf("Failed to push branch %q to fork %s/%s: %v", target.BranchName, fork.Owner.Login, target.Repo, err)
}
log.Printf("Pushed branch to fork %s/%s, opening cross-repo pull request", fork.Owner.Login, target.Repo)
return fmt.Sprintf("%s:%s", fork.Owner.Login, target.BranchName), ""
}

func pullRequestTitle(task *models.Task, target *gh.IssueTarget) string {
title := target.IssueTitle
if title == "" {
Expand Down Expand Up @@ -171,4 +210,3 @@ func extractRemoteURL(projectPath string) string {
}
return strings.TrimSpace(string(out))
}

62 changes: 49 additions & 13 deletions orchestrator/internal/service/rules/boss/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ func (s *Service) mergeManagerBranches(repoPath string, roles []models.ManagerRo
// Использует FlakeBuilder для генерации богатого flake.nix с зависимостями
// на основе techStack, определённого AI на этапе планирования.
// После записи flake.nix:
// 1. Генерирует flake.lock для закрепления версий зависимостей.
// 2. Коммитит flake.nix + flake.lock в git, чтобы они не потерялись
// при последующих git-операциях (ветвление/мерж воркеров).
// 1. Генерирует flake.lock для закрепления версий зависимостей.
// 2. Коммитит flake.nix + flake.lock в git, чтобы они не потерялись
// при последующих git-операциях (ветвление/мерж воркеров).
func (s *Service) generateFlake(projectPath, taskID, title string, techStack []string, progress rules.ProgressFunc) {
packages := NewFlakeBuilder().ResolveFromTechStacks(techStack)
s.WriteFlake(projectPath, taskID, title, packages)
Expand Down Expand Up @@ -228,15 +228,15 @@ func prepareSnapshotDir(projectPath string) (string, error) {
}
src := filepath.Join(projectPath, file)
dst := filepath.Join(stagingDir, file)
if err := copyFile(src, dst); err != nil {
if _, err := copyFile(src, dst); err != nil {
os.RemoveAll(stagingDir)
return "", fmt.Errorf("failed to copy %s: %w", file, err)
}
}

flakeSrc := filepath.Join(projectPath, "flake.nix")
if _, err := os.Stat(flakeSrc); err == nil {
if err := copyFile(flakeSrc, filepath.Join(stagingDir, "flake.nix")); err != nil {
if _, err := copyFile(flakeSrc, filepath.Join(stagingDir, "flake.nix")); err != nil {
os.RemoveAll(stagingDir)
return "", fmt.Errorf("failed to copy flake.nix: %w", err)
}
Expand All @@ -245,25 +245,62 @@ func prepareSnapshotDir(projectPath string) (string, error) {
return stagingDir, nil
}

// copyFile — копирует файл с созданием родительских директорий
func copyFile(src, dst string) error {
// copyFile — копирует файл с созданием родительских директорий.
// Возвращает (skipped=true, nil) для записей, которые нельзя/не нужно копировать
// как обычный файл: симлинки в Nix store (например, `result` от `nix build`),
// директории и битые симлинки. Раньше попытка скопировать такой `result` падала
// с `copy_file_range: is a directory` и срывала весь снапшот проекта (issue #85).
func copyFile(src, dst string) (skipped bool, err error) {
info, err := os.Lstat(src)
if err != nil {
return false, err
}

// Симлинки: не идём по ссылке (это и вызывало падение на `result`,
// указывающем на директорию в /nix/store), а воссоздаём саму ссылку.
if info.Mode()&os.ModeSymlink != 0 {
target, rerr := os.Readlink(src)
if rerr != nil {
return false, rerr
}
// Артефакты сборки Nix (`result`, `result-*`) не должны попадать в снапшот.
if strings.HasPrefix(target, "/nix/store") {
return true, nil
}
// Битый симлинк или указывает на директорию — пропускаем, чтобы не падать.
if resolved, serr := os.Stat(src); serr != nil || resolved.IsDir() {
return true, nil
}
if err := os.MkdirAll(filepath.Dir(dst), 0755); err != nil {
return false, err
}
return false, os.Symlink(target, dst)
}

// git ls-files не должен возвращать директории, но на всякий случай пропускаем.
if info.IsDir() {
return true, nil
}

if err := os.MkdirAll(filepath.Dir(dst), 0755); err != nil {
return err
return false, err
}
srcFile, err := os.Open(src)
if err != nil {
return err
return false, err
}
defer srcFile.Close()

dstFile, err := os.Create(dst)
if err != nil {
return err
return false, err
}
defer dstFile.Close()

_, err = io.Copy(dstFile, srcFile)
return err
if _, err := io.Copy(dstFile, srcFile); err != nil {
return false, err
}
return false, nil
}

// registerGCRoot — регистрирует store path как GC root, чтобы Nix не удалил его
Expand Down Expand Up @@ -459,4 +496,3 @@ func envOrDefault(key, def string) string {
}
return def
}

Loading
Loading