From d3dadf9a69b0ac756a9372d7edce36e5d70f310d Mon Sep 17 00:00:00 2001 From: chunningham Date: Fri, 21 Apr 2023 17:00:56 +0200 Subject: [PATCH 1/7] carv1 and v2 impls --- src/p2p/exchange/carv1.rs | 106 ++++++++++++++++++++++++++++++++++++++ src/p2p/exchange/carv2.rs | 89 ++++++++++++++++++++++++++++++++ src/p2p/exchange/mod.rs | 2 + src/p2p/mod.rs | 1 + 4 files changed, 198 insertions(+) create mode 100644 src/p2p/exchange/carv1.rs create mode 100644 src/p2p/exchange/carv2.rs create mode 100644 src/p2p/exchange/mod.rs diff --git a/src/p2p/exchange/carv1.rs b/src/p2p/exchange/carv1.rs new file mode 100644 index 00000000..099b7d96 --- /dev/null +++ b/src/p2p/exchange/carv1.rs @@ -0,0 +1,106 @@ +use futures::{ + io::{copy, AsyncRead, AsyncWrite, AsyncWriteExt}, + {TryStream, TryStreamExt}, +}; +use async_stream::try_stream; +use libipld::Cid; +use serde::{Deserialize, Serialize}; +use std::io::Error as IoError; + +use crate::storage::{Content, ImmutableStore}; + +#[derive(Serialize, Deserialize)] +pub struct Header { + version: u8, + roots: Vec, +} + +impl Header { + pub async fn write_to(&self, writer: &mut W) -> Result<(), IoError> + where + W: AsyncWrite + Unpin, + { + // TODO write serde_ipld_cbor + Ok(()) + } +} + +pub struct DataSection { + cid: Cid, + block: Content, +} + +impl DataSection { + pub fn new(cid: Cid, block: Content) -> Self { + Self { cid, block } + } +} + +impl DataSection +where + R: AsyncRead, +{ + pub async fn write_to(self, writer: &mut W) -> Result<(), IoError> + where + W: AsyncWrite + Unpin, + { + let cid_bytes = self.cid.to_bytes(); + let total_len = cid_bytes.len() as u64 + self.block.len(); + // TODO encode as varint + writer.write_all(&total_len.to_be_bytes()).await?; + writer.write_all(&cid_bytes).await?; + copy(self.block, writer).await?; + Ok(()) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum WriteError { + #[error(transparent)] + Io(#[from] IoError), + #[error(transparent)] + Data(E), +} + +pub async fn write( + header: &Header, + data: &mut S, + writer: &mut W, +) -> Result<(), WriteError> +where + W: AsyncWrite + Unpin, + S: TryStream, Error = E> + Unpin, + R: AsyncRead, +{ + header.write_to(writer).await?; + + while let Some(data) = data.try_next().await.map_err(WriteError::Data)? { + data.write_to(writer).await?; + } + + Ok(()) +} + +pub async fn read( + reader: &mut R, +) -> Result<(Header, impl TryStream>, Error = E>), IoError> +where + R: AsyncRead + Unpin, +{ + // TODO read header + let header = Header { + version: 0, + roots: vec![], + }; + if len == 0 { + break; + } + + let mut cid_bytes = vec![0u8; len as usize]; + reader.read_exact(&mut cid_bytes).await?; + let cid = Cid::try_from(cid_bytes).map_err(|_| IoError::from(IoErrorKind::InvalidData))?; + cids.push(cid); + } + + Ok((header, cids)) +} diff --git a/src/p2p/exchange/carv2.rs b/src/p2p/exchange/carv2.rs new file mode 100644 index 00000000..36824e7f --- /dev/null +++ b/src/p2p/exchange/carv2.rs @@ -0,0 +1,89 @@ +use futures::{ + io::{AsyncWrite, AsyncWriteExt}, + TryStream, +}; +use libipld::Cid; +use std::io::Error as IoError; + +use super::carv1::{write as v1_write, DataSection, Header as V1Header, WriteError}; +use crate::storage::ImmutableStore; + +const CAR_V2_PRAGMA: [u8; 11] = [ + 0x0a, 0xa1, 0x67, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x02, +]; + +const HEADER_LEN: u64 = 40; + +pub struct Characteristics([u8; 16]); + +impl Characteristics { + pub fn new() -> Self { + Characteristics([0; 16]) + } + + pub fn fully_indexed(&mut self, indexed: bool) -> &mut Self { + let byte_0 = self.0[0]; + self.0[0] = if indexed { + byte_0 | 0b1000_0000 + } else { + byte_0 & 0b0111_1111 + }; + self + } +} + +impl AsRef<[u8]> for Characteristics { + fn as_ref(&self) -> &[u8] { + &self.0.as_ref() + } +} + +pub struct Header { + characteristics: Characteristics, + data_offset: u64, + data_size: u64, + index_offset: u64, +} + +impl Header { + pub fn new() -> Self { + Header { + characteristics: Characteristics::new(), + data_offset: 0, + data_size: 0, + index_offset: 0, + } + } + + pub async fn write_to(&self, writer: &mut W) -> Result<(), IoError> + where + W: AsyncWrite + Unpin, + { + writer.write_all(self.characteristics.as_ref()).await?; + writer.write_all(&self.data_offset.to_be_bytes()).await?; + writer.write_all(&self.data_size.to_be_bytes()).await?; + writer.write_all(&self.index_offset.to_be_bytes()).await?; + Ok(()) + } +} + +pub async fn write( + header: &Header, + v1_header: &V1Header, + data: impl TryStream, Error = E>, + index: Option<()>, + writer: &mut W, +) -> Result<(), WriteError> +where + W: AsyncWrite + Unpin, + S: ImmutableStore, +{ + writer.write_all(&CAR_V2_PRAGMA).await?; + header.write_to(writer).await?; + + v1_write(v1_header, store, writer).await?; + + // TODO write index if present + + Ok(()) +} diff --git a/src/p2p/exchange/mod.rs b/src/p2p/exchange/mod.rs new file mode 100644 index 00000000..801bce88 --- /dev/null +++ b/src/p2p/exchange/mod.rs @@ -0,0 +1,2 @@ +pub mod carv1; +pub mod carv2; diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index 5149e47c..3a68e5d1 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -3,6 +3,7 @@ use core::time::Duration; use libp2p::{identify::Config as OIdentifyConfig, identity::PublicKey}; pub mod behaviour; +pub mod exchange; pub mod relay; pub mod transport; From b39e84cbd829b4215425125425d86e56ef638825 Mon Sep 17 00:00:00 2001 From: chunningham Date: Wed, 26 Apr 2023 14:32:46 +0200 Subject: [PATCH 2/7] WIP car streaming tools --- Cargo.lock | 1 + Cargo.toml | 1 + src/p2p/exchange/carv1.rs | 253 ++++++++++++++++++++++++++++++++++---- src/p2p/exchange/carv2.rs | 9 +- src/p2p/exchange/mod.rs | 1 + 5 files changed, 239 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 243a5c7c..a2db27cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3308,6 +3308,7 @@ dependencies = [ "tracing-log", "tracing-opentelemetry", "tracing-subscriber", + "unsigned-varint", "urlencoding", "uuid", "void", diff --git a/Cargo.toml b/Cargo.toml index 0e095968..3d4bd253 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ tracing = "0.1" tracing-log = "0.1" tracing-opentelemetry = "0.17.2" tracing-subscriber = { version = "0.3.11", features = ["env-filter", "json"] } +unsigned-varint = { version = "0.7", features = ["futures"] } urlencoding = "2.1" void = "1" uuid = "1" diff --git a/src/p2p/exchange/carv1.rs b/src/p2p/exchange/carv1.rs index 099b7d96..6f23a289 100644 --- a/src/p2p/exchange/carv1.rs +++ b/src/p2p/exchange/carv1.rs @@ -1,13 +1,21 @@ +use super::utils::{read_cid, read_leb128, write_leb128, Error as CidReadError, Leb128Reader}; +use async_stream::try_stream; use futures::{ - io::{copy, AsyncRead, AsyncWrite, AsyncWriteExt}, - {TryStream, TryStreamExt}, + channel::oneshot, + io::{copy, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Take}, + stream::{Stream, TryStream, TryStreamExt}, }; -use async_stream::try_stream; use libipld::Cid; +use pin_project::pin_project; use serde::{Deserialize, Serialize}; -use std::io::Error as IoError; +use std::{ + future::Future, + io::{Error as IoError, ErrorKind}, + pin::Pin, + task::Poll, +}; -use crate::storage::{Content, ImmutableStore}; +use crate::storage::Content; #[derive(Serialize, Deserialize)] pub struct Header { @@ -23,6 +31,17 @@ impl Header { // TODO write serde_ipld_cbor Ok(()) } + + pub async fn read_from(reader: &mut R) -> Result + where + R: AsyncRead + Unpin, + { + // TODO write serde_ipld_cbor + Ok(Self { + version: 0, + roots: Vec::new(), + }) + } } pub struct DataSection { @@ -46,8 +65,7 @@ where { let cid_bytes = self.cid.to_bytes(); let total_len = cid_bytes.len() as u64 + self.block.len(); - // TODO encode as varint - writer.write_all(&total_len.to_be_bytes()).await?; + write_leb128(total_len, writer).await?; writer.write_all(&cid_bytes).await?; copy(self.block, writer).await?; Ok(()) @@ -81,26 +99,217 @@ where Ok(()) } -pub async fn read( - reader: &mut R, -) -> Result<(Header, impl TryStream>, Error = E>), IoError> +#[derive(thiserror::Error, Debug)] +pub enum ReadError { + #[error(transparent)] + Io(#[from] IoError), + #[error(transparent)] + Cid(#[from] libipld::cid::Error), + #[error(transparent)] + Canceled(#[from] oneshot::Canceled), +} + +impl From for ReadError { + fn from(e: CidReadError) -> Self { + match e { + CidReadError::Io(e) => Self::Io(e), + CidReadError::Cid(e) => Self::Cid(e), + } + } +} + +pub struct CarBlockReader { + reader: DelimitedReader, +} + +enum DelimitedReaderState { + Waiting(Waiting), + Available(Available), +} + +#[pin_project] +struct Available { + reader: Option, + len: Leb128Reader, +} + +struct Waiting { + receiver: oneshot::Receiver, +} + +impl Available { + fn new(reader: R) -> Self { + Self { + reader: Some(reader), + len: Leb128Reader::new(), + } + } +} + +impl Future for Available where R: AsyncRead + Unpin, { - // TODO read header - let header = Header { - version: 0, - roots: vec![], - }; - if len == 0 { - break; + type Output = Result<(Waiting, TakenReader), IoError>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let this = self.project(); + let len = match this.reader { + Some(r) => { + let mut buf = [0u8; 1]; + loop { + match Pin::new(r).poll_read(cx, &mut buf)? { + Poll::Ready(0) => { + return Poll::Ready(Err(IoError::new(ErrorKind::Other, "reader empty"))) + } + Poll::Ready(1) => {} + Poll::Pending => return Poll::Pending, + }; + if let Some(len) = this.len.read(buf[0]) { + break len; + } + } + } + None => { + return Poll::Ready(Err(IoError::new(ErrorKind::Other, "reader already taken"))) + } + }; + // create oneshot + let (tx, rx) = oneshot::channel(); + // create a new reader + let reader = TakenReader { + reader: this + .reader + .take() + .ok_or_else(|| IoError::new(ErrorKind::Other, "reader already taken"))? + .take(len), + origin: tx, + }; + Poll::Ready(Ok((Waiting { receiver: rx }, reader))) + } +} + +pub struct TakenReader { + reader: Take, + origin: oneshot::Sender, +} + +impl DelimitedReaderState { + fn new(reader: R) -> Self { + Self::Available(Available::new(reader)) + } +} + +#[pin_project] +struct DelimitedReader { + reader: DelimitedReaderState, +} + +impl DelimitedReader { + pub fn new(reader: R) -> Self { + Self { + reader: DelimitedReaderState::new(reader), + } + } +} + +impl Stream for DelimitedReader +where + R: AsyncRead + Unpin, +{ + type Item = Result, ReadError>; + fn poll_next( + self: Pin<&mut Self>, + context: &mut std::task::Context, + ) -> Poll> { + let p = self.project(); + match p.reader { + DelimitedReaderState::Waiting(r) => { + // wait for receiver to be ready + let ar = match r.receiver.try_recv() { + Ok(Some(ar)) => ar, + Ok(None) => return Poll::Pending, + // if reciever is dropped, should this return None? + Err(e) => return Poll::Ready(Some(Err(e.into()))), + }; + *p.reader = DelimitedReaderState::new(ar); + self.poll_next(context) + } + DelimitedReaderState::Available(a) => { + // read len + let (w, t) = futures::ready!(Pin::new(a).poll(context))?; + *p.reader = DelimitedReaderState::Waiting(w); + Poll::Ready(Some(Ok(t))) + } } + } +} - let mut cid_bytes = vec![0u8; len as usize]; - reader.read_exact(&mut cid_bytes).await?; - let cid = Cid::try_from(cid_bytes).map_err(|_| IoError::from(IoErrorKind::InvalidData))?; - cids.push(cid); +impl CarBlockReader { + pub fn new(reader: R) -> Self { + Self { + reader: DelimitedReader::new(reader), + } } +} + +pub async fn read_delimited( + mut reader: R, +) -> Result< + ( + TakenReader, + impl Future>, + ), + ReadError, +> +where + R: AsyncRead + Unpin, +{ + let len = read_leb128(&mut reader).await?; + let (tx, rx) = oneshot::channel(); + let reader = TakenReader { + reader: reader.take(len), + origin: tx, + }; + Ok((reader, rx)) +} + +pub async fn read( + mut reader: R, +) -> Result< + ( + Header, + impl TryStream>, Error = ReadError>, + impl Future, + ), + ReadError, +> +where + R: AsyncRead + Unpin, +{ + let header = Header::read_from(&mut reader).await?; + + let (sender, reciever) = oneshot::channel(); + + let data = try_stream! { + loop { + let len = match read_leb128(&mut reader).await { + Ok(l) => l, + Err(e) if e.kind() == ErrorKind::Eof => { + sender.send(reader).await?; + break; + }, + Err(e) => Err(e)? + }; + let cid = read_cid(&mut reader).await?; + + // TODO use libipld 0.16 to get cid len easily + let cid_len = cid.to_bytes().len() as u64; + + let block = reader.take(len - cid_len); + yield Ok((cid, block)) + } + }; - Ok((header, cids)) + Ok((header, data, reciever)) } diff --git a/src/p2p/exchange/carv2.rs b/src/p2p/exchange/carv2.rs index 36824e7f..dde84ada 100644 --- a/src/p2p/exchange/carv2.rs +++ b/src/p2p/exchange/carv2.rs @@ -1,5 +1,5 @@ use futures::{ - io::{AsyncWrite, AsyncWriteExt}, + io::{AsyncRead, AsyncWrite, AsyncWriteExt}, TryStream, }; use libipld::Cid; @@ -70,18 +70,19 @@ impl Header { pub async fn write( header: &Header, v1_header: &V1Header, - data: impl TryStream, Error = E>, + data: &mut S, index: Option<()>, writer: &mut W, ) -> Result<(), WriteError> where W: AsyncWrite + Unpin, - S: ImmutableStore, + S: TryStream, Error = E> + Unpin, + R: AsyncRead, { writer.write_all(&CAR_V2_PRAGMA).await?; header.write_to(writer).await?; - v1_write(v1_header, store, writer).await?; + v1_write(v1_header, data, writer).await?; // TODO write index if present diff --git a/src/p2p/exchange/mod.rs b/src/p2p/exchange/mod.rs index 801bce88..791ca4a0 100644 --- a/src/p2p/exchange/mod.rs +++ b/src/p2p/exchange/mod.rs @@ -1,2 +1,3 @@ pub mod carv1; pub mod carv2; +pub mod utils; From a7746dcb43249c9662592af1c328c7772b431438 Mon Sep 17 00:00:00 2001 From: chunningham Date: Wed, 26 Apr 2023 17:31:30 +0200 Subject: [PATCH 3/7] commit utils --- src/p2p/exchange/utils.rs | 137 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 src/p2p/exchange/utils.rs diff --git a/src/p2p/exchange/utils.rs b/src/p2p/exchange/utils.rs new file mode 100644 index 00000000..ce1a29e4 --- /dev/null +++ b/src/p2p/exchange/utils.rs @@ -0,0 +1,137 @@ +use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use libipld::cid::{ + multihash::{Code, MultihashDigest}, + Cid, Error as CidError, +}; +use std::io::Error as IoError; +use unsigned_varint::aio; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error(transparent)] + Io(#[from] IoError), + #[error(transparent)] + Cid(#[from] CidError), + #[error(transparent)] + VarInt(#[from] unsigned_varint::decode::Error), +} + +impl From for Error { + fn from(e: unsigned_varint::io::ReadError) -> Self { + match e { + unsigned_varint::io::ReadError::Io(e) => e.into(), + unsigned_varint::io::ReadError::Decode(e) => e.into(), + } + } +} + +/// incremental parsing of a CID from an AsyncRead +pub async fn read_cid(mut reader: R) -> Result +where + R: AsyncRead + Unpin, +{ + use Code::*; + let mut buf = [0u8; 1]; + reader.read_exact(&mut buf).await?; + match buf[0] { + // CID v0, should never really happen + 0x12 => { + let mut buf = [0u8; 32]; + reader.read_exact(&mut buf).await?; + Ok(Cid::new_v0(Code::Sha2_256.digest(&buf))?) + } + // CID v1 + 0x20 => { + let codec = aio::read_u64(&mut reader).await?; + let mh_code = + Code::try_from(aio::read_u64(&mut reader).await?).map_err(CidError::from)?; + let mh = match mh_code { + Sha2_256 | Sha3_256 | Keccak256 | Blake2b256 | Blake2s256 | Blake3_256 => { + let mut buf = [0u8; 32]; + reader.read_exact(&mut buf).await?; + mh_code.wrap(&buf) + } + Sha2_512 | Sha3_512 | Keccak512 | Blake2b512 => { + let mut buf = [0u8; 64]; + reader.read_exact(&mut buf).await?; + mh_code.wrap(&buf) + } + Sha3_224 | Keccak224 => { + let mut buf = [0u8; 28]; + reader.read_exact(&mut buf).await?; + mh_code.wrap(&buf) + } + Sha3_384 | Keccak384 => { + let mut buf = [0u8; 48]; + reader.read_exact(&mut buf).await?; + mh_code.wrap(&buf) + } + Blake2s128 => { + let mut buf = [0u8; 16]; + reader.read_exact(&mut buf).await?; + mh_code.wrap(&buf) + } + } + .map_err(CidError::from)?; + Ok(Cid::new_v1(codec, mh)) + } + _ => Err(Error::Cid(CidError::InvalidCidVersion)), + } +} + +pub struct Leb128Reader(u64, u8); + +impl Leb128Reader { + pub fn new() -> Self { + Self(0, 0) + } + + pub fn read(&mut self, byte: u8) -> Option { + self.0 |= ((byte & 0x7f) as u64) << self.1; + self.1 += 7; + if byte & 0x80 == 0 { + return Some(self.0); + } + None + } +} + +pub async fn read_leb128(mut reader: R) -> Result +where + R: AsyncRead + Unpin, +{ + let mut buf = [0u8; 1]; + let mut result = 0u64; + let mut shift = 0u8; + loop { + reader.read_exact(&mut buf).await?; + let byte = buf[0]; + result |= ((byte & 0x7f) as u64) << shift; + if byte & 0x80 == 0 { + return Ok(result); + } + shift += 7; + } +} + +pub async fn write_leb128(value: u64, mut writer: W) -> Result +where + W: AsyncWrite + Unpin, +{ + let mut buf = [0u8; 1]; + let mut written = 0; + let mut value = value; + loop { + let mut byte = (value & 0x7f) as u8; + value >>= 7; + if value != 0 { + byte |= 0x80; + } + buf[0] = byte; + writer.write_all(&buf).await?; + written += 1; + if value == 0 { + return Ok(written); + } + } +} From 8c71329742c1457edba2f68b7fbb9733a78f406a Mon Sep 17 00:00:00 2001 From: chunningham Date: Fri, 28 Apr 2023 15:53:11 +0200 Subject: [PATCH 4/7] wip --- Cargo.lock | 22 ++ Cargo.toml | 1 + src/p2p/exchange/carv1.rs | 454 ++++++++++++++++++++++++-------------- src/p2p/exchange/utils.rs | 110 +++++++-- 4 files changed, 398 insertions(+), 189 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a2db27cf..cd37c5a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1178,6 +1178,15 @@ dependencies = [ "opaque-debug 0.3.0", ] +[[package]] +name = "cbor4ii" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544cf8c89359205f4f990d0e6f3828db42df85b5dac95d09157a250eb0749c4" +dependencies = [ + "serde", +] + [[package]] name = "cc" version = "1.0.79" @@ -3295,6 +3304,7 @@ dependencies = [ "reqwest", "rocket", "serde", + "serde_ipld_dagcbor", "serde_json", "serde_with 1.14.0", "sled", @@ -6106,6 +6116,18 @@ dependencies = [ "syn 2.0.13", ] +[[package]] +name = "serde_ipld_dagcbor" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2433e94ffb5977e67afbd75733abd6ada1c4f497125894a8c89b3fdc5fd6a058" +dependencies = [ + "cbor4ii", + "cid", + "scopeguard", + "serde", +] + [[package]] name = "serde_jcs" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 3d4bd253..1b5c47e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ rocket = { version = "0.5.0-rc.2", features = ["json"] } serde = { version = "1", features = ["derive"] } serde_json = "1" serde_with = { version = "1", features = ["hex"] } +serde_ipld_dagcbor = "0.3" sled = "0.34" thiserror = "1" tempfile = "3" diff --git a/src/p2p/exchange/carv1.rs b/src/p2p/exchange/carv1.rs index 6f23a289..82579fac 100644 --- a/src/p2p/exchange/carv1.rs +++ b/src/p2p/exchange/carv1.rs @@ -1,4 +1,4 @@ -use super::utils::{read_cid, read_leb128, write_leb128, Error as CidReadError, Leb128Reader}; +use super::utils::{read_cid, read_dag_cbor_cid, read_leb128, write_leb128, Error as CidReadError}; use async_stream::try_stream; use futures::{ channel::oneshot, @@ -8,6 +8,7 @@ use futures::{ use libipld::Cid; use pin_project::pin_project; use serde::{Deserialize, Serialize}; +use serde_ipld_dagcbor::{to_writer, DecodeError, EncodeError}; use std::{ future::Future, io::{Error as IoError, ErrorKind}, @@ -17,30 +18,70 @@ use std::{ use crate::storage::Content; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] pub struct Header { version: u8, roots: Vec, } impl Header { - pub async fn write_to(&self, writer: &mut W) -> Result<(), IoError> + const INITIAL_BYTES: [u8; 9] = [0xa2, 0x67, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e]; + const ROOTS_BYTES: [u8; 6] = [0x65, 0x72, 0x6f, 0x6f, 0x74, 0x73]; + pub async fn write_to(&self, mut writer: W) -> Result<(), EncodeError> where W: AsyncWrite + Unpin, { - // TODO write serde_ipld_cbor + let mut buf = Vec::new(); + to_writer(&mut buf, self)?; + writer.write_all(&mut buf).await?; Ok(()) } - pub async fn read_from(reader: &mut R) -> Result + // no async serde decoding :(, so we have to do it manually + pub async fn read_from(mut reader: R) -> Result> where R: AsyncRead + Unpin, { - // TODO write serde_ipld_cbor - Ok(Self { - version: 0, - roots: Vec::new(), - }) + let mut buf = [0u8; 9]; + reader.read_exact(&mut buf).await?; + // expect opening of cbor map with 2 elements, then "version" key + if buf != Header::INITIAL_BYTES { + return Err(DecodeError::Msg("invalid header".to_string())); + }; + + // read version first, version < roots in dag-cbor + reader.read_exact(&mut buf[0..1]).await?; + let version = buf[0]; + + // read roots + reader.read_exact(&mut buf[0..6]).await?; + if buf[0..6] != Header::ROOTS_BYTES { + return Err(DecodeError::Msg("invalid header".to_string())); + }; + + // array tag + array len + reader.read_exact(&mut buf[0..1]).await?; + let array_tag = buf[0] >> 5; + if array_tag != 4 { + return Err(DecodeError::Mismatch { + expect_major: 4, + byte: array_tag, + }); + }; + + // assuming for now we'll never have more than 7 roots :/ + let array_len = buf[0] & 0b00011111; + + let mut roots = Vec::with_capacity(array_len.into()); + // for array len 'n' read n cids + for _ in 0..array_len { + roots.push(read_dag_cbor_cid(&mut reader).await.map_err(|e| match e { + CidReadError::Io(e) => DecodeError::Read(e), + _ => DecodeError::Msg(e.to_string()), + })?); + } + + Ok(Self { version, roots }) } } @@ -59,15 +100,15 @@ impl DataSection where R: AsyncRead, { - pub async fn write_to(self, writer: &mut W) -> Result<(), IoError> + pub async fn write_to(self, mut writer: W) -> Result<(), IoError> where W: AsyncWrite + Unpin, { let cid_bytes = self.cid.to_bytes(); let total_len = cid_bytes.len() as u64 + self.block.len(); - write_leb128(total_len, writer).await?; + write_leb128(total_len, &mut writer).await?; writer.write_all(&cid_bytes).await?; - copy(self.block, writer).await?; + copy(self.block, &mut writer).await?; Ok(()) } } @@ -75,25 +116,31 @@ where #[derive(thiserror::Error, Debug)] pub enum WriteError { #[error(transparent)] - Io(#[from] IoError), + Header(#[from] EncodeError), #[error(transparent)] Data(E), } +impl From for WriteError { + fn from(e: IoError) -> Self { + Self::Header(EncodeError::Write(e)) + } +} + pub async fn write( header: &Header, data: &mut S, - writer: &mut W, + mut writer: W, ) -> Result<(), WriteError> where W: AsyncWrite + Unpin, S: TryStream, Error = E> + Unpin, R: AsyncRead, { - header.write_to(writer).await?; + header.write_to(&mut writer).await?; while let Some(data) = data.try_next().await.map_err(WriteError::Data)? { - data.write_to(writer).await?; + data.write_to(&mut writer).await? } Ok(()) @@ -107,6 +154,8 @@ pub enum ReadError { Cid(#[from] libipld::cid::Error), #[error(transparent)] Canceled(#[from] oneshot::Canceled), + #[error(transparent)] + Header(#[from] DecodeError), } impl From for ReadError { @@ -118,169 +167,223 @@ impl From for ReadError { } } -pub struct CarBlockReader { - reader: DelimitedReader, -} - -enum DelimitedReaderState { - Waiting(Waiting), - Available(Available), +// pub struct CarBlockReader { +// reader: DelimitedReader, +// } + +// enum DelimitedReaderState { +// Waiting(Waiting), +// Available(Available), +// } + +// #[pin_project] +// struct Available { +// #[pin] +// len: Leb128Reader, +// } + +// struct Waiting { +// receiver: oneshot::Receiver, +// } + +// impl Available { +// fn new(reader: R) -> Self { +// Self { +// len: Leb128Reader::new(reader), +// } +// } +// } + +// impl Future for Available +// where +// R: AsyncRead + Unpin, +// { +// type Output = Result<(Waiting, TakenReader), IoError>; + +// fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { +// let this = self.project(); +// let (len, reader) = futures::ready!(this.len.poll(cx))?; +// // create a new reader +// let (reader, rx) = TakenReader::new(reader, len); +// Poll::Ready(Ok((Waiting { receiver: rx }, reader))) +// } +// } + +#[pin_project(project = TakenReaderProj)] +pub enum TakenReader { + Finished, + Unfinished(#[pin] Take, Option>), } -#[pin_project] -struct Available { - reader: Option, - len: Leb128Reader, -} - -struct Waiting { - receiver: oneshot::Receiver, -} +impl TakenReader +where + R: AsyncRead, +{ + pub fn new(reader: R, limit: u64) -> (Self, oneshot::Receiver) { + let (tx, rx) = oneshot::channel(); + let reader = reader.take(limit); + (Self::Unfinished(reader, Some(tx)), rx) + } -impl Available { - fn new(reader: R) -> Self { - Self { - reader: Some(reader), - len: Leb128Reader::new(), - } + fn finish(&mut self) -> Result<(), oneshot::Canceled> { + match self { + Self::Finished => (), + Self::Unfinished(r, tx) => { + let sender = tx.take(); + match sender { + Some(sender) => sender.send(r.into_inner()).map_err(|_| oneshot::Canceled)?, + None => (), + }; + *self = Self::Finished; + } + }; + Ok(()) } } -impl Future for Available +impl AsyncRead for TakenReader where R: AsyncRead + Unpin, { - type Output = Result<(Waiting, TakenReader), IoError>; - - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - let this = self.project(); - let len = match this.reader { - Some(r) => { - let mut buf = [0u8; 1]; - loop { - match Pin::new(r).poll_read(cx, &mut buf)? { - Poll::Ready(0) => { - return Poll::Ready(Err(IoError::new(ErrorKind::Other, "reader empty"))) - } - Poll::Ready(1) => {} - Poll::Pending => return Poll::Pending, - }; - if let Some(len) = this.len.read(buf[0]) { - break len; + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + { + let mut this = self.project(); + match this { + TakenReaderProj::Finished => return Poll::Ready(Ok(0)), + TakenReaderProj::Unfinished(ref mut reader, _) => { + match Pin::new(reader).poll_read(cx, buf) { + Poll::Ready(Ok(0)) => true, + p => return p, } } } - None => { - return Poll::Ready(Err(IoError::new(ErrorKind::Other, "reader already taken"))) - } }; - // create oneshot - let (tx, rx) = oneshot::channel(); - // create a new reader - let reader = TakenReader { - reader: this - .reader - .take() - .ok_or_else(|| IoError::new(ErrorKind::Other, "reader already taken"))? - .take(len), - origin: tx, - }; - Poll::Ready(Ok((Waiting { receiver: rx }, reader))) + // we can only get here if we're finished but haven't sent yet + let _ = self.finish(); + Poll::Ready(Ok(0)) } -} -pub struct TakenReader { - reader: Take, - origin: oneshot::Sender, -} - -impl DelimitedReaderState { - fn new(reader: R) -> Self { - Self::Available(Available::new(reader)) - } -} - -#[pin_project] -struct DelimitedReader { - reader: DelimitedReaderState, -} - -impl DelimitedReader { - pub fn new(reader: R) -> Self { - Self { - reader: DelimitedReaderState::new(reader), - } - } -} - -impl Stream for DelimitedReader -where - R: AsyncRead + Unpin, -{ - type Item = Result, ReadError>; - fn poll_next( - self: Pin<&mut Self>, - context: &mut std::task::Context, - ) -> Poll> { - let p = self.project(); - match p.reader { - DelimitedReaderState::Waiting(r) => { - // wait for receiver to be ready - let ar = match r.receiver.try_recv() { - Ok(Some(ar)) => ar, - Ok(None) => return Poll::Pending, - // if reciever is dropped, should this return None? - Err(e) => return Poll::Ready(Some(Err(e.into()))), - }; - *p.reader = DelimitedReaderState::new(ar); - self.poll_next(context) - } - DelimitedReaderState::Available(a) => { - // read len - let (w, t) = futures::ready!(Pin::new(a).poll(context))?; - *p.reader = DelimitedReaderState::Waiting(w); - Poll::Ready(Some(Ok(t))) + fn poll_read_vectored( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &mut [std::io::IoSliceMut<'_>], + ) -> std::task::Poll> { + { + let mut this = self.project(); + match this { + TakenReaderProj::Finished => return Poll::Ready(Ok(0)), + TakenReaderProj::Unfinished(ref mut reader, _) => { + match reader.poll_read_vectored(cx, bufs) { + Poll::Ready(Ok(0)) => (), + p => return p, + } + } } - } + }; + // we can only get here if we're finished but haven't sent yet + let _ = self.finish(); + Poll::Ready(Ok(0)) } } -impl CarBlockReader { - pub fn new(reader: R) -> Self { - Self { - reader: DelimitedReader::new(reader), - } - } +// impl DelimitedReaderState { +// fn new(reader: R) -> Self { +// Self::Available(Available::new(reader)) +// } +// } + +// #[pin_project] +// struct DelimitedReader { +// reader: DelimitedReaderState, +// } + +// impl DelimitedReader { +// pub fn new(reader: R) -> Self { +// Self { +// reader: DelimitedReaderState::new(reader), +// } +// } +// } + +// impl Stream for DelimitedReader +// where +// R: AsyncRead + Unpin, +// { +// type Item = Result, ReadError>; +// fn poll_next( +// self: Pin<&mut Self>, +// context: &mut std::task::Context, +// ) -> Poll> { +// let p = self.project(); +// match p.reader { +// DelimitedReaderState::Waiting(r) => { +// // wait for receiver to be ready +// let ar = match r.receiver.try_recv() { +// Ok(Some(ar)) => ar, +// Ok(None) => return Poll::Pending, +// // if sender is dropped, should this return None? +// Err(e) => return Poll::Ready(Some(Err(e.into()))), +// }; +// *p.reader = DelimitedReaderState::new(ar); +// // self.poll_next(context) +// Poll::Pending +// } +// DelimitedReaderState::Available(a) => { +// // read len +// let (w, t) = futures::ready!(Pin::new(a).poll(context))?; +// *p.reader = DelimitedReaderState::Waiting(w); +// Poll::Ready(Some(Ok(t))) +// } +// } +// } +// } + +// impl CarBlockReader { +// pub fn new(reader: R) -> Self { +// Self { +// reader: DelimitedReader::new(reader), +// } +// } +// } + +enum ReaderState { + Empty(R), + Error(ReadError), + Element(DataSection>, F), } -pub async fn read_delimited( +pub(crate) async fn read_section( mut reader: R, -) -> Result< - ( - TakenReader, - impl Future>, - ), - ReadError, -> +) -> Result>, ReadError> where R: AsyncRead + Unpin, { - let len = read_leb128(&mut reader).await?; - let (tx, rx) = oneshot::channel(); - let reader = TakenReader { - reader: reader.take(len), - origin: tx, + let len = match read_leb128(&mut reader).await { + Ok(len) => len, + Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(ReaderState::Empty(reader)), + Err(e) => return Err(ReadError::Io(e)), }; - Ok((reader, rx)) + let cid = read_cid(&mut reader).await?; + let cid_len = cid.to_bytes().len() as u64; + let block_len = len - cid_len; + let (reader, rx) = TakenReader::new(reader, block_len); + Ok(ReaderState::Element( + DataSection::new(cid, Content::new(block_len, reader)), + rx, + )) } -pub async fn read( +pub async fn stream_carv1( mut reader: R, ) -> Result< ( Header, - impl TryStream>, Error = ReadError>, - impl Future, + impl Stream>, ReadError>>, + impl Future>, ), ReadError, > @@ -288,28 +391,39 @@ where R: AsyncRead + Unpin, { let header = Header::read_from(&mut reader).await?; - - let (sender, reciever) = oneshot::channel(); - - let data = try_stream! { - loop { - let len = match read_leb128(&mut reader).await { - Ok(l) => l, - Err(e) if e.kind() == ErrorKind::Eof => { - sender.send(reader).await?; - break; - }, - Err(e) => Err(e)? - }; - let cid = read_cid(&mut reader).await?; - - // TODO use libipld 0.16 to get cid len easily - let cid_len = cid.to_bytes().len() as u64; - - let block = reader.take(len - cid_len); - yield Ok((cid, block)) - } + let (final_tx, final_rx) = oneshot::channel(); + let stream = try_stream! { + while match read_section(reader).await? { + ReaderState::Empty(r) => {final_tx.send(r).map_err(|_| oneshot::Canceled)?; false}, + ReaderState::Error(e) => {yield Err(e); false}, + ReaderState::Element(ds, rx) => { + yield ds; + reader = rx.await?; + true + } + } {} }; + Ok((header, stream, final_rx)) +} - Ok((header, data, reciever)) +#[cfg(test)] +mod tests { + use super::*; + + #[test] + async fn header() { + let header = Header { + version: 1, + roots: vec![ + "bagaaierasords4njcts6vs7qvdjfcvgnume4hqohf65zsfguprqphs3icwea" + .parse() + .unwrap(), + ], + }; + let mut buf = Vec::new(); + header.write_to(&mut buf).await.unwrap(); + println!("{:x?}", buf); + let deser = Header::read_from(&mut buf.as_slice()).await.unwrap(); + assert_eq!(header, deser); + } } diff --git a/src/p2p/exchange/utils.rs b/src/p2p/exchange/utils.rs index ce1a29e4..d8a0b73f 100644 --- a/src/p2p/exchange/utils.rs +++ b/src/p2p/exchange/utils.rs @@ -1,9 +1,17 @@ -use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + Future, +}; use libipld::cid::{ multihash::{Code, MultihashDigest}, Cid, Error as CidError, }; -use std::io::Error as IoError; +use pin_project::pin_project; +use std::{ + io::{Error as IoError, ErrorKind as IoErrorKind}, + pin::Pin, + task::Poll, +}; use unsigned_varint::aio; #[derive(thiserror::Error, Debug)] @@ -12,16 +20,11 @@ pub enum Error { Io(#[from] IoError), #[error(transparent)] Cid(#[from] CidError), - #[error(transparent)] - VarInt(#[from] unsigned_varint::decode::Error), } impl From for Error { fn from(e: unsigned_varint::io::ReadError) -> Self { - match e { - unsigned_varint::io::ReadError::Io(e) => e.into(), - unsigned_varint::io::ReadError::Decode(e) => e.into(), - } + Self::Cid(e.into()) } } @@ -75,25 +78,94 @@ where .map_err(CidError::from)?; Ok(Cid::new_v1(codec, mh)) } - _ => Err(Error::Cid(CidError::InvalidCidVersion)), + v => { + println!("{:x?}", v); + Err(Error::Cid(CidError::InvalidCidVersion)) + } } } -pub struct Leb128Reader(u64, u8); +#[pin_project] +pub enum Leb128Reader { + Unfinished(u64, #[pin] R), + Finished, +} + +pub fn update_leb128(val: u64, byte: u8) -> u64 { + (val << 7) | (byte & 0x7f) as u64 +} -impl Leb128Reader { - pub fn new() -> Self { - Self(0, 0) +impl Leb128Reader { + pub fn new(reader: R) -> Self { + Self::Unfinished(0, reader) } - pub fn read(&mut self, byte: u8) -> Option { - self.0 |= ((byte & 0x7f) as u64) << self.1; - self.1 += 7; - if byte & 0x80 == 0 { - return Some(self.0); + fn finish(&mut self) -> (u64, R) { + match self { + Self::Unfinished(val, reader) => (*val, reader), + Self::Finished => panic!("already finished"), + } + } +} + +impl Future for Leb128Reader +where + R: AsyncRead + Unpin, +{ + type Output = Result<(u64, R), IoError>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + match *self { + Self::Unfinished(v, r) => { + let this = self.project(); + let mut buf = [0u8; 1]; + + match this.1.poll_read(cx, &mut buf) { + Poll::Ready(Ok(_)) => (), + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + }; + let byte = buf[0]; + *this.0 = update_leb128(*this.0, byte); + if byte & 0x80 == 0 {} + } + Self::Finished => return Poll::Ready(Err(IoError::from(IoErrorKind::UnexpectedEof))), } - None + return Poll::Ready(Ok((*this.0, *this.1))); + Poll::Pending + } +} + +pub async fn read_dag_cbor_cid(mut reader: R) -> Result +where + R: AsyncRead + Unpin, +{ + let mut buf = [0u8; 2]; + reader.read_exact(&mut buf).await?; + // check cid tag (0xd8) is 42 (0x2a) + if buf != [0xd8, 0x2a] { + return Err(Error::Cid(CidError::InvalidCidVersion)); + }; + + reader.read_exact(&mut buf).await?; + // check byte string tag + // tbh not sure what the extra bits in this byte are for + // so I'm ignoring them for now + if buf[0] >> 5 != 2 { + return Err(Error::Cid(CidError::InvalidCidVersion)); + }; + + let cid_len = buf[1]; + + println!("{:x?}", buf); + let mut vec = Vec::with_capacity(cid_len as usize); + // TODO wtf why doesnt this read anything + reader.take(cid_len.into()).read_exact(&mut vec).await?; + println!("{:x?}", vec); + if vec.get(0) != Some(&0) { + return Err(Error::Cid(CidError::InvalidCidVersion)); } + Ok(Cid::read_bytes(&vec[0..])?) } pub async fn read_leb128(mut reader: R) -> Result From aa1d53124a81446dc57e5894fdbaed82c8f90f58 Mon Sep 17 00:00:00 2001 From: chunningham Date: Fri, 28 Apr 2023 16:15:07 +0200 Subject: [PATCH 5/7] use async_stream instead of fancy structs --- src/p2p/exchange/carv1.rs | 199 ++++++++++---------------------------- src/p2p/exchange/utils.rs | 63 +----------- 2 files changed, 53 insertions(+), 209 deletions(-) diff --git a/src/p2p/exchange/carv1.rs b/src/p2p/exchange/carv1.rs index 82579fac..ec098eb9 100644 --- a/src/p2p/exchange/carv1.rs +++ b/src/p2p/exchange/carv1.rs @@ -167,50 +167,10 @@ impl From for ReadError { } } -// pub struct CarBlockReader { -// reader: DelimitedReader, -// } - -// enum DelimitedReaderState { -// Waiting(Waiting), -// Available(Available), -// } - -// #[pin_project] -// struct Available { -// #[pin] -// len: Leb128Reader, -// } - -// struct Waiting { -// receiver: oneshot::Receiver, -// } - -// impl Available { -// fn new(reader: R) -> Self { -// Self { -// len: Leb128Reader::new(reader), -// } -// } -// } - -// impl Future for Available -// where -// R: AsyncRead + Unpin, -// { -// type Output = Result<(Waiting, TakenReader), IoError>; - -// fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { -// let this = self.project(); -// let (len, reader) = futures::ready!(this.len.poll(cx))?; -// // create a new reader -// let (reader, rx) = TakenReader::new(reader, len); -// Poll::Ready(Ok((Waiting { receiver: rx }, reader))) -// } -// } - +#[derive(Default)] #[pin_project(project = TakenReaderProj)] pub enum TakenReader { + #[default] Finished, Unfinished(#[pin] Take, Option>), } @@ -226,16 +186,11 @@ where } fn finish(&mut self) -> Result<(), oneshot::Canceled> { - match self { - Self::Finished => (), - Self::Unfinished(r, tx) => { - let sender = tx.take(); - match sender { - Some(sender) => sender.send(r.into_inner()).map_err(|_| oneshot::Canceled)?, - None => (), - }; - *self = Self::Finished; + match std::mem::take(self) { + Self::Unfinished(r, Some(tx)) => { + tx.send(r.into_inner()).map_err(|_| oneshot::Canceled)?; } + _ => (), }; Ok(()) } @@ -250,21 +205,16 @@ where cx: &mut std::task::Context<'_>, buf: &mut [u8], ) -> std::task::Poll> { - { - let mut this = self.project(); - match this { - TakenReaderProj::Finished => return Poll::Ready(Ok(0)), - TakenReaderProj::Unfinished(ref mut reader, _) => { - match Pin::new(reader).poll_read(cx, buf) { - Poll::Ready(Ok(0)) => true, - p => return p, - } + match *self { + TakenReader::Finished => Poll::Ready(Ok(0)), + TakenReader::Unfinished(ref mut reader, _) => { + let poll = Pin::new(reader).poll_read(cx, buf); + if let Poll::Ready(Ok(0)) = poll { + let _ = self.finish(); } + poll } - }; - // we can only get here if we're finished but haven't sent yet - let _ = self.finish(); - Poll::Ready(Ok(0)) + } } fn poll_read_vectored( @@ -272,87 +222,21 @@ where cx: &mut std::task::Context<'_>, bufs: &mut [std::io::IoSliceMut<'_>], ) -> std::task::Poll> { - { - let mut this = self.project(); - match this { - TakenReaderProj::Finished => return Poll::Ready(Ok(0)), - TakenReaderProj::Unfinished(ref mut reader, _) => { - match reader.poll_read_vectored(cx, bufs) { - Poll::Ready(Ok(0)) => (), - p => return p, - } + match *self { + TakenReader::Finished => Poll::Ready(Ok(0)), + TakenReader::Unfinished(ref mut reader, _) => { + let poll = Pin::new(reader).poll_read_vectored(cx, bufs); + if let Poll::Ready(Ok(0)) = poll { + let _ = self.finish(); } + poll } - }; - // we can only get here if we're finished but haven't sent yet - let _ = self.finish(); - Poll::Ready(Ok(0)) + } } } -// impl DelimitedReaderState { -// fn new(reader: R) -> Self { -// Self::Available(Available::new(reader)) -// } -// } - -// #[pin_project] -// struct DelimitedReader { -// reader: DelimitedReaderState, -// } - -// impl DelimitedReader { -// pub fn new(reader: R) -> Self { -// Self { -// reader: DelimitedReaderState::new(reader), -// } -// } -// } - -// impl Stream for DelimitedReader -// where -// R: AsyncRead + Unpin, -// { -// type Item = Result, ReadError>; -// fn poll_next( -// self: Pin<&mut Self>, -// context: &mut std::task::Context, -// ) -> Poll> { -// let p = self.project(); -// match p.reader { -// DelimitedReaderState::Waiting(r) => { -// // wait for receiver to be ready -// let ar = match r.receiver.try_recv() { -// Ok(Some(ar)) => ar, -// Ok(None) => return Poll::Pending, -// // if sender is dropped, should this return None? -// Err(e) => return Poll::Ready(Some(Err(e.into()))), -// }; -// *p.reader = DelimitedReaderState::new(ar); -// // self.poll_next(context) -// Poll::Pending -// } -// DelimitedReaderState::Available(a) => { -// // read len -// let (w, t) = futures::ready!(Pin::new(a).poll(context))?; -// *p.reader = DelimitedReaderState::Waiting(w); -// Poll::Ready(Some(Ok(t))) -// } -// } -// } -// } - -// impl CarBlockReader { -// pub fn new(reader: R) -> Self { -// Self { -// reader: DelimitedReader::new(reader), -// } -// } -// } - -enum ReaderState { +pub(crate) enum ReaderState { Empty(R), - Error(ReadError), Element(DataSection>, F), } @@ -362,7 +246,17 @@ pub(crate) async fn read_section( where R: AsyncRead + Unpin, { - let len = match read_leb128(&mut reader).await { + // check if reader is already empty (if it can't read 1 byte, it's empty) + let mut buf = [0u8; 1]; + match reader.read_exact(&mut buf).await { + Ok(_) => (), + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { + return Ok(ReaderState::Empty(reader)) + } + Err(e) => return Err(e.into()), + }; + + let len = match read_leb128(buf.chain(&mut reader)).await { Ok(len) => len, Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(ReaderState::Empty(reader)), Err(e) => return Err(ReadError::Io(e)), @@ -377,6 +271,9 @@ where )) } +// this function should take a reader and stream out length-delimited cid-block pairs +// until the reader is empty. once the reader is empty, the returned future should +// resolve with the value of the reader pub async fn stream_carv1( mut reader: R, ) -> Result< @@ -390,18 +287,24 @@ pub async fn stream_carv1( where R: AsyncRead + Unpin, { + // read the header let header = Header::read_from(&mut reader).await?; + // setup the channel for completion state let (final_tx, final_rx) = oneshot::channel(); let stream = try_stream! { - while match read_section(reader).await? { - ReaderState::Empty(r) => {final_tx.send(r).map_err(|_| oneshot::Canceled)?; false}, - ReaderState::Error(e) => {yield Err(e); false}, - ReaderState::Element(ds, rx) => { - yield ds; - reader = rx.await?; - true + loop { + // try read a section + match read_section(reader).await? { + // section is empty, send reader to completion channel + ReaderState::Empty(r) => {final_tx.send(r).map_err(|_| oneshot::Canceled)?; break}, + // reader is not empty, send section to stream + ReaderState::Element(ds, rx) => { + yield ds; + reader = rx.await?; + } } - } {} + } + }; Ok((header, stream, final_rx)) } diff --git a/src/p2p/exchange/utils.rs b/src/p2p/exchange/utils.rs index d8a0b73f..cd686d8c 100644 --- a/src/p2p/exchange/utils.rs +++ b/src/p2p/exchange/utils.rs @@ -1,17 +1,9 @@ -use futures::{ - io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, - Future, -}; +use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use libipld::cid::{ multihash::{Code, MultihashDigest}, Cid, Error as CidError, }; -use pin_project::pin_project; -use std::{ - io::{Error as IoError, ErrorKind as IoErrorKind}, - pin::Pin, - task::Poll, -}; +use std::io::Error as IoError; use unsigned_varint::aio; #[derive(thiserror::Error, Debug)] @@ -85,57 +77,6 @@ where } } -#[pin_project] -pub enum Leb128Reader { - Unfinished(u64, #[pin] R), - Finished, -} - -pub fn update_leb128(val: u64, byte: u8) -> u64 { - (val << 7) | (byte & 0x7f) as u64 -} - -impl Leb128Reader { - pub fn new(reader: R) -> Self { - Self::Unfinished(0, reader) - } - - fn finish(&mut self) -> (u64, R) { - match self { - Self::Unfinished(val, reader) => (*val, reader), - Self::Finished => panic!("already finished"), - } - } -} - -impl Future for Leb128Reader -where - R: AsyncRead + Unpin, -{ - type Output = Result<(u64, R), IoError>; - - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - match *self { - Self::Unfinished(v, r) => { - let this = self.project(); - let mut buf = [0u8; 1]; - - match this.1.poll_read(cx, &mut buf) { - Poll::Ready(Ok(_)) => (), - Poll::Pending => return Poll::Pending, - Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), - }; - let byte = buf[0]; - *this.0 = update_leb128(*this.0, byte); - if byte & 0x80 == 0 {} - } - Self::Finished => return Poll::Ready(Err(IoError::from(IoErrorKind::UnexpectedEof))), - } - return Poll::Ready(Ok((*this.0, *this.1))); - Poll::Pending - } -} - pub async fn read_dag_cbor_cid(mut reader: R) -> Result where R: AsyncRead + Unpin, From cb1ed042fd257db1b7c70c0f413f2008bef73f6a Mon Sep 17 00:00:00 2001 From: chunningham Date: Fri, 28 Apr 2023 16:51:21 +0200 Subject: [PATCH 6/7] testing --- src/p2p/exchange/carv1.rs | 90 +++++++++++++++++++++++++++++++++++++-- src/p2p/exchange/utils.rs | 6 +-- 2 files changed, 89 insertions(+), 7 deletions(-) diff --git a/src/p2p/exchange/carv1.rs b/src/p2p/exchange/carv1.rs index ec098eb9..d2f0cb63 100644 --- a/src/p2p/exchange/carv1.rs +++ b/src/p2p/exchange/carv1.rs @@ -94,6 +94,10 @@ impl DataSection { pub fn new(cid: Cid, block: Content) -> Self { Self { cid, block } } + + pub fn into_inner(self) -> (Cid, Content) { + (self.cid, self.block) + } } impl DataSection @@ -127,7 +131,7 @@ impl From for WriteError { } } -pub async fn write( +pub async fn write_carv1( header: &Header, data: &mut S, mut writer: W, @@ -168,11 +172,10 @@ impl From for ReadError { } #[derive(Default)] -#[pin_project(project = TakenReaderProj)] pub enum TakenReader { #[default] Finished, - Unfinished(#[pin] Take, Option>), + Unfinished(Take, Option>), } impl TakenReader @@ -205,6 +208,7 @@ where cx: &mut std::task::Context<'_>, buf: &mut [u8], ) -> std::task::Poll> { + // boy I hope this doesnt need to be pinned in a better way match *self { TakenReader::Finished => Poll::Ready(Ok(0)), TakenReader::Unfinished(ref mut reader, _) => { @@ -304,7 +308,6 @@ where } } } - }; Ok((header, stream, final_rx)) } @@ -312,6 +315,8 @@ where #[cfg(test)] mod tests { use super::*; + use futures::StreamExt; + use libipld::{multihash::Code, raw::RawCodec, Block, DefaultParams}; #[test] async fn header() { @@ -329,4 +334,81 @@ mod tests { let deser = Header::read_from(&mut buf.as_slice()).await.unwrap(); assert_eq!(header, deser); } + + #[test] + async fn reader() { + let header = Header { + version: 1, + roots: Vec::new(), + }; + + let (cid1, block1) = + Block::::encode(RawCodec, Code::Sha3_256, &vec![0u8; 1024]) + .expect("block encoding to work") + .into_inner(); + + let (cid2, block2) = + Block::::encode(RawCodec, Code::Sha3_256, &vec![1u8; 1024]) + .expect("block encoding to work") + .into_inner(); + + println!("{:?}", cid1); + println!("{:?}", cid2); + let cid_len = cid1.to_bytes().len() as u64; + + let mut buf = Vec::with_capacity(2048); + header + .write_to(&mut buf) + .await + .expect("header write to work"); + + write_leb128(block1.len() as u64 + cid_len, &mut buf) + .await + .expect("leb128 write to work"); + buf.extend_from_slice(cid1.to_bytes().as_slice()); + buf.extend_from_slice(&block1); + + write_leb128(block2.len() as u64 + cid_len, &mut buf) + .await + .expect("leb128 write to work"); + buf.extend_from_slice(cid2.to_bytes().as_slice()); + buf.extend_from_slice(&block2); + + let (read_header, stream, final_rx) = stream_carv1(buf.as_slice()) + .await + .expect("stream from buffer to work"); + assert_eq!(read_header, header); + + let mut s = Box::pin(stream); + + let (rcid1, mut rblock1) = s + .next() + .await + .expect("no read error") + .expect("there should be a section") + .into_inner(); + assert_eq!(rcid1, cid1); + let mut buf1 = Vec::new(); + rblock1 + .read_to_end(&mut buf1) + .await + .expect("there should be a block"); + assert_eq!(buf1, block1); + + let (rcid2, mut rblock2) = s + .next() + .await + .expect("no read error") + .expect("there should be a section") + .into_inner(); + assert_eq!(rcid2, cid2); + let mut buf2 = Vec::new(); + rblock2 + .read_to_end(&mut buf2) + .await + .expect("there should be a block"); + assert_eq!(buf2, block2); + + assert_eq!(final_rx.await.unwrap(), buf); + } } diff --git a/src/p2p/exchange/utils.rs b/src/p2p/exchange/utils.rs index cd686d8c..6f48c87a 100644 --- a/src/p2p/exchange/utils.rs +++ b/src/p2p/exchange/utils.rs @@ -36,7 +36,7 @@ where Ok(Cid::new_v0(Code::Sha2_256.digest(&buf))?) } // CID v1 - 0x20 => { + 0x1 => { let codec = aio::read_u64(&mut reader).await?; let mh_code = Code::try_from(aio::read_u64(&mut reader).await?).map_err(CidError::from)?; @@ -98,11 +98,11 @@ where let cid_len = buf[1]; - println!("{:x?}", buf); + println!("buf: {:x?}", buf); let mut vec = Vec::with_capacity(cid_len as usize); // TODO wtf why doesnt this read anything reader.take(cid_len.into()).read_exact(&mut vec).await?; - println!("{:x?}", vec); + println!("cid bytes: {:x?}", vec); if vec.get(0) != Some(&0) { return Err(Error::Cid(CidError::InvalidCidVersion)); } From c900453a2888f34f491438dad365128e839f2611 Mon Sep 17 00:00:00 2001 From: chunningham Date: Sat, 29 Apr 2023 12:46:26 +0200 Subject: [PATCH 7/7] wip --- src/p2p/exchange/carv1.rs | 12 ++++-------- src/p2p/exchange/carv2.rs | 6 +++--- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/src/p2p/exchange/carv1.rs b/src/p2p/exchange/carv1.rs index d2f0cb63..30575954 100644 --- a/src/p2p/exchange/carv1.rs +++ b/src/p2p/exchange/carv1.rs @@ -260,11 +260,7 @@ where Err(e) => return Err(e.into()), }; - let len = match read_leb128(buf.chain(&mut reader)).await { - Ok(len) => len, - Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(ReaderState::Empty(reader)), - Err(e) => return Err(ReadError::Io(e)), - }; + let len = read_leb128(buf.chain(&mut reader)).await?; let cid = read_cid(&mut reader).await?; let cid_len = cid.to_bytes().len() as u64; let block_len = len - cid_len; @@ -343,12 +339,12 @@ mod tests { }; let (cid1, block1) = - Block::::encode(RawCodec, Code::Sha3_256, &vec![0u8; 1024]) + Block::::encode(RawCodec, Code::Sha3_256, &vec![0u8; 10]) .expect("block encoding to work") .into_inner(); let (cid2, block2) = - Block::::encode(RawCodec, Code::Sha3_256, &vec![1u8; 1024]) + Block::::encode(RawCodec, Code::Sha3_256, &vec![1u8; 24]) .expect("block encoding to work") .into_inner(); @@ -356,7 +352,7 @@ mod tests { println!("{:?}", cid2); let cid_len = cid1.to_bytes().len() as u64; - let mut buf = Vec::with_capacity(2048); + let mut buf = Vec::with_capacity(34); header .write_to(&mut buf) .await diff --git a/src/p2p/exchange/carv2.rs b/src/p2p/exchange/carv2.rs index dde84ada..52e20e56 100644 --- a/src/p2p/exchange/carv2.rs +++ b/src/p2p/exchange/carv2.rs @@ -5,7 +5,7 @@ use futures::{ use libipld::Cid; use std::io::Error as IoError; -use super::carv1::{write as v1_write, DataSection, Header as V1Header, WriteError}; +use super::carv1::{write_carv1, DataSection, Header as V1Header, WriteError}; use crate::storage::ImmutableStore; const CAR_V2_PRAGMA: [u8; 11] = [ @@ -67,7 +67,7 @@ impl Header { } } -pub async fn write( +pub async fn write_carv2( header: &Header, v1_header: &V1Header, data: &mut S, @@ -82,7 +82,7 @@ where writer.write_all(&CAR_V2_PRAGMA).await?; header.write_to(writer).await?; - v1_write(v1_header, data, writer).await?; + write_carv1(v1_header, data, writer).await?; // TODO write index if present