Skip to content

Commit 49f079b

Browse files
committed
Revert "feat: adding parallel processing"
This reverts commit c3c1bcd.
1 parent c3c1bcd commit 49f079b

File tree

4 files changed

+56
-208
lines changed

4 files changed

+56
-208
lines changed

cmd/gitops/main.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,6 @@ func main() {
6969
Usage: "display unchanged secrets in the plan overview",
7070
EnvVars: []string{"GITOPS_SHOW_UNCHANGED"},
7171
},
72-
&cli.IntFlag{
73-
Name: "parallelism",
74-
Aliases: []string{"p"},
75-
Value: 5,
76-
Usage: "number of parallel operations for decrypting secrets and kubernetes operations",
77-
EnvVars: []string{"GITOPS_PARALLELISM"},
78-
},
7972
},
8073
Commands: []*cli.Command{
8174
{

internal/kubernetes/kubernetes.go

Lines changed: 28 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package kubernetes
33
import (
44
"fmt"
55
"strings"
6-
"sync"
76

87
"github.com/TwiN/go-color"
98
"github.com/google/uuid"
@@ -123,11 +122,6 @@ func createKubernetesPlan(c *cli.Context) (*plan.Plan, error) {
123122
Items: []plan.PlanItem{},
124123
}
125124

126-
parallelism := c.Int("parallelism")
127-
if parallelism < 1 {
128-
parallelism = 1
129-
}
130-
131125
bar := progressbar.NewOptions(len(localSecrets),
132126
progressbar.OptionEnableColorCodes(true),
133127
progressbar.OptionShowBytes(false),
@@ -138,79 +132,38 @@ func createKubernetesPlan(c *cli.Context) (*plan.Plan, error) {
138132
progressbar.OptionSetDescription("[green][Syncing local state with cluster][reset]"),
139133
)
140134

141-
// Create channels for parallel processing
142-
type planItemResult struct {
143-
item plan.PlanItem
144-
err error
145-
}
146-
147-
secretChan := make(chan *secret.Secret, len(localSecrets))
148-
resultChan := make(chan planItemResult, len(localSecrets))
149-
150-
// Start worker goroutines
151-
var workerGroup sync.WaitGroup
152-
for i := 0; i < parallelism; i++ {
153-
workerGroup.Add(1)
154-
go func() {
155-
defer workerGroup.Done()
156-
for localSecret := range secretChan {
157-
// check for local secret in state
158-
// update ID in localSecret if secret exists in state
159-
// update hash in state if secret exists in state
160-
stateSecret := state.GetState().GetByPath(localSecret.Path)
161-
if stateSecret == nil {
162-
log.Trace("Secret ", localSecret.CombinedName(), " does not exist in state")
163-
localSecret.ID = uuid.New().String()
164-
stateSecret = state.GetState().Add(localSecret)
165-
} else {
166-
log.Trace("Secret ", localSecret.CombinedName(), " exists in state. Updating")
167-
stateSecret.Update(localSecret)
168-
}
169-
170-
planItem := plan.PlanItem{
171-
LocalSecret: localSecret,
172-
}
173-
174-
remoteSecret, err := k8s.GetSecret(localSecret, localSecret.Target)
175-
if err != nil {
176-
if k8sErrors.IsNotFound(err) {
177-
log.Trace("Secret ", localSecret.Name, " does not exist in Kubernetes cluster")
178-
} else {
179-
log.Error("Failed to get secret ", localSecret.Name, " from Kubernetes cluster")
180-
resultChan <- planItemResult{err: err}
181-
return
182-
}
183-
}
184-
185-
planItem.RemoteSecret = remoteSecret
186-
planItem.ComputeDiff()
187-
resultChan <- planItemResult{item: planItem, err: nil}
188-
}
189-
}()
190-
}
191-
192-
// Send work to workers
193-
go func() {
194-
for _, localSecret := range localSecrets {
195-
secretChan <- localSecret
135+
for _, localSecret := range localSecrets {
136+
bar.Add(1)
137+
// check for local secret in state
138+
// update ID in localSecret if secret exists in state
139+
// update hash in state if secret exists in state
140+
stateSecret := state.GetState().GetByPath(localSecret.Path)
141+
if stateSecret == nil {
142+
log.Trace("Secret ", localSecret.CombinedName(), " does not exist in state")
143+
localSecret.ID = uuid.New().String()
144+
stateSecret = state.GetState().Add(localSecret)
145+
} else {
146+
log.Trace("Secret ", localSecret.CombinedName(), " exists in state. Updating")
147+
stateSecret.Update(localSecret)
196148
}
197-
close(secretChan)
198-
}()
199149

200-
// Wait for all workers to finish and close result channel
201-
go func() {
202-
workerGroup.Wait()
203-
close(resultChan)
204-
}()
150+
planItem := plan.PlanItem{
151+
LocalSecret: localSecret,
152+
}
205153

206-
// Collect results
207-
for result := range resultChan {
208-
bar.Add(1)
209-
if result.err != nil {
210-
bar.Finish()
211-
return nil, result.err
154+
remoteSecret, err := k8s.GetSecret(localSecret, localSecret.Target)
155+
if err != nil {
156+
if k8sErrors.IsNotFound(err) {
157+
log.Trace("Secret ", localSecret.Name, " does not exist in Kubernetes cluster")
158+
} else {
159+
log.Error("Failed to get secret ", localSecret.Name, " from Kubernetes cluster")
160+
return nil, err
161+
}
212162
}
213-
p.AddItem(result.item)
163+
164+
planItem.RemoteSecret = remoteSecret
165+
planItem.ComputeDiff()
166+
p.AddItem(planItem)
214167
}
215168
bar.Finish()
216169
println("")

internal/plan/plan.go

Lines changed: 22 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
package plan
22

33
import (
4-
"sync"
5-
64
log "github.com/sirupsen/logrus"
75

86
"github.com/mxcd/gitops-cli/internal/k8s"
97
"github.com/mxcd/gitops-cli/internal/secret"
10-
"github.com/mxcd/gitops-cli/internal/util"
118
)
129

1310
type Plan struct {
@@ -67,83 +64,33 @@ func (p *Plan) Execute() error {
6764
}
6865

6966
func executeKubernetesPlan(p *Plan) error {
70-
parallelism := util.GetCliContext().Int("parallelism")
71-
if parallelism < 1 {
72-
parallelism = 1
73-
}
74-
75-
// Filter items that need processing
76-
itemsToProcess := []PlanItem{}
7767
for _, item := range p.Items {
78-
if !item.Diff.Equal {
79-
itemsToProcess = append(itemsToProcess, item)
80-
} else {
68+
if item.Diff.Equal {
8169
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is equal, skipping...")
70+
continue
8271
}
83-
}
84-
85-
if len(itemsToProcess) == 0 {
86-
return nil
87-
}
88-
89-
// Create channels for parallel processing
90-
itemChan := make(chan PlanItem, len(itemsToProcess))
91-
errorChan := make(chan error, len(itemsToProcess))
92-
93-
// Start worker goroutines
94-
var workerGroup sync.WaitGroup
95-
for i := 0; i < parallelism; i++ {
96-
workerGroup.Add(1)
97-
go func() {
98-
defer workerGroup.Done()
99-
for item := range itemChan {
100-
var err error
101-
if item.Diff.Type == secret.SecretDiffTypeAdded {
102-
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is new, creating...")
103-
err = k8s.CreateSecret(item.LocalSecret, item.LocalSecret.Target)
104-
if err != nil {
105-
log.Error("Failed to create secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster")
106-
}
107-
} else if item.Diff.Type == secret.SecretDiffTypeChanged {
108-
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is modified, updating...")
109-
err = k8s.UpdateSecret(item.LocalSecret, item.LocalSecret.Target)
110-
if err != nil {
111-
log.Error("Failed to update secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster")
112-
}
113-
} else if item.Diff.Type == secret.SecretDiffTypeRemoved {
114-
log.Trace("Secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " is deleted, deleting...")
115-
err = k8s.DeleteSecret(item.RemoteSecret, item.RemoteSecret.Target)
116-
if err != nil {
117-
log.Error("Failed to delete secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " in cluster")
118-
}
119-
}
120-
if err != nil {
121-
errorChan <- err
122-
}
72+
if item.Diff.Type == secret.SecretDiffTypeAdded {
73+
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is new, creating...")
74+
err := k8s.CreateSecret(item.LocalSecret, item.LocalSecret.Target)
75+
if err != nil {
76+
log.Error("Failed to create secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster")
77+
return err
78+
}
79+
} else if item.Diff.Type == secret.SecretDiffTypeChanged {
80+
log.Trace("Secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " is modified, updating...")
81+
err := k8s.UpdateSecret(item.LocalSecret, item.LocalSecret.Target)
82+
if err != nil {
83+
log.Error("Failed to update secret ", item.LocalSecret.Namespace, "/", item.LocalSecret.Name, " in cluster")
84+
return err
85+
}
86+
} else if item.Diff.Type == secret.SecretDiffTypeRemoved {
87+
log.Trace("Secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " is deleted, deleting...")
88+
err := k8s.DeleteSecret(item.RemoteSecret, item.RemoteSecret.Target)
89+
if err != nil {
90+
log.Error("Failed to delete secret ", item.RemoteSecret.Namespace, "/", item.RemoteSecret.Name, " in cluster")
91+
return err
12392
}
124-
}()
125-
}
126-
127-
// Send work to workers
128-
go func() {
129-
for _, item := range itemsToProcess {
130-
itemChan <- item
131-
}
132-
close(itemChan)
133-
}()
134-
135-
// Wait for all workers to finish
136-
go func() {
137-
workerGroup.Wait()
138-
close(errorChan)
139-
}()
140-
141-
// Check for errors
142-
for err := range errorChan {
143-
if err != nil {
144-
return err
14593
}
14694
}
147-
14895
return nil
14996
}

internal/secret/loader.go

Lines changed: 6 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package secret
33
import (
44
"errors"
55
"strings"
6-
"sync"
76

87
"github.com/mxcd/gitops-cli/internal/util"
98
"github.com/schollz/progressbar/v3"
@@ -41,11 +40,6 @@ func LoadLocalSecretsLimited(targetTypeFilter SecretTargetType, directoryLimit s
4140
}
4241
secretFileNames = filteredFileNames
4342

44-
parallelism := util.GetCliContext().Int("parallelism")
45-
if parallelism < 1 {
46-
parallelism = 1
47-
}
48-
4943
secrets := []*Secret{}
5044
bar := progressbar.NewOptions(len(secretFileNames),
5145
progressbar.OptionEnableColorCodes(true),
@@ -56,58 +50,19 @@ func LoadLocalSecretsLimited(targetTypeFilter SecretTargetType, directoryLimit s
5650
progressbar.OptionSetPredictTime(false),
5751
progressbar.OptionSetDescription("[green][Loading local secrets][reset]"),
5852
)
59-
60-
// Create channels for parallel processing
61-
type secretResult struct {
62-
secret *Secret
63-
err error
64-
}
65-
66-
secretChan := make(chan string, len(secretFileNames))
67-
resultChan := make(chan secretResult, len(secretFileNames))
68-
69-
// Start worker goroutines
70-
var workerGroup sync.WaitGroup
71-
for i := 0; i < parallelism; i++ {
72-
workerGroup.Add(1)
73-
go func() {
74-
defer workerGroup.Done()
75-
for secretFileName := range secretChan {
76-
secret, err := FromPath(secretFileName)
77-
resultChan <- secretResult{secret: secret, err: err}
78-
}
79-
}()
80-
}
81-
82-
// Send work to workers
83-
go func() {
84-
for _, secretFileName := range secretFileNames {
85-
secretChan <- secretFileName
86-
}
87-
close(secretChan)
88-
}()
89-
90-
// Wait for all workers to finish and close result channel
91-
go func() {
92-
workerGroup.Wait()
93-
close(resultChan)
94-
}()
95-
96-
// Collect results
97-
for result := range resultChan {
53+
for _, secretFileName := range secretFileNames {
9854
bar.Add(1)
99-
if result.err != nil {
55+
secret, err := FromPath(secretFileName)
56+
if err != nil {
10057
bar.Finish()
101-
return nil, result.err
58+
return nil, err
10259
}
103-
104-
secret := result.secret
10560
if secret.TargetType != targetTypeFilter && targetTypeFilter != SecretTargetTypeAll {
106-
log.Trace("Skipping file due to targetType filter: ", secret.Path)
61+
log.Trace("Skipping file due to targetType filter: ", secretFileName)
10762
continue
10863
}
10964
if clusterLimit != "" && secret.Target != clusterLimit {
110-
log.Trace("Skipping file due to target filter: ", secret.Path)
65+
log.Trace("Skipping file due to target filter: ", secretFileName)
11166
continue
11267
}
11368
for _, s := range secrets {

0 commit comments

Comments
 (0)