1- """nlboot-compatible SourceOS boot adapter skeleton .
1+ """nlboot-compatible SourceOS boot adapter.
22
3- This module defines the first executable boundary between the original nlboot
4- shape and SourceOS BootReleaseSet v1. It deliberately does not perform network
5- or kexec actions yet; it normalizes request/response objects and produces an
6- evidence record that the boot client and Prophet Platform can agree on .
3+ This module defines the executable boundary between the nlboot safe planner
4+ shape and SourceOS BootReleaseSet v1. It deliberately avoids network, disk, and
5+ kexec side effects. It maps nlboot manifest/token/plan-shaped dictionaries into
6+ SourceOS control-plane payloads and evidence envelopes .
77"""
88
99from __future__ import annotations
1313from typing import Any
1414
1515
16+ BOOT_MODE_TO_CHANNEL = {
17+ "installer" : "installer" ,
18+ "recovery" : "recovery" ,
19+ "ephemeral" : "live" ,
20+ "bootstrap" : "live" ,
21+ }
22+
23+ BOOT_MODE_TO_ACTION = {
24+ "installer" : "install" ,
25+ "recovery" : "repair" ,
26+ "ephemeral" : "kexec" ,
27+ "bootstrap" : "enroll" ,
28+ }
29+
30+
1631@dataclass (frozen = True )
1732class DeviceClaim :
1833 """Minimal self-registration claim emitted by a boot environment."""
@@ -75,8 +90,81 @@ def to_dict(self) -> dict[str, Any]:
7590 }
7691
7792
93+ @dataclass (frozen = True )
94+ class NlbootManifestView :
95+ """Normalized subset of nlboot SignedBootManifest fields."""
96+
97+ manifest_id : str
98+ boot_release_set_id : str
99+ base_release_set_ref : str
100+ boot_mode : str
101+ artifacts : dict [str , str ]
102+ signature_ref : str
103+ signer_ref : str
104+ signature_algorithm : str
105+ crypto_profile : str
106+
107+ @classmethod
108+ def from_dict (cls , data : dict [str , Any ]) -> "NlbootManifestView" :
109+ return cls (
110+ manifest_id = _required_str (data , "manifest_id" ),
111+ boot_release_set_id = _required_str (data , "boot_release_set_id" ),
112+ base_release_set_ref = _required_str (data , "base_release_set_ref" ),
113+ boot_mode = _required_str (data , "boot_mode" ),
114+ artifacts = _required_dict_of_str (data , "artifacts" ),
115+ signature_ref = _required_str (data , "signature_ref" ),
116+ signer_ref = _required_str (data , "signer_ref" ),
117+ signature_algorithm = _required_str (data , "signature_algorithm" ),
118+ crypto_profile = _required_str (data , "crypto_profile" ),
119+ )
120+
121+
122+ @dataclass (frozen = True )
123+ class NlbootTokenView :
124+ """Normalized subset of nlboot EnrollmentToken fields."""
125+
126+ token_id : str
127+ purpose : str
128+ expires_at : str
129+ release_set_ref : str | None
130+ boot_release_set_ref : str | None
131+
132+ @classmethod
133+ def from_dict (cls , data : dict [str , Any ]) -> "NlbootTokenView" :
134+ return cls (
135+ token_id = _required_str (data , "token_id" ),
136+ purpose = _required_str (data , "purpose" ),
137+ expires_at = _required_str (data , "expires_at" ),
138+ release_set_ref = _optional_str (data , "release_set_ref" ),
139+ boot_release_set_ref = _optional_str (data , "boot_release_set_ref" ),
140+ )
141+
142+
143+ def _required_str (data : dict [str , Any ], key : str ) -> str :
144+ value = data .get (key )
145+ if not isinstance (value , str ) or not value :
146+ raise ValueError (f"{ key } must be a non-empty string" )
147+ return value
148+
149+
150+ def _optional_str (data : dict [str , Any ], key : str ) -> str | None :
151+ value = data .get (key )
152+ if value is None :
153+ return None
154+ if not isinstance (value , str ):
155+ raise ValueError (f"{ key } must be a string or null" )
156+ return value
157+
158+
159+ def _required_dict_of_str (data : dict [str , Any ], key : str ) -> dict [str , str ]:
160+ value = data .get (key )
161+ if not isinstance (value , dict ):
162+ raise ValueError (f"{ key } must be an object" )
163+ return {str (k ): _required_str (value , str (k )) for k in value }
164+
165+
78166class SourceOSBootAdapter :
79- """Pure adapter for the nlboot-like control-plane handshake .
167+ """Pure adapter for nlboot-like control-plane handshakes .
80168
81169 The runtime flow this class models is:
82170
@@ -86,13 +174,52 @@ class SourceOSBootAdapter:
86174 def build_announce_payload (self , claim : DeviceClaim ) -> dict [str , Any ]:
87175 return {"kind" : "SourceOSBootAnnounce" , "apiVersion" : "sourceos.dev/v1" , "claim" : claim .to_dict ()}
88176
177+ def authorization_from_nlboot_token (self , token_doc : dict [str , Any ], * , correlation_id : str ) -> BootAuthorization :
178+ token = NlbootTokenView .from_dict (token_doc )
179+ if token .boot_release_set_ref is None :
180+ raise ValueError ("nlboot token must include boot_release_set_ref" )
181+ return BootAuthorization (
182+ correlation_id = correlation_id ,
183+ boot_release_set_ref = token .boot_release_set_ref ,
184+ token_id = token .token_id ,
185+ expires_at = token .expires_at ,
186+ )
187+
89188 def build_fetch_request (self , authorization : BootAuthorization ) -> dict [str , Any ]:
90189 return {
91190 "kind" : "SourceOSBootFetchRequest" ,
92191 "apiVersion" : "sourceos.dev/v1" ,
93192 "authorization" : authorization .to_dict (),
94193 }
95194
195+ def boot_release_set_patch_from_nlboot_manifest (self , manifest_doc : dict [str , Any ]) -> dict [str , Any ]:
196+ manifest = NlbootManifestView .from_dict (manifest_doc )
197+ selected_channel = BOOT_MODE_TO_CHANNEL .get (manifest .boot_mode )
198+ if selected_channel is None :
199+ raise ValueError (f"unsupported nlboot boot_mode={ manifest .boot_mode !r} " )
200+ return {
201+ "releaseSetRef" : manifest .base_release_set_ref ,
202+ "channels" : [selected_channel ],
203+ "artifacts" : [
204+ {"name" : "kernel" , "role" : "kernel" , "uri" : manifest .artifacts ["kernel_ref" ], "sha256" : _unknown_sha256 ()},
205+ {"name" : "initrd" , "role" : "initrd" , "uri" : manifest .artifacts ["initrd_ref" ], "sha256" : _unknown_sha256 ()},
206+ {"name" : "rootfs" , "role" : "rootfs" , "uri" : manifest .artifacts ["rootfs_ref" ], "sha256" : _unknown_sha256 ()},
207+ ],
208+ "signature" : {
209+ "type" : "x509" if manifest .signature_algorithm == "rsa-pss-sha256" else "other" ,
210+ "bundleRef" : manifest .signature_ref ,
211+ "digest" : "sha256:" + _unknown_sha256 (),
212+ },
213+ "provenance" : {
214+ "builderId" : manifest .signer_ref ,
215+ "sourceRefs" : [manifest .manifest_id ],
216+ "attestations" : ["slsa" , "in-toto" ],
217+ },
218+ "policy" : {
219+ "allowedActions" : ["announce" , "enroll" , "fetch" , "verify" , BOOT_MODE_TO_ACTION [manifest .boot_mode ], "attest" ]
220+ },
221+ }
222+
96223 def build_evidence (
97224 self ,
98225 * ,
@@ -119,3 +246,29 @@ def build_evidence(
119246 verification_result = verification_result ,
120247 reports = reports ,
121248 )
249+
250+ def build_evidence_from_nlboot_manifest (
251+ self ,
252+ * ,
253+ claim : DeviceClaim ,
254+ authorization : BootAuthorization ,
255+ manifest_doc : dict [str , Any ],
256+ manifest_hash : str ,
257+ verification_result : str ,
258+ ) -> BootEvidence :
259+ manifest = NlbootManifestView .from_dict (manifest_doc )
260+ channel = BOOT_MODE_TO_CHANNEL .get (manifest .boot_mode )
261+ if channel is None :
262+ raise ValueError (f"unsupported nlboot boot_mode={ manifest .boot_mode !r} " )
263+ return self .build_evidence (
264+ claim = claim ,
265+ authorization = authorization ,
266+ selected_channel = channel ,
267+ boot_mode = manifest .boot_mode ,
268+ manifest_hash = manifest_hash ,
269+ verification_result = verification_result ,
270+ )
271+
272+
273+ def _unknown_sha256 () -> str :
274+ return "0" * 64
0 commit comments