Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 65 additions & 20 deletions archivist/archivist.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,27 @@ type
NodeServer* = ref object
config: NodeConf
restServer: RestServerRef
archivistNode: ArchivistNodeRef
archivistNode*: ArchivistNodeRef
repoStore: RepoStore
maintenance: BlockMaintainer
taskpool: Taskpool
started: bool # Track whether the node was started
discoveryStore: Datastore # Store reference to close explicitly

NodePrivateKey* = libp2p.PrivateKey # alias

func node*(self: NodeServer): ArchivistNodeRef =
return self.archivistNode

func repoStore*(self: NodeServer): RepoStore =
return self.repoStore

func dataDir*(self: NodeServer): string =
return string(self.config.dataDir)

func config*(self: NodeServer): NodeConf =
return self.config

proc connectMarketplace(s: NodeServer) {.async.} =
let config = s.config

Expand Down Expand Up @@ -106,24 +120,50 @@ proc start*(s: NodeServer) {.async.} =
await s.connectMarketplace()
await s.archivistNode.start()
s.restServer.start()
s.started = true

proc stop*(s: NodeServer) {.async.} =
notice "Stopping node"

let res = await noCancel allFinishedFailed[void](
@[
s.restServer.stop(),
s.archivistNode.switch.stop(),
s.archivistNode.stop(),
s.repoStore.stop(),
s.maintenance.stop(),
]
)

if res.failure.len > 0:
error "Failed to stop node", failures = res.failure.len
raiseAssert "Failed to stop node"

if not s.started:
# Close the discovery store to release the LevelDB lock
if not s.discoveryStore.isNil:
try:
discard await s.discoveryStore.close()
except Exception as e:
error "Failed to close discovery store", error = e.msg
if not s.taskpool.isNil:
s.taskpool.shutdown()
return

var futures: seq[Future[void]] = @[]

if not s.restServer.isNil:
futures.add(s.restServer.stop())

if not s.archivistNode.isNil:
futures.add(s.archivistNode.switch.stop())
futures.add(s.archivistNode.stop())

if not s.repoStore.isNil:
futures.add(s.repoStore.stop())

if not s.maintenance.isNil:
futures.add(s.maintenance.stop())

if futures.len > 0:
let res = await noCancel allFinishedFailed[void](futures)

if res.failure.len > 0:
error "Failed to stop node", failures = res.failure.len
raiseAssert "Failed to stop node"

# Close the discovery store to release the LevelDB lock
if not s.discoveryStore.isNil:
try:
discard await s.discoveryStore.close()
except Exception as e:
error "Failed to close discovery store", error = e.msg
if not s.taskpool.isNil:
s.taskpool.shutdown()

