-
Notifications
You must be signed in to change notification settings - Fork 5
feat: c1z sanitizer v0.1 — library + CLI #875
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,150 @@ | ||
| // baton-c1z-sanitize transforms a .c1z snapshot into an identity- | ||
| // stripped copy whose graph topology and cardinalities are preserved. | ||
| // See pkg/c1zsanitize for the transform contract. | ||
| package main | ||
|
|
||
| import ( | ||
| "context" | ||
| "crypto/rand" | ||
| "flag" | ||
| "fmt" | ||
| "os" | ||
| "time" | ||
|
|
||
| "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" | ||
| "go.uber.org/zap" | ||
|
|
||
| "github.com/conductorone/baton-sdk/pkg/c1zsanitize" | ||
| "github.com/conductorone/baton-sdk/pkg/dotc1z" | ||
| "github.com/conductorone/baton-sdk/pkg/logging" | ||
| ) | ||
|
|
||
| func main() { | ||
| if err := run(); err != nil { | ||
| fmt.Fprintln(os.Stderr, "baton-c1z-sanitize:", err) | ||
| os.Exit(1) | ||
| } | ||
| } | ||
|
|
||
| func run() error { | ||
| fs := flag.NewFlagSet("baton-c1z-sanitize", flag.ContinueOnError) | ||
| inPath := fs.String("in", "", "Path to source .c1z file (required)") | ||
| outPath := fs.String("out", "", "Path to sanitized .c1z output file (required)") | ||
| secretFile := fs.String("secret-file", "", "Path to per-c1z HMAC secret (>=32 random bytes). If unset, a fresh secret is generated and written next to -out.") | ||
| anchorRaw := fs.String("anchor", "", "RFC3339 timestamp the newest source timestamp lands on. Defaults to now.") | ||
| allowUnknown := fs.Bool("allow-unknown-annotations", false, "Pass annotations of unknown type through unchanged instead of dropping. Dangerous on real customer data.") | ||
| logLevel := fs.String("log-level", "info", "Log level: debug, info, warn, error.") | ||
| logFormat := fs.String("log-format", "console", "Log format: console or json.") | ||
|
|
||
| if err := fs.Parse(os.Args[1:]); err != nil { | ||
| return err | ||
| } | ||
| if *inPath == "" || *outPath == "" { | ||
| fs.Usage() | ||
| return fmt.Errorf("-in and -out are required") | ||
| } | ||
|
|
||
| ctx, err := logging.Init(context.Background(), | ||
| logging.WithLogLevel(*logLevel), | ||
| logging.WithLogFormat(*logFormat), | ||
| ) | ||
| if err != nil { | ||
| return fmt.Errorf("init logging: %w", err) | ||
| } | ||
| log := ctxzap.Extract(ctx) | ||
|
|
||
| secret, generated, err := loadOrGenerateSecret(*secretFile, *outPath) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if generated { | ||
| log.Warn("c1zsanitize: generated fresh secret; archive it if you want reversibility", | ||
| zap.String("path", secretPath(*secretFile, *outPath))) | ||
| } | ||
|
|
||
| var anchor time.Time | ||
| if *anchorRaw != "" { | ||
| anchor, err = time.Parse(time.RFC3339, *anchorRaw) | ||
| if err != nil { | ||
| return fmt.Errorf("parse -anchor: %w", err) | ||
| } | ||
| } | ||
|
|
||
| if _, err := os.Stat(*inPath); err != nil { | ||
| return fmt.Errorf("stat -in: %w", err) | ||
| } | ||
| if _, err := os.Stat(*outPath); err == nil { | ||
| return fmt.Errorf("-out path %q already exists; refusing to overwrite", *outPath) | ||
| } | ||
|
|
||
| src, err := dotc1z.NewC1ZFile(ctx, *inPath, dotc1z.WithReadOnly(true)) | ||
| if err != nil { | ||
| return fmt.Errorf("open source c1z: %w", err) | ||
| } | ||
| defer src.Close(ctx) | ||
|
|
||
| dst, err := dotc1z.NewC1ZFile(ctx, *outPath) | ||
| if err != nil { | ||
| return fmt.Errorf("open dst c1z: %w", err) | ||
| } | ||
| dstClosed := false | ||
| defer func() { | ||
| if !dstClosed { | ||
| _ = dst.Close(ctx) | ||
| } | ||
| }() | ||
|
|
||
| opts := c1zsanitize.Options{ | ||
| Secret: secret, | ||
| TimestampAnchor: anchor, | ||
| DropUnknownAnnotations: !*allowUnknown, | ||
| } | ||
|
|
||
| log.Info("c1zsanitize: starting", | ||
| zap.String("in", *inPath), | ||
| zap.String("out", *outPath), | ||
| zap.Bool("drop_unknown_annotations", opts.DropUnknownAnnotations), | ||
| ) | ||
| start := time.Now() | ||
| if err := c1zsanitize.Sanitize(ctx, src, dst, opts); err != nil { | ||
| return fmt.Errorf("sanitize: %w", err) | ||
| } | ||
| dstClosed = true | ||
| if err := dst.Close(ctx); err != nil { | ||
| return fmt.Errorf("close dst c1z: %w", err) | ||
| } | ||
| log.Info("c1zsanitize: done", zap.Duration("elapsed", time.Since(start))) | ||
| return nil | ||
| } | ||
|
|
||
| func secretPath(flagPath, outPath string) string { | ||
| if flagPath != "" { | ||
| return flagPath | ||
| } | ||
| return outPath + ".secret" | ||
| } | ||
|
|
||
| func loadOrGenerateSecret(flagPath, outPath string) ([]byte, bool, error) { | ||
| if flagPath != "" { | ||
| b, err := os.ReadFile(flagPath) | ||
| if err != nil { | ||
| return nil, false, fmt.Errorf("read -secret-file: %w", err) | ||
| } | ||
| if len(b) < c1zsanitize.MinSecretBytes { | ||
| return nil, false, fmt.Errorf("-secret-file %q is too short: got %d bytes, want at least %d", flagPath, len(b), c1zsanitize.MinSecretBytes) | ||
| } | ||
| return b, false, nil | ||
| } | ||
| path := secretPath(flagPath, outPath) | ||
| if _, err := os.Stat(path); err == nil { | ||
| return nil, false, fmt.Errorf("default secret path %q already exists; pass -secret-file to reuse it", path) | ||
| } | ||
| b := make([]byte, c1zsanitize.MinSecretBytes) | ||
| if _, err := rand.Read(b); err != nil { | ||
| return nil, false, fmt.Errorf("generate secret: %w", err) | ||
| } | ||
| if err := os.WriteFile(path, b, 0o600); err != nil { | ||
| return nil, false, fmt.Errorf("write generated secret: %w", err) | ||
| } | ||
| return b, true, nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| package c1zsanitize | ||
|
|
||
| import ( | ||
| "go.uber.org/zap" | ||
| "google.golang.org/protobuf/proto" | ||
| "google.golang.org/protobuf/types/known/anypb" | ||
|
|
||
| v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" | ||
| ) | ||
|
|
||
| // annotationHandler returns a sanitized replacement for the supplied | ||
| // annotation message. Implementations may register additional asset | ||
| // references via the supplied set so copyAssets can pick them up at | ||
| // end-of-sync. Implementations MUST NOT mutate the input. | ||
| type annotationHandler func(s *sanitizer, msg proto.Message, refs *assetRefSet) proto.Message | ||
|
|
||
| func defaultAnnotationHandlers() map[string]annotationHandler { | ||
| return map[string]annotationHandler{ | ||
| typeURL(&v2.UserTrait{}): handleUserTrait, | ||
| typeURL(&v2.GroupTrait{}): handleGroupTrait, | ||
| typeURL(&v2.AppTrait{}): handleAppTrait, | ||
| typeURL(&v2.RoleTrait{}): handleRoleTrait, | ||
| typeURL(&v2.SecretTrait{}): handleSecretTrait, | ||
| typeURL(&v2.LicenseProfileTrait{}): handleLicenseProfileTrait, | ||
| typeURL(&v2.ScopeBindingTrait{}): handleScopeBindingTrait, | ||
| } | ||
| } | ||
|
|
||
| // typeURL returns the canonical anypb type-URL for a proto message. | ||
| // It matches what anypb.New would set on Any.TypeUrl, so it works as | ||
| // a lookup key against the values returned by Any.GetTypeUrl(). | ||
| func typeURL(m proto.Message) string { | ||
| a, err := anypb.New(m) | ||
| if err != nil { | ||
| panic(err) | ||
| } | ||
| return a.GetTypeUrl() | ||
| } | ||
|
|
||
| // transformAnnotations walks the slice once, dispatching each entry | ||
| // on its Any type URL. Unknown types are dropped by default (with a | ||
| // log line naming the URL) or passed through unchanged if the | ||
| // operator opted in via Options.DropUnknownAnnotations=false. | ||
| func (s *sanitizer) transformAnnotations(in []*anypb.Any, refs *assetRefSet) []*anypb.Any { | ||
| if len(in) == 0 { | ||
| return nil | ||
| } | ||
| out := make([]*anypb.Any, 0, len(in)) | ||
| for _, a := range in { | ||
| if a == nil { | ||
| continue | ||
| } | ||
| handler, ok := s.handlers[a.GetTypeUrl()] | ||
| if !ok { | ||
| if s.dropUnknownAnnotations { | ||
| s.log.Debug("c1zsanitize: dropping unknown annotation", zap.String("type_url", a.GetTypeUrl())) | ||
| continue | ||
| } | ||
| s.log.Warn("c1zsanitize: passing unknown annotation through unchanged", zap.String("type_url", a.GetTypeUrl())) | ||
| out = append(out, a) | ||
| continue | ||
| } | ||
| msg, err := a.UnmarshalNew() | ||
| if err != nil { | ||
| s.log.Warn("c1zsanitize: failed to unmarshal annotation; dropping", | ||
| zap.String("type_url", a.GetTypeUrl()), zap.Error(err)) | ||
| continue | ||
| } | ||
| sanitized := handler(s, msg, refs) | ||
| if sanitized == nil { | ||
| continue | ||
| } | ||
| repacked, err := anypb.New(sanitized) | ||
| if err != nil { | ||
| s.log.Warn("c1zsanitize: failed to repack annotation; dropping", | ||
| zap.String("type_url", a.GetTypeUrl()), zap.Error(err)) | ||
| continue | ||
| } | ||
| out = append(out, repacked) | ||
| } | ||
| if len(out) == 0 { | ||
| return nil | ||
| } | ||
| return out | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,128 @@ | ||
| package c1zsanitize | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "strings" | ||
| "sync" | ||
|
|
||
| "go.uber.org/zap" | ||
|
|
||
| v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" | ||
| "github.com/conductorone/baton-sdk/pkg/connectorstore" | ||
| ) | ||
|
|
||
| // AssetRecord.data is replaced with a deterministic placeholder | ||
| // matching the original content type. The placeholder bytes are | ||
| // chosen so that consumers that inspect content_type without parsing | ||
| // the payload still see a sensible content_type/length pair. | ||
| // | ||
| // content_type values not in the explicit map fall through to a | ||
| // single zero byte; that's enough to keep the row non-empty (PutAsset | ||
| // silently drops empty data) while preserving the cross-reference. | ||
| var placeholderByContentType = map[string][]byte{ | ||
| // 1x1 transparent PNG. | ||
| "image/png": { | ||
| 0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, | ||
| 0x00, 0x00, 0x00, 0x0d, 0x49, 0x48, 0x44, 0x52, | ||
| 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, | ||
| 0x08, 0x06, 0x00, 0x00, 0x00, 0x1f, 0x15, 0xc4, | ||
| 0x89, 0x00, 0x00, 0x00, 0x0d, 0x49, 0x44, 0x41, | ||
| 0x54, 0x78, 0x9c, 0x62, 0x00, 0x01, 0x00, 0x00, | ||
| 0x05, 0x00, 0x01, 0x0d, 0x0a, 0x2d, 0xb4, 0x00, | ||
| 0x00, 0x00, 0x00, 0x49, 0x45, 0x4e, 0x44, 0xae, | ||
| 0x42, 0x60, 0x82, | ||
| }, | ||
| } | ||
|
|
||
| // placeholderForContentType returns the bytes the sanitizer writes | ||
| // in place of the original asset payload. The single-byte fallback | ||
| // is intentional: zero-length data is silently dropped by PutAsset, | ||
| // which would break the cross-reference invariant. | ||
| func placeholderForContentType(contentType string) []byte { | ||
| if b, ok := placeholderByContentType[strings.ToLower(contentType)]; ok { | ||
| return b | ||
| } | ||
| if strings.HasPrefix(strings.ToLower(contentType), "image/") { | ||
| return placeholderByContentType["image/png"] | ||
| } | ||
| return []byte{0x00} | ||
| } | ||
|
|
||
| // assetRefSet collects every AssetRef.Id encountered while walking | ||
| // records during a sync. The set is drained after the record walk by | ||
| // copyAssets which fetches each original asset, replaces its payload | ||
| // with a placeholder, and writes the sanitized AssetRef into dst. | ||
| type assetRefSet struct { | ||
| mu sync.Mutex | ||
| m map[string]struct{} | ||
| } | ||
|
|
||
| func newAssetRefSet() *assetRefSet { | ||
| return &assetRefSet{m: map[string]struct{}{}} | ||
| } | ||
|
|
||
| func (a *assetRefSet) add(id string) { | ||
| if id == "" { | ||
| return | ||
| } | ||
| a.mu.Lock() | ||
| a.m[id] = struct{}{} | ||
| a.mu.Unlock() | ||
| } | ||
|
|
||
| func (a *assetRefSet) drain() []string { | ||
| a.mu.Lock() | ||
| defer a.mu.Unlock() | ||
| out := make([]string, 0, len(a.m)) | ||
| for id := range a.m { | ||
| out = append(out, id) | ||
| } | ||
| a.m = map[string]struct{}{} | ||
| return out | ||
| } | ||
|
|
||
| // drainAndClose consumes the reader and closes it if the underlying | ||
| // type also implements io.Closer. The connectorstore.Reader.GetAsset | ||
| // contract returns an io.Reader, but at least one implementation | ||
| // returns *os.File; not closing it leaks a fd per asset. | ||
| func drainAndClose(r io.Reader) error { | ||
| if c, ok := r.(io.Closer); ok { | ||
| defer c.Close() | ||
| } | ||
| _, err := io.Copy(io.Discard, r) | ||
| return err | ||
| } | ||
|
|
||
| func (s *sanitizer) copyAssets( | ||
| ctx context.Context, | ||
| src connectorstore.Reader, | ||
| dst connectorstore.Writer, | ||
| refs *assetRefSet, | ||
| ) error { | ||
| ids := refs.drain() | ||
| for _, srcID := range ids { | ||
| req := v2.AssetServiceGetAssetRequest_builder{ | ||
| Asset: v2.AssetRef_builder{Id: srcID}.Build(), | ||
| }.Build() | ||
| contentType, r, err := src.GetAsset(ctx, req) | ||
| if err != nil { | ||
| // Asset referenced from an annotation but missing from | ||
| // the asset table. Skip — we don't fabricate placeholder | ||
| // rows because the cross-reference invariant treats it | ||
| // as a known dangling pointer in the source. | ||
| s.log.Debug("c1zsanitize: asset ref not found in source", zap.String("asset_id", srcID), zap.Error(err)) | ||
| continue | ||
| } | ||
| if err := drainAndClose(r); err != nil && !errors.Is(err, io.EOF) { | ||
| return fmt.Errorf("drain source asset %s: %w", srcID, err) | ||
|
Comment on lines
+111
to
+120
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Suggestion: The reader |
||
| } | ||
| dstID := s.id(srcID) | ||
| if err := dst.PutAsset(ctx, v2.AssetRef_builder{Id: dstID}.Build(), contentType, placeholderForContentType(contentType)); err != nil { | ||
| return fmt.Errorf("put dst asset %s: %w", dstID, err) | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟠 Bug:
dst.Close(ctx)is deferred so its error is silently dropped. For a write-mode C1File that must flush and compress the sqlite-zstd output, a failed close means the CLI reports success but the output.c1zis corrupt. Explicitly closedston the success path and check the error, keeping the defer only as a safety net for early-return error paths.