diff --git a/Cargo.lock b/Cargo.lock index 243a5c7c..cd37c5a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1178,6 +1178,15 @@ dependencies = [ "opaque-debug 0.3.0", ] +[[package]] +name = "cbor4ii" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544cf8c89359205f4f990d0e6f3828db42df85b5dac95d09157a250eb0749c4" +dependencies = [ + "serde", +] + [[package]] name = "cc" version = "1.0.79" @@ -3295,6 +3304,7 @@ dependencies = [ "reqwest", "rocket", "serde", + "serde_ipld_dagcbor", "serde_json", "serde_with 1.14.0", "sled", @@ -3308,6 +3318,7 @@ dependencies = [ "tracing-log", "tracing-opentelemetry", "tracing-subscriber", + "unsigned-varint", "urlencoding", "uuid", "void", @@ -6105,6 +6116,18 @@ dependencies = [ "syn 2.0.13", ] +[[package]] +name = "serde_ipld_dagcbor" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2433e94ffb5977e67afbd75733abd6ada1c4f497125894a8c89b3fdc5fd6a058" +dependencies = [ + "cbor4ii", + "cid", + "scopeguard", + "serde", +] + [[package]] name = "serde_jcs" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 0e095968..1b5c47e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ rocket = { version = "0.5.0-rc.2", features = ["json"] } serde = { version = "1", features = ["derive"] } serde_json = "1" serde_with = { version = "1", features = ["hex"] } +serde_ipld_dagcbor = "0.3" sled = "0.34" thiserror = "1" tempfile = "3" @@ -50,6 +51,7 @@ tracing = "0.1" tracing-log = "0.1" tracing-opentelemetry = "0.17.2" tracing-subscriber = { version = "0.3.11", features = ["env-filter", "json"] } +unsigned-varint = { version = "0.7", features = ["futures"] } urlencoding = "2.1" void = "1" uuid = "1" diff --git a/src/p2p/exchange/carv1.rs b/src/p2p/exchange/carv1.rs new file mode 100644 index 00000000..30575954 --- /dev/null +++ b/src/p2p/exchange/carv1.rs @@ -0,0 +1,410 @@ +use super::utils::{read_cid, read_dag_cbor_cid, read_leb128, 
write_leb128, Error as CidReadError}; +use async_stream::try_stream; +use futures::{ + channel::oneshot, + io::{copy, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Take}, + stream::{Stream, TryStream, TryStreamExt}, +}; +use libipld::Cid; +use pin_project::pin_project; +use serde::{Deserialize, Serialize}; +use serde_ipld_dagcbor::{to_writer, DecodeError, EncodeError}; +use std::{ + future::Future, + io::{Error as IoError, ErrorKind}, + pin::Pin, + task::Poll, +}; + +use crate::storage::Content; + +#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] +pub struct Header { + version: u8, + roots: Vec, +} + +impl Header { + const INITIAL_BYTES: [u8; 9] = [0xa2, 0x67, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e]; + const ROOTS_BYTES: [u8; 6] = [0x65, 0x72, 0x6f, 0x6f, 0x74, 0x73]; + pub async fn write_to(&self, mut writer: W) -> Result<(), EncodeError> + where + W: AsyncWrite + Unpin, + { + let mut buf = Vec::new(); + to_writer(&mut buf, self)?; + writer.write_all(&mut buf).await?; + Ok(()) + } + + // no async serde decoding :(, so we have to do it manually + pub async fn read_from(mut reader: R) -> Result> + where + R: AsyncRead + Unpin, + { + let mut buf = [0u8; 9]; + reader.read_exact(&mut buf).await?; + // expect opening of cbor map with 2 elements, then "version" key + if buf != Header::INITIAL_BYTES { + return Err(DecodeError::Msg("invalid header".to_string())); + }; + + // read version first, version < roots in dag-cbor + reader.read_exact(&mut buf[0..1]).await?; + let version = buf[0]; + + // read roots + reader.read_exact(&mut buf[0..6]).await?; + if buf[0..6] != Header::ROOTS_BYTES { + return Err(DecodeError::Msg("invalid header".to_string())); + }; + + // array tag + array len + reader.read_exact(&mut buf[0..1]).await?; + let array_tag = buf[0] >> 5; + if array_tag != 4 { + return Err(DecodeError::Mismatch { + expect_major: 4, + byte: array_tag, + }); + }; + + // assuming for now we'll never have more than 7 roots :/ + let array_len = buf[0] & 
0b00011111; + + let mut roots = Vec::with_capacity(array_len.into()); + // for array len 'n' read n cids + for _ in 0..array_len { + roots.push(read_dag_cbor_cid(&mut reader).await.map_err(|e| match e { + CidReadError::Io(e) => DecodeError::Read(e), + _ => DecodeError::Msg(e.to_string()), + })?); + } + + Ok(Self { version, roots }) + } +} + +pub struct DataSection { + cid: Cid, + block: Content, +} + +impl DataSection { + pub fn new(cid: Cid, block: Content) -> Self { + Self { cid, block } + } + + pub fn into_inner(self) -> (Cid, Content) { + (self.cid, self.block) + } +} + +impl DataSection +where + R: AsyncRead, +{ + pub async fn write_to(self, mut writer: W) -> Result<(), IoError> + where + W: AsyncWrite + Unpin, + { + let cid_bytes = self.cid.to_bytes(); + let total_len = cid_bytes.len() as u64 + self.block.len(); + write_leb128(total_len, &mut writer).await?; + writer.write_all(&cid_bytes).await?; + copy(self.block, &mut writer).await?; + Ok(()) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum WriteError { + #[error(transparent)] + Header(#[from] EncodeError), + #[error(transparent)] + Data(E), +} + +impl From for WriteError { + fn from(e: IoError) -> Self { + Self::Header(EncodeError::Write(e)) + } +} + +pub async fn write_carv1( + header: &Header, + data: &mut S, + mut writer: W, +) -> Result<(), WriteError> +where + W: AsyncWrite + Unpin, + S: TryStream, Error = E> + Unpin, + R: AsyncRead, +{ + header.write_to(&mut writer).await?; + + while let Some(data) = data.try_next().await.map_err(WriteError::Data)? { + data.write_to(&mut writer).await? 
+ } + + Ok(()) +} + +#[derive(thiserror::Error, Debug)] +pub enum ReadError { + #[error(transparent)] + Io(#[from] IoError), + #[error(transparent)] + Cid(#[from] libipld::cid::Error), + #[error(transparent)] + Canceled(#[from] oneshot::Canceled), + #[error(transparent)] + Header(#[from] DecodeError), +} + +impl From for ReadError { + fn from(e: CidReadError) -> Self { + match e { + CidReadError::Io(e) => Self::Io(e), + CidReadError::Cid(e) => Self::Cid(e), + } + } +} + +#[derive(Default)] +pub enum TakenReader { + #[default] + Finished, + Unfinished(Take, Option>), +} + +impl TakenReader +where + R: AsyncRead, +{ + pub fn new(reader: R, limit: u64) -> (Self, oneshot::Receiver) { + let (tx, rx) = oneshot::channel(); + let reader = reader.take(limit); + (Self::Unfinished(reader, Some(tx)), rx) + } + + fn finish(&mut self) -> Result<(), oneshot::Canceled> { + match std::mem::take(self) { + Self::Unfinished(r, Some(tx)) => { + tx.send(r.into_inner()).map_err(|_| oneshot::Canceled)?; + } + _ => (), + }; + Ok(()) + } +} + +impl AsyncRead for TakenReader +where + R: AsyncRead + Unpin, +{ + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + // boy I hope this doesnt need to be pinned in a better way + match *self { + TakenReader::Finished => Poll::Ready(Ok(0)), + TakenReader::Unfinished(ref mut reader, _) => { + let poll = Pin::new(reader).poll_read(cx, buf); + if let Poll::Ready(Ok(0)) = poll { + let _ = self.finish(); + } + poll + } + } + } + + fn poll_read_vectored( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &mut [std::io::IoSliceMut<'_>], + ) -> std::task::Poll> { + match *self { + TakenReader::Finished => Poll::Ready(Ok(0)), + TakenReader::Unfinished(ref mut reader, _) => { + let poll = Pin::new(reader).poll_read_vectored(cx, bufs); + if let Poll::Ready(Ok(0)) = poll { + let _ = self.finish(); + } + poll + } + } + } +} + +pub(crate) enum ReaderState 
{
+    Empty(R),
+    Element(DataSection<TakenReader<R>>, F),
+}
+
+/// Reads the next CARv1 section (`varint(len) | cid | block`) from `reader`.
+///
+/// Returns [`ReaderState::Empty`], handing the reader back, when the input ends before a
+/// new section starts. Otherwise returns [`ReaderState::Element`] with the section (its
+/// block is read lazily through a [`TakenReader`]) and the receiver that yields the
+/// underlying reader once the block has been read to its end.
+///
+/// # Errors
+///
+/// Fails on I/O errors, on a CID that cannot be parsed, and on a section whose declared
+/// length is shorter than its own CID.
+pub(crate) async fn read_section<R>(
+    mut reader: R,
+) -> Result<ReaderState<R, oneshot::Receiver<R>>, ReadError>
+where
+    R: AsyncRead + Unpin,
+{
+    // check if reader is already empty (if it can't read 1 byte, it's empty)
+    let mut buf = [0u8; 1];
+    match reader.read_exact(&mut buf).await {
+        Ok(_) => (),
+        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
+            return Ok(ReaderState::Empty(reader))
+        }
+        Err(e) => return Err(e.into()),
+    };
+
+    // the byte consumed by the emptiness probe is the first byte of the length varint,
+    // so it is chained back in front of the reader
+    let len = read_leb128(buf.chain(&mut reader)).await?;
+    let cid = read_cid(&mut reader).await?;
+    let cid_len = cid.to_bytes().len() as u64;
+    // `len` covers the CID and the block. A length shorter than the CID is malformed
+    // input: reject it instead of letting the subtraction underflow (panic in debug
+    // builds, absurdly large block length in release builds).
+    let block_len = len.checked_sub(cid_len).ok_or_else(|| {
+        IoError::new(
+            ErrorKind::InvalidData,
+            "section length is shorter than its CID",
+        )
+    })?;
+    let (reader, rx) = TakenReader::new(reader, block_len);
+    Ok(ReaderState::Element(
+        DataSection::new(cid, Content::new(block_len, reader)),
+        rx,
+    ))
+}
+
+/// Reads a CARv1 header from `reader` and returns it together with a stream of the
+/// length-delimited cid-block sections that follow and a future for the reader itself.
+///
+/// The stream yields sections until the reader is empty. Once the reader is empty, the
+/// returned future resolves with the reader.
+pub async fn stream_carv1<R>(
+    mut reader: R,
+) -> Result<
+    (
+        Header,
+        impl Stream<Item = Result<DataSection<TakenReader<R>>, ReadError>>,
+        impl Future<Output = Result<R, oneshot::Canceled>>,
+    ),
+    ReadError,
+>
+where
+    R: AsyncRead + Unpin,
+{
+    // read the header
+    let header = Header::read_from(&mut reader).await?;
+    // setup the channel for completion state
+    let (final_tx, final_rx) = oneshot::channel();
+    let stream = try_stream! {
+        loop {
+            // try read a section
+            match read_section(reader).await?
{ + // section is empty, send reader to completion channel + ReaderState::Empty(r) => {final_tx.send(r).map_err(|_| oneshot::Canceled)?; break}, + // reader is not empty, send section to stream + ReaderState::Element(ds, rx) => { + yield ds; + reader = rx.await?; + } + } + } + }; + Ok((header, stream, final_rx)) +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + use libipld::{multihash::Code, raw::RawCodec, Block, DefaultParams}; + + #[test] + async fn header() { + let header = Header { + version: 1, + roots: vec![ + "bagaaierasords4njcts6vs7qvdjfcvgnume4hqohf65zsfguprqphs3icwea" + .parse() + .unwrap(), + ], + }; + let mut buf = Vec::new(); + header.write_to(&mut buf).await.unwrap(); + println!("{:x?}", buf); + let deser = Header::read_from(&mut buf.as_slice()).await.unwrap(); + assert_eq!(header, deser); + } + + #[test] + async fn reader() { + let header = Header { + version: 1, + roots: Vec::new(), + }; + + let (cid1, block1) = + Block::::encode(RawCodec, Code::Sha3_256, &vec![0u8; 10]) + .expect("block encoding to work") + .into_inner(); + + let (cid2, block2) = + Block::::encode(RawCodec, Code::Sha3_256, &vec![1u8; 24]) + .expect("block encoding to work") + .into_inner(); + + println!("{:?}", cid1); + println!("{:?}", cid2); + let cid_len = cid1.to_bytes().len() as u64; + + let mut buf = Vec::with_capacity(34); + header + .write_to(&mut buf) + .await + .expect("header write to work"); + + write_leb128(block1.len() as u64 + cid_len, &mut buf) + .await + .expect("leb128 write to work"); + buf.extend_from_slice(cid1.to_bytes().as_slice()); + buf.extend_from_slice(&block1); + + write_leb128(block2.len() as u64 + cid_len, &mut buf) + .await + .expect("leb128 write to work"); + buf.extend_from_slice(cid2.to_bytes().as_slice()); + buf.extend_from_slice(&block2); + + let (read_header, stream, final_rx) = stream_carv1(buf.as_slice()) + .await + .expect("stream from buffer to work"); + assert_eq!(read_header, header); + + let mut s = 
Box::pin(stream); + + let (rcid1, mut rblock1) = s + .next() + .await + .expect("no read error") + .expect("there should be a section") + .into_inner(); + assert_eq!(rcid1, cid1); + let mut buf1 = Vec::new(); + rblock1 + .read_to_end(&mut buf1) + .await + .expect("there should be a block"); + assert_eq!(buf1, block1); + + let (rcid2, mut rblock2) = s + .next() + .await + .expect("no read error") + .expect("there should be a section") + .into_inner(); + assert_eq!(rcid2, cid2); + let mut buf2 = Vec::new(); + rblock2 + .read_to_end(&mut buf2) + .await + .expect("there should be a block"); + assert_eq!(buf2, block2); + + assert_eq!(final_rx.await.unwrap(), buf); + } +} diff --git a/src/p2p/exchange/carv2.rs b/src/p2p/exchange/carv2.rs new file mode 100644 index 00000000..52e20e56 --- /dev/null +++ b/src/p2p/exchange/carv2.rs @@ -0,0 +1,90 @@ +use futures::{ + io::{AsyncRead, AsyncWrite, AsyncWriteExt}, + TryStream, +}; +use libipld::Cid; +use std::io::Error as IoError; + +use super::carv1::{write_carv1, DataSection, Header as V1Header, WriteError}; +use crate::storage::ImmutableStore; + +const CAR_V2_PRAGMA: [u8; 11] = [ + 0x0a, 0xa1, 0x67, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x02, +]; + +const HEADER_LEN: u64 = 40; + +pub struct Characteristics([u8; 16]); + +impl Characteristics { + pub fn new() -> Self { + Characteristics([0; 16]) + } + + pub fn fully_indexed(&mut self, indexed: bool) -> &mut Self { + let byte_0 = self.0[0]; + self.0[0] = if indexed { + byte_0 | 0b1000_0000 + } else { + byte_0 & 0b0111_1111 + }; + self + } +} + +impl AsRef<[u8]> for Characteristics { + fn as_ref(&self) -> &[u8] { + &self.0.as_ref() + } +} + +pub struct Header { + characteristics: Characteristics, + data_offset: u64, + data_size: u64, + index_offset: u64, +} + +impl Header { + pub fn new() -> Self { + Header { + characteristics: Characteristics::new(), + data_offset: 0, + data_size: 0, + index_offset: 0, + } + } + + pub async fn write_to(&self, writer: &mut W) -> Result<(), 
IoError>
+    where
+        W: AsyncWrite + Unpin,
+    {
+        // Fixed-width CARv2 header: 16 bytes of characteristics followed by three 64-bit
+        // unsigned integers. The CARv2 specification defines these integers as
+        // little-endian, so they must not be written with `to_be_bytes`.
+        writer.write_all(self.characteristics.as_ref()).await?;
+        writer.write_all(&self.data_offset.to_le_bytes()).await?;
+        writer.write_all(&self.data_size.to_le_bytes()).await?;
+        writer.write_all(&self.index_offset.to_le_bytes()).await?;
+        Ok(())
+    }
+}
+
+/// Writes a CARv2 container to `writer`: the version pragma, the v2 `header`, then the
+/// CARv1 payload (`v1_header` followed by every section produced by `data`).
+///
+/// `index` is accepted but currently ignored; no index is written.
+///
+/// # Errors
+///
+/// Returns [`WriteError::Data`] when `data` yields an error, and [`WriteError::Header`]
+/// for encoding or I/O failures.
+pub async fn write_carv2<W, S, R, E>(
+    header: &Header,
+    v1_header: &V1Header,
+    data: &mut S,
+    index: Option<()>,
+    writer: &mut W,
+) -> Result<(), WriteError<E>>
+where
+    W: AsyncWrite + Unpin,
+    S: TryStream<Ok = DataSection<R>, Error = E> + Unpin,
+    R: AsyncRead,
+{
+    writer.write_all(&CAR_V2_PRAGMA).await?;
+    header.write_to(writer).await?;
+
+    write_carv1(v1_header, data, writer).await?;
+
+    // TODO write index if present
+
+    Ok(())
+}
diff --git a/src/p2p/exchange/mod.rs b/src/p2p/exchange/mod.rs
new file mode 100644
index 00000000..791ca4a0
--- /dev/null
+++ b/src/p2p/exchange/mod.rs
@@ -0,0 +1,3 @@
+pub mod carv1;
+pub mod carv2;
+pub mod utils;
diff --git a/src/p2p/exchange/utils.rs b/src/p2p/exchange/utils.rs
new file mode 100644
index 00000000..6f48c87a
--- /dev/null
+++ b/src/p2p/exchange/utils.rs
@@ -0,0 +1,150 @@
+use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use libipld::cid::{
+    multihash::{Code, MultihashDigest},
+    Cid, Error as CidError,
+};
+use std::io::Error as IoError;
+use unsigned_varint::aio;
+
+/// Errors produced while incrementally reading a CID.
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error(transparent)]
+    Io(#[from] IoError),
+    #[error(transparent)]
+    Cid(#[from] CidError),
+}
+
+impl From<unsigned_varint::io::ReadError> for Error {
+    fn from(e: unsigned_varint::io::ReadError) -> Self {
+        Self::Cid(e.into())
+    }
+}
+
+/// incremental parsing of a CID from an AsyncRead
+///
+/// Consumes exactly the bytes of one binary CID, so the reader is left positioned on
+/// whatever follows it.
+pub async fn read_cid<R>(mut reader: R) -> Result<Cid, Error>
+where
+    R: AsyncRead + Unpin,
+{
+    use Code::*;
+    let mut buf = [0u8; 1];
+    reader.read_exact(&mut buf).await?;
+    match buf[0] {
+        // CID v0, should never really happen
+        0x12 => {
+            // A CIDv0 is a bare sha2-256 multihash: 0x12 (hash code), 0x20 (digest
+            // length), then the 32 digest bytes. The length byte has to be consumed
+            // before the digest.
+            let mut len = [0u8; 1];
+            reader.read_exact(&mut len).await?;
+            if len[0] != 0x20 {
+                return Err(Error::Io(IoError::new(
+                    std::io::ErrorKind::InvalidData,
+                    "CIDv0 multihash digest must be 32 bytes",
+                )));
+            }
+            let mut digest = [0u8; 32];
+            reader.read_exact(&mut digest).await?;
+            // the bytes on the wire already are the digest: wrap them, do not hash again
+            let mh = Code::Sha2_256.wrap(&digest).map_err(CidError::from)?;
+            Ok(Cid::new_v0(mh)?)
+        }
+        // CID v1
+        0x1 => {
+            let codec = aio::read_u64(&mut reader).await?;
+            let mh_code =
+                Code::try_from(aio::read_u64(&mut reader).await?).map_err(CidError::from)?;
+            // digest size implied by the hash function
+            let expected_len: usize = match mh_code {
+                Blake2s128 => 16,
+                Sha3_224 | Keccak224 => 28,
+                Sha2_256 | Sha3_256 | Keccak256 | Blake2b256 | Blake2s256 | Blake3_256 => 32,
+                Sha3_384 | Keccak384 => 48,
+                Sha2_512 | Sha3_512 | Keccak512 | Blake2b512 => 64,
+            };
+            // A multihash is `code | digest length | digest`. The length varint must be
+            // consumed before the digest, otherwise every digest is read one byte off and
+            // a stray byte is left in the stream.
+            let declared_len = aio::read_u64(&mut reader).await?;
+            if declared_len != expected_len as u64 {
+                return Err(Error::Io(IoError::new(
+                    std::io::ErrorKind::InvalidData,
+                    "multihash digest length does not match its hash function",
+                )));
+            }
+            let mut digest = [0u8; 64];
+            reader.read_exact(&mut digest[..expected_len]).await?;
+            let mh = mh_code
+                .wrap(&digest[..expected_len])
+                .map_err(CidError::from)?;
+            Ok(Cid::new_v1(codec, mh))
+        }
+        // any other leading byte is not a CID version this reader understands
+        _ => Err(Error::Cid(CidError::InvalidCidVersion)),
+    }
+}
+
+/// Reads a CID in its DAG-CBOR encoding: CBOR tag 42 followed by a byte string whose
+/// first byte is the 0x00 multibase identity prefix and whose remainder is the binary CID.
+///
+/// Byte strings with an inline length (up to 23 bytes) or a one-byte length are accepted;
+/// anything else is rejected with `InvalidCidVersion`, the error this function uses for
+/// every structural mismatch.
+pub async fn read_dag_cbor_cid<R>(mut reader: R) -> Result<Cid, Error>
+where
+    R: AsyncRead + Unpin,
+{
+    let mut buf = [0u8; 2];
+    reader.read_exact(&mut buf).await?;
+    // check cid tag (0xd8) is 42 (0x2a)
+    if buf != [0xd8, 0x2a] {
+        return Err(Error::Cid(CidError::InvalidCidVersion));
+    };
+
+    reader.read_exact(&mut buf).await?;
+    // Byte string head: the top three bits are the CBOR major type (2 = byte string), the
+    // low five bits are the "additional information" that says where the length lives.
+    if buf[0] >> 5 != 2 {
+        return Err(Error::Cid(CidError::InvalidCidVersion));
+    };
+
+    let cid_bytes = match buf[0] & 0x1f {
+        // 24: the length is the next byte, which has already been read into buf[1]
+        24 => {
+            // the buffer must be sized, not merely reserved: `read_exact` fills the
+            // slice it is given and an empty Vec is an empty slice
+            let mut bytes = vec![0u8; usize::from(buf[1])];
+            reader.read_exact(&mut bytes).await?;
+            bytes
+        }
+        // 1..=23: the length is inline, so buf[1] is already the first content byte
+        n @ 1..=23 => {
+            let mut bytes = vec![0u8; usize::from(n)];
+            bytes[0] = buf[1];
+            reader.read_exact(&mut bytes[1..]).await?;
+            bytes
+        }
+        // empty strings, multi-byte lengths and indefinite lengths cannot hold a CID here
+        _ => return Err(Error::Cid(CidError::InvalidCidVersion)),
+    };
+
+    // strip the multibase identity prefix; the CID parser must not see it
+    match cid_bytes.split_first() {
+        Some((&0, cid)) => Ok(Cid::read_bytes(cid)?),
+        _ => Err(Error::Cid(CidError::InvalidCidVersion)),
+    }
+}
+
+/// Reads an unsigned LEB128 (varint) value from `reader`, one byte at a time.
+///
+/// # Errors
+///
+/// Returns the underlying I/O error if the reader fails or ends mid-value, and an
+/// `InvalidData` error if the encoded value does not fit in 64 bits.
+pub async fn read_leb128<R>(mut reader: R) -> Result<u64, IoError>
+where
+    R: AsyncRead + Unpin,
+{
+    let mut buf = [0u8; 1];
+    let mut result = 0u64;
+    let mut shift = 0u32;
+    loop {
+        reader.read_exact(&mut buf).await?;
+        let byte = buf[0];
+        let payload = u64::from(byte & 0x7f);
+        // A u64 holds nine full 7-bit groups plus a single bit (shift 63). Anything
+        // beyond that would overflow the shift, so a long run of continuation bytes
+        // from an untrusted peer is rejected instead of panicking or wrapping.
+        if shift > 63 || (shift == 63 && payload > 1) {
+            return Err(IoError::new(
+                std::io::ErrorKind::InvalidData,
+                "LEB128 value does not fit in 64 bits",
+            ));
+        }
+        result |= payload << shift;
+        if byte & 0x80 == 0 {
+            return Ok(result);
+        }
+        shift += 7;
+    }
+}
+
+/// Writes `value` to `writer` as unsigned LEB128 and returns the number of bytes written.
+// NOTE(review): the generic parameter lists were missing from the reviewed text; `usize`
+// is assumed for the byte count - confirm against the original signature.
+pub async fn write_leb128<W>(value: u64, mut writer: W) -> Result<usize, IoError>
+where
+    W: AsyncWrite + Unpin,
+{
+    let mut buf = [0u8; 1];
+    let mut written = 0;
+    let mut value = value;
+    loop {
+        // low seven bits first; the high bit flags that more groups follow
+        let mut byte = (value & 0x7f) as u8;
+        value >>= 7;
+        if value != 0 {
+            byte |= 0x80;
+        }
+        buf[0] = byte;
+        writer.write_all(&buf).await?;
+        written += 1;
+        if value == 0 {
+            return Ok(written);
+        }
+    }
+}
diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index 5149e47c..3a68e5d1 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -3,6 +3,7 @@ use core::time::Duration; use libp2p::{identify::Config as OIdentifyConfig, identity::PublicKey}; pub mod behaviour; +pub mod exchange; pub mod relay; pub mod transport;