Allow specifying other parameters in levin-cuprate

This commit is contained in:
Boog900 2023-08-03 12:19:19 +01:00
parent 6875a9a11d
commit 6a75b90fe7
No known key found for this signature in database
GPG key ID: 5401367FB7302004
3 changed files with 180 additions and 146 deletions

View file

@ -22,36 +22,79 @@ use bytes::{Buf, BufMut, BytesMut};
use tokio_util::codec::{Decoder, Encoder};
use crate::{
Bucket, BucketBuilder, BucketError, BucketHead, LevinBody, MessageType,
LEVIN_DEFAULT_MAX_PACKET_SIZE,
Bucket, BucketBuilder, BucketError, BucketHead, LevinBody, LevinCommand, MessageType, Protocol,
};
/// The levin tokio-codec for decoding and encoding levin buckets
#[derive(Default, Debug, Clone)]
pub enum LevinCodec {
#[derive(Debug, Clone)]
pub enum LevinBucketState<C> {
/// Waiting for the peer to send a header.
#[default]
WaitingForHeader,
/// Waiting for a peer to send a body.
WaitingForBody(BucketHead),
WaitingForBody(BucketHead<C>),
}
impl Decoder for LevinCodec {
type Item = Bucket;
/// The levin tokio-codec for decoding and encoding raw levin buckets
///
#[derive(Debug, Clone)]
pub struct LevinBucketCodec<C> {
state: LevinBucketState<C>,
protocol: Protocol,
handshake_message_seen: bool,
}
impl<C> Default for LevinBucketCodec<C> {
fn default() -> Self {
LevinBucketCodec {
state: LevinBucketState::WaitingForHeader,
protocol: Protocol::default(),
handshake_message_seen: false,
}
}
}
impl<C> LevinBucketCodec<C> {
pub fn new(protocol: Protocol) -> Self {
LevinBucketCodec {
state: LevinBucketState::WaitingForHeader,
protocol,
handshake_message_seen: false,
}
}
}
impl<C: LevinCommand> Decoder for LevinBucketCodec<C> {
type Item = Bucket<C>;
type Error = BucketError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
loop {
match self {
LevinCodec::WaitingForHeader => {
if src.len() < BucketHead::SIZE {
match &self.state {
LevinBucketState::WaitingForHeader => {
if src.len() < BucketHead::<C>::SIZE {
return Ok(None);
};
let head = BucketHead::from_bytes(src)?;
let _ = std::mem::replace(self, LevinCodec::WaitingForBody(head));
let head = BucketHead::<C>::from_bytes(src);
if head.size > self.protocol.max_packet_size
|| head.size > head.command.bucket_size_limit()
{
return Err(BucketError::BucketExceededMaxSize);
}
if !self.handshake_message_seen {
if head.size > self.protocol.max_packet_size_before_handshake {
return Err(BucketError::BucketExceededMaxSize);
}
if head.command.is_handshake() {
self.handshake_message_seen = true;
}
}
let _ =
std::mem::replace(&mut self.state, LevinBucketState::WaitingForBody(head));
}
LevinCodec::WaitingForBody(head) => {
// We size check header while decoding it.
LevinBucketState::WaitingForBody(head) => {
let body_len = head
.size
.try_into()
@ -61,7 +104,7 @@ impl Decoder for LevinCodec {
return Ok(None);
}
let LevinCodec::WaitingForBody(header) = std::mem::replace(self, LevinCodec::WaitingForHeader) else {
let LevinBucketState::WaitingForBody(header) = std::mem::replace(&mut self.state, LevinBucketState::WaitingForHeader) else {
unreachable!()
};
@ -75,10 +118,10 @@ impl Decoder for LevinCodec {
}
}
impl Encoder<Bucket> for LevinCodec {
impl<C: LevinCommand> Encoder<Bucket<C>> for LevinBucketCodec<C> {
type Error = BucketError;
fn encode(&mut self, item: Bucket, dst: &mut BytesMut) -> Result<(), Self::Error> {
if dst.capacity() < BucketHead::SIZE + item.body.len() {
fn encode(&mut self, item: Bucket<C>, dst: &mut BytesMut) -> Result<(), Self::Error> {
if dst.capacity() < BucketHead::<C>::SIZE + item.body.len() {
return Err(BucketError::IO(std::io::Error::new(
ErrorKind::OutOfMemory,
"Not enough capacity to write the bucket",
@ -91,22 +134,22 @@ impl Encoder<Bucket> for LevinCodec {
}
#[derive(Default, Debug, Clone)]
enum MessageState {
enum MessageState<C> {
#[default]
WaitingForBucket,
WaitingForRestOfFragment(Vec<u8>, MessageType, u32),
WaitingForRestOfFragment(Vec<u8>, MessageType, C),
}
/// A tokio-codec for levin messages or in other words the decoded body
/// of a levin bucket.
#[derive(Debug, Clone)]
pub struct LevinMessageCodec<T> {
pub struct LevinMessageCodec<T: LevinBody> {
message_ty: PhantomData<T>,
bucket_codec: LevinCodec,
state: MessageState,
bucket_codec: LevinBucketCodec<T::Command>,
state: MessageState<T::Command>,
}
impl<T> Default for LevinMessageCodec<T> {
impl<T: LevinBody> Default for LevinMessageCodec<T> {
fn default() -> Self {
Self {
message_ty: Default::default(),
@ -127,23 +170,20 @@ impl<T: LevinBody> Decoder for LevinMessageCodec<T> {
return Ok(None);
};
let end_fragment = bucket.header.flags.end_fragment;
let start_fragment = bucket.header.flags.start_fragment;
let request = bucket.header.flags.request;
let response = bucket.header.flags.response;
let flags = &bucket.header.flags;
if start_fragment && end_fragment {
if flags.is_start_fragment() && flags.is_end_fragment() {
// Dummy message
return Ok(None);
};
if end_fragment {
if flags.is_end_fragment() {
return Err(BucketError::InvalidHeaderFlags(
"Flag end fragment received before a start fragment",
));
};
if !request && !response {
if !flags.is_request() && !flags.is_response() {
return Err(BucketError::InvalidHeaderFlags(
"Request and response flags both not set",
));
@ -154,13 +194,13 @@ impl<T: LevinBody> Decoder for LevinMessageCodec<T> {
bucket.header.have_to_return_data,
)?;
if start_fragment {
if flags.is_start_fragment() {
let _ = std::mem::replace(
&mut self.state,
MessageState::WaitingForRestOfFragment(
bucket.body.to_vec(),
message_type,
bucket.header.protocol_version,
bucket.header.command,
),
);
@ -178,17 +218,14 @@ impl<T: LevinBody> Decoder for LevinMessageCodec<T> {
return Ok(None);
};
let end_fragment = bucket.header.flags.end_fragment;
let start_fragment = bucket.header.flags.start_fragment;
let request = bucket.header.flags.request;
let response = bucket.header.flags.response;
let flags = &bucket.header.flags;
if start_fragment && end_fragment {
if flags.is_start_fragment() && flags.is_end_fragment() {
// Dummy message
return Ok(None);
};
if !request && !response {
if !flags.is_request() && !flags.is_response() {
return Err(BucketError::InvalidHeaderFlags(
"Request and response flags both not set",
));
@ -207,12 +244,12 @@ impl<T: LevinBody> Decoder for LevinMessageCodec<T> {
if bucket.header.command != *command {
return Err(BucketError::InvalidFragmentedMessage(
"Command not consistent across message",
"Command not consistent across fragments",
));
}
if bytes.len() + bucket.body.len()
> LEVIN_DEFAULT_MAX_PACKET_SIZE.try_into().unwrap()
if bytes.len().saturating_add(bucket.body.len())
> command.bucket_size_limit().try_into().unwrap()
{
return Err(BucketError::InvalidFragmentedMessage(
"Fragmented message exceeded maximum size",
@ -221,7 +258,7 @@ impl<T: LevinBody> Decoder for LevinMessageCodec<T> {
bytes.append(&mut bucket.body.to_vec());
if end_fragment {
if flags.is_end_fragment() {
let MessageState::WaitingForRestOfFragment(bytes, ty, command) =
std::mem::replace(&mut self.state, MessageState::WaitingForBucket) else {
unreachable!();

View file

@ -16,10 +16,9 @@
//! This module provides a struct BucketHead for the header of a levin protocol
//! message.
use crate::LEVIN_DEFAULT_MAX_PACKET_SIZE;
use bytes::{Buf, BufMut, BytesMut};
use super::{BucketError, LEVIN_SIGNATURE, PROTOCOL_VERSION};
use crate::LevinCommand;
const REQUEST: u32 = 0b0000_0001;
const RESPONSE: u32 = 0b0000_0010;
@ -28,57 +27,41 @@ const END_FRAGMENT: u32 = 0b0000_1000;
/// Levin header flags
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct Flags {
/// Q bit
pub request: bool,
/// S bit
pub response: bool,
/// B bit
pub start_fragment: bool,
/// E bit
pub end_fragment: bool,
pub struct Flags(u32);
impl Flags {
pub const REQUEST: Flags = Flags(REQUEST);
pub const RESPONSE: Flags = Flags(RESPONSE);
pub fn is_request(&self) -> bool {
self.0 & REQUEST != 0
}
pub fn is_response(&self) -> bool {
self.0 & RESPONSE != 0
}
pub fn is_start_fragment(&self) -> bool {
self.0 & START_FRAGMENT != 0
}
pub fn is_end_fragment(&self) -> bool {
self.0 & END_FRAGMENT != 0
}
}
impl TryFrom<u32> for Flags {
type Error = BucketError;
fn try_from(value: u32) -> Result<Self, Self::Error> {
let flags = Flags {
request: value & REQUEST > 0,
response: value & RESPONSE > 0,
start_fragment: value & START_FRAGMENT > 0,
end_fragment: value & END_FRAGMENT > 0,
};
if flags.request && flags.response {
return Err(BucketError::InvalidHeaderFlags(
"Request and Response bits set",
));
};
Ok(flags)
impl From<u32> for Flags {
fn from(value: u32) -> Self {
Flags(value)
}
}
impl From<Flags> for u32 {
fn from(value: Flags) -> Self {
let mut ret = 0;
if value.request {
ret |= REQUEST;
};
if value.response {
ret |= RESPONSE;
};
if value.start_fragment {
ret |= START_FRAGMENT;
};
if value.end_fragment {
ret |= END_FRAGMENT;
};
ret
value.0
}
}
/// The Header of a Bucket. This contains
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct BucketHead {
pub struct BucketHead<C> {
/// The network signature, should be `LEVIN_SIGNATURE` for Monero
pub signature: u64,
/// The size of the body
@ -87,7 +70,7 @@ pub struct BucketHead {
/// messages require responses but don't have this set (some notifications)
pub have_to_return_data: bool,
/// Command
pub command: u32,
pub command: C,
/// Return Code - will be 0 for requests and >0 for ok responses otherwise will be
/// a negative number corresponding to the error
pub return_code: i32,
@ -97,61 +80,36 @@ pub struct BucketHead {
pub protocol_version: u32,
}
impl BucketHead {
impl<C: LevinCommand> BucketHead<C> {
/// The size of the header (in bytes)
pub const SIZE: usize = 33;
/// Builds the header in a Monero specific way
pub fn build_monero(
payload_size: u64,
have_to_return_data: bool,
command: u32,
flags: Flags,
return_code: i32,
) -> BucketHead {
BucketHead {
signature: LEVIN_SIGNATURE,
size: payload_size,
have_to_return_data,
command,
return_code,
flags,
protocol_version: PROTOCOL_VERSION,
}
}
/// Builds the header from bytes, this function does not check any fields should
/// match the expected ones (signature, protocol_version)
/// match the expected ones.
///
/// # Panics
/// This function will panic if there aren't enough bytes to fill the header.
/// Currently ['SIZE'](BucketHead::SIZE)
pub fn from_bytes(buf: &mut BytesMut) -> Result<BucketHead, BucketError> {
let header = BucketHead {
pub fn from_bytes(buf: &mut BytesMut) -> BucketHead<C> {
BucketHead {
signature: buf.get_u64_le(),
size: buf.get_u64_le(),
have_to_return_data: buf.get_u8() != 0,
command: buf.get_u32_le(),
command: buf.get_u32_le().into(),
return_code: buf.get_i32_le(),
flags: Flags::try_from(buf.get_u32_le())?,
flags: Flags::from(buf.get_u32_le()),
protocol_version: buf.get_u32_le(),
};
if header.size > LEVIN_DEFAULT_MAX_PACKET_SIZE {
return Err(BucketError::BucketExceededMaxSize);
}
Ok(header)
}
/// Serializes the header
pub fn write_bytes(&self, dst: &mut BytesMut) {
dst.reserve(BucketHead::SIZE);
dst.reserve(Self::SIZE);
dst.put_u64_le(self.signature);
dst.put_u64_le(self.size);
dst.put_u8(if self.have_to_return_data { 1 } else { 0 });
dst.put_u32_le(self.command);
dst.put_u32_le(self.command.clone().into());
dst.put_i32_le(self.return_code);
dst.put_u32_le(self.flags.into());
dst.put_u32_le(self.protocol_version);

View file

@ -43,9 +43,10 @@ use std::fmt::Debug;
use thiserror::Error;
const PROTOCOL_VERSION: u32 = 1;
const LEVIN_SIGNATURE: u64 = 0x0101010101012101;
const LEVIN_DEFAULT_MAX_PACKET_SIZE: u64 = 100_000_000; // 100MB
const MONERO_PROTOCOL_VERSION: u32 = 1;
const MONERO_LEVIN_SIGNATURE: u64 = 0x0101010101012101;
const MONERO_MAX_PACKET_SIZE_BEFORE_HANDSHAKE: u64 = 256 * 1000; // 256 KiB
const MONERO_MAX_PACKET_SIZE: u64 = 100_000_000; // 100MB
/// Possible Errors when working with levin buckets
#[derive(Error, Debug)]
@ -59,6 +60,9 @@ pub enum BucketError {
/// Invalid Fragmented Message
#[error("Levin fragmented message was invalid: {0}")]
InvalidFragmentedMessage(&'static str),
/// The Header did not have the correct signature
#[error("Levin header had incorrect signature")]
InvalidHeaderSignature,
/// Error decoding the body
#[error("Error decoding bucket body")]
BodyDecodingError(Box<dyn std::error::Error + Send + Sync>),
@ -67,11 +71,33 @@ pub enum BucketError {
IO(#[from] std::io::Error),
}
/// Levin protocol settings, allows setting custom parameters.
///
/// For Monero use [`Default::default()`]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct Protocol {
pub version: u32,
pub signature: u64,
pub max_packet_size_before_handshake: u64,
pub max_packet_size: u64,
}
impl Default for Protocol {
fn default() -> Self {
Protocol {
version: MONERO_PROTOCOL_VERSION,
signature: MONERO_LEVIN_SIGNATURE,
max_packet_size_before_handshake: MONERO_MAX_PACKET_SIZE_BEFORE_HANDSHAKE,
max_packet_size: MONERO_MAX_PACKET_SIZE,
}
}
}
/// A levin Bucket
#[derive(Debug)]
pub struct Bucket {
pub struct Bucket<C> {
/// The bucket header
pub header: BucketHead,
pub header: BucketHead<C>,
/// The bucket body
pub body: Vec<u8>,
}
@ -101,11 +127,11 @@ impl MessageType {
flags: header::Flags,
have_to_return: bool,
) -> Result<Self, BucketError> {
if flags.request && have_to_return {
if flags.is_request() && have_to_return {
Ok(MessageType::Request)
} else if flags.request {
} else if flags.is_request() {
Ok(MessageType::Notification)
} else if flags.response && !have_to_return {
} else if flags.is_response() && !have_to_return {
Ok(MessageType::Response)
} else {
Err(BucketError::InvalidHeaderFlags(
@ -116,42 +142,36 @@ impl MessageType {
pub fn as_flags(&self) -> header::Flags {
match self {
MessageType::Request | MessageType::Notification => header::Flags {
request: true,
..Default::default()
},
MessageType::Response => header::Flags {
response: true,
..Default::default()
},
MessageType::Request | MessageType::Notification => header::Flags::REQUEST,
MessageType::Response => header::Flags::RESPONSE,
}
}
}
#[derive(Debug)]
pub struct BucketBuilder {
pub struct BucketBuilder<C> {
signature: Option<u64>,
ty: Option<MessageType>,
command: Option<u32>,
command: Option<C>,
return_code: Option<i32>,
protocol_version: Option<u32>,
body: Option<Vec<u8>>,
}
impl Default for BucketBuilder {
impl<C> Default for BucketBuilder<C> {
fn default() -> Self {
Self {
signature: Some(LEVIN_SIGNATURE),
signature: Some(MONERO_LEVIN_SIGNATURE),
ty: None,
command: None,
return_code: None,
protocol_version: Some(PROTOCOL_VERSION),
protocol_version: Some(MONERO_PROTOCOL_VERSION),
body: None,
}
}
}
impl BucketBuilder {
impl<C: LevinCommand> BucketBuilder<C> {
pub fn set_signature(&mut self, sig: u64) {
self.signature = Some(sig)
}
@ -160,7 +180,7 @@ impl BucketBuilder {
self.ty = Some(ty)
}
pub fn set_command(&mut self, command: u32) {
pub fn set_command(&mut self, command: C) {
self.command = Some(command)
}
@ -176,7 +196,7 @@ impl BucketBuilder {
self.body = Some(body)
}
pub fn finish(self) -> Bucket {
pub fn finish(self) -> Bucket<C> {
let body = self.body.unwrap();
let ty = self.ty.unwrap();
Bucket {
@ -196,9 +216,28 @@ impl BucketBuilder {
/// A levin body
pub trait LevinBody: Sized {
type Command: LevinCommand;
/// Decodes the message from the data in the header
fn decode_message(body: &[u8], typ: MessageType, command: u32) -> Result<Self, BucketError>;
fn decode_message(
body: &[u8],
typ: MessageType,
command: Self::Command,
) -> Result<Self, BucketError>;
/// Encodes the message
fn encode(&self, builder: &mut BucketBuilder) -> Result<(), BucketError>;
fn encode(&self, builder: &mut BucketBuilder<Self::Command>) -> Result<(), BucketError>;
}
/// The levin commands.
///
/// Implementers should account for all possible u32 values, this means
/// you will probably need some sort of `Unknown` variant.
pub trait LevinCommand: From<u32> + Into<u32> + PartialEq + Clone {
/// Returns the size limit for this command.
///
/// must be less than [`usize::MAX`]
fn bucket_size_limit(&self) -> u64;
/// Returns if this is a handshake
fn is_handshake(&self) -> bool;
}