Expand Down Expand Up @@ -156,24 +196,27 @@ proc new*(
except CatchableError as exc:
raiseAssert("Failure in tp initialization:" & exc.msg)

info "Threadpool started", numThreads = tp.numThreads

let discoveryDir = config.dataDir / ArchivistDhtNamespace

if io2.createPath(discoveryDir).isErr:
trace "Unable to create discovery directory for block store",
discoveryDir = discoveryDir
raise (ref Defect)(
msg: "Unable to create discovery directory for block store: " & discoveryDir
)

let discoveryProvidersDir = config.dataDir / ArchivistDhtProvidersNamespace
if io2.createPath(discoveryProvidersDir).isErr:
raise (ref Defect)(
msg: "Unable to create discovery providers directory: " & discoveryProvidersDir
)

let
discoveryStore = Datastore(
LevelDbDatastore.new(config.dataDir / ArchivistDhtProvidersNamespace).expect(
LevelDbDatastore.new(discoveryProvidersDir).expect(
"Should create discovery datastore!"
)
)

let
discovery = Discovery.new(
switch.peerInfo.privateKey,
announceAddrs = config.listenAddrs,
Expand Down Expand Up @@ -264,4 +307,6 @@ proc new*(
repoStore: repoStore,
maintenance: maintenance,
taskpool: tp,
discoveryStore: discoveryStore,
started: false,
)
68 changes: 66 additions & 2 deletions archivist/conf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ import pkg/chronicles/helpers
import pkg/chronicles/topics_registry
import pkg/confutils/defs
import pkg/confutils/std/net
import pkg/confutils/toml/defs
import pkg/toml_serialization
import pkg/serialization

type ConfTypes = InputFile | InputDir | OutPath | OutDir | OutFile
serializesAsBase(ConfTypes, Toml)
import pkg/metrics
import pkg/metrics/chronos_httpserver
import pkg/stew/byteutils
Expand Down Expand Up @@ -661,7 +666,67 @@ proc readValue*(
except CatchableError as err:
raise newException(SerializationError, err.msg)

# no idea why confutils needs this:
# TOML Serialization readValue procedures
proc readValue*(r: var TomlReader, value: var IpAddress) {.raises: [SerializationError, TomlError, IOError].} =
try:
value = parseIpAddress(r.parseAsString())
except ValueError as ex:
raise newException(SerializationError, ex.msg)

proc readValue*(r: var TomlReader, value: var Port) {.raises: [SerializationError, TomlError, IOError].} =
value = r.parseInt(int).Port

# TOML Serialization writeValue procedures
proc writeValue*(w: var TomlWriter, value: ThreadCount) {.raises: [IOError].} =
w.writeValue(int(value))

proc writeValue*(w: var TomlWriter, value: NBytes) {.raises: [IOError].} =
w.writeValue(int(value))

proc writeValue*(w: var TomlWriter, value: Duration) {.raises: [IOError].} =
w.writeValue($value)

proc writeValue*(w: var TomlWriter, value: IpAddress) {.raises: [IOError].} =
w.writeValue($value)

proc writeValue*(w: var TomlWriter, value: Port) {.raises: [IOError].} =
w.writeValue(int(value))

proc writeValue*(w: var TomlWriter, value: MultiAddress) {.raises: [IOError].} =
w.writeValue($value)

proc writeValue*(w: var TomlWriter, value: EthAddress) {.raises: [IOError].} =
w.writeValue($value)

proc writeValue*(w: var TomlWriter, value: SignedPeerRecord) {.raises: [IOError].} =
w.writeValue($value)

proc writeValue*(w: var TomlWriter, value: NatConfig) {.raises: [IOError].} =
if value.hasExtIp:
w.writeValue("extip:" & $value.extIp)
else:
case value.nat
of NatStrategy.NatAny:
w.writeValue("any")
of NatStrategy.NatNone:
w.writeValue("none")
of NatStrategy.NatUpnp:
w.writeValue("upnp")
of NatStrategy.NatPmp:
w.writeValue("pmp")

proc writeValue*(w: var TomlWriter, value: LogKind) {.raises: [IOError].} =
w.writeValue($value)

proc writeValue*(w: var TomlWriter, value: RepoKind) {.raises: [IOError].} =
w.writeValue($value)

proc writeValue*(w: var TomlWriter, value: ProverBackendCmd) {.raises: [IOError].} =
w.writeValue($value)

proc writeValue*(w: var TomlWriter, value: Curves) {.raises: [IOError].} =
w.writeValue($value)

proc completeCmdArg*(T: type EthAddress, val: string): seq[string] =
discard

Expand All @@ -674,7 +739,6 @@ proc completeCmdArg*(T: type Duration, val: string): seq[string] =
proc completeCmdArg*(T: type ThreadCount, val: string): seq[string] =
discard

# silly chronicles, colors is a compile-time property
proc stripAnsi*(v: string): string =
var
res = newStringOfCap(v.len)
Expand Down
151 changes: 151 additions & 0 deletions archivist/conf_serialization.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
## Custom TOML serialization for NodeConf
##
## This module provides a workaround for the TOML serialization library's
## limitation where it treats object types as nested objects instead of
## using custom writeValue procedures.
##
## The TOML library's writeValue procedure for objects automatically creates
## table headers (like [metricsAddress]) for object types, which produces
## malformed TOML. This custom serializer manually constructs the TOML
## string, ensuring that custom writeValue procedures are used.

{.push raises: [].}

import std/strutils
import std/options
import pkg/chronos
import pkg/toml_serialization
import pkg/libp2p
import ./conf
import ./logutils
import ./nat
import ./utils/natutils

proc toToml*(config: NodeConf): string =
var toml = newStringOfCap(16384)

proc append(key: string, value: string): string =
result = key & " = " & value & "\n"

proc appendOpt(key: string, value: Option[string]): string =
if value.isSome:
result = key & " = \"" & value.get() & "\"\n"
else:
result = ""

proc appendOpt(key: string, value: Option[EthAddress]): string =
if value.isSome:
result = key & " = \"" & value.get().short0xHexLog & "\"\n"
else:
result = ""

proc appendOpt(key: string, value: Option[int]): string =
if value.isSome:
result = key & " = " & $value.get() & "\n"
else:
result = ""

proc multiAddrToString(ma: MultiAddress): string =
## Helper function to convert MultiAddress to string
$ma

# Simple string fields
toml.add append("logLevel", "\"" & config.logLevel & "\"")
toml.add append("logFormat", "\"" & $config.logFormat & "\"")
toml.add append("agentString", "\"" & config.agentString & "\"")
toml.add append("apiBindAddress", "\"" & config.apiBindAddress & "\"")
toml.add append("netPrivKeyFile", "\"" & config.netPrivKeyFile & "\"")
toml.add append("ethProvider", "\"" & config.ethProvider & "\"")

# Boolean fields
toml.add append("metricsEnabled", if config.metricsEnabled: "true" else: "false")
toml.add append("persistence", if config.persistence: "true" else: "false")
toml.add append("useSystemClock", if config.useSystemClock: "true" else: "false")
toml.add append("validator", if config.validator: "true" else: "false")
toml.add append("prover", if config.prover: "true" else: "false")
toml.add append("circomNoZkey", if config.circomNoZkey: "true" else: "false")

# Integer fields
toml.add append("metricsPort", $int(config.metricsPort))
toml.add append("discoveryPort", $int(config.discoveryPort))
toml.add append("apiPort", $int(config.apiPort))
toml.add append("maxPeers", $config.maxPeers)
toml.add append("numThreads", $int(config.numThreads))
toml.add append("blockMaintenanceNumberOfBlocks", $config.blockMaintenanceNumberOfBlocks)
toml.add append("cacheSize", $int(config.cacheSize))
toml.add append("validatorMaxSlots", $config.validatorMaxSlots)
toml.add append("validatorGroupIndex", $config.validatorGroupIndex)
toml.add append("marketplaceRequestCacheSize", $config.marketplaceRequestCacheSize)
toml.add append("maxPriorityFeePerGas", $config.maxPriorityFeePerGas)
toml.add append("numProofSamples", $config.numProofSamples)
toml.add append("maxSlotDepth", $config.maxSlotDepth)
toml.add append("maxDatasetDepth", $config.maxDatasetDepth)
toml.add append("maxBlockDepth", $config.maxBlockDepth)
toml.add append("maxCellElms", $config.maxCellElms)

# Complex type fields (using string representations)
toml.add append("metricsAddress", "\"" & $config.metricsAddress & "\"")
toml.add append("dataDir", "\"" & string(config.dataDir) & "\"")
toml.add append("circuitDir", "\"" & string(config.circuitDir) & "\"")

# NatConfig - use custom serialization logic
let natStr = if config.nat.hasExtIp:
"extip:" & $config.nat.extIp
else:
case config.nat.nat
of NatStrategy.NatAny: "any"
of NatStrategy.NatNone: "none"
of NatStrategy.NatUpnp: "upnp"
of NatStrategy.NatPmp: "pmp"
toml.add append("nat", "\"" & natStr & "\"")

# Enum fields - need to be quoted as strings
toml.add append("repoKind", "\"" & $config.repoKind & "\"")
toml.add append("proverBackend", "\"" & $config.proverBackend & "\"")
toml.add append("curve", "\"" & $config.curve & "\"")

# Duration fields - use proper duration string representation
proc formatDuration(d: Duration): string =
let s = $d
if s.len == 0: "0s" else: s

toml.add append("blockTtl", "\"" & formatDuration(config.blockTtl) & "\"")
toml.add append("blockMaintenanceInterval", "\"" & formatDuration(config.blockMaintenanceInterval) & "\"")

# NBytes fields
toml.add append("storageQuota", $int(config.storageQuota))

# File path fields
toml.add append("circomR1cs", "\"" & string(config.circomR1cs) & "\"")
toml.add append("circomGraph", "\"" & string(config.circomGraph) & "\"")
toml.add append("circomWasm", "\"" & string(config.circomWasm) & "\"")
toml.add append("circomZkey", "\"" & string(config.circomZkey) & "\"")

# Option fields
toml.add appendOpt("apiCorsAllowedOrigin", config.apiCorsAllowedOrigin)
toml.add appendOpt("logFile", config.logFile)
toml.add appendOpt("ethPrivateKey", config.ethPrivateKey)
toml.add appendOpt("marketplaceAddress", config.marketplaceAddress)
toml.add appendOpt("validatorGroups", config.validatorGroups)

# MultiAddress array
if config.listenAddrs.len > 0:
toml.add("listenAddrs = [\n")
for la in config.listenAddrs:
toml.add(" \"")
toml.add(multiAddrToString(la))
toml.add("\",\n")
toml.add("]\n")

# SignedPeerRecord array
if config.bootstrapNodes.len > 0:
toml.add("bootstrapNodes = [\n")
for node in config.bootstrapNodes:
toml.add(" \"")
toml.add($node) # SignedPeerRecord $ operator returns the string representation
toml.add("\",\n")
toml.add("]\n")

result = toml

{.pop.}
Loading