Compare commits

...

10 commits

Author SHA1 Message Date
hinto.janai
4252a0a4a8
fix epee-encoding
2024-06-23 17:41:30 -04:00
hinto.janai
000b1947e1
fix leftover merge conflicts
2024-06-23 17:37:27 -04:00
hinto.janai
1f8d1f8399
crate import fixes
2024-06-23 17:32:55 -04:00
hinto.janai
2d3ecdaa8c
Merge branch 'main' into rename
2024-06-23 17:24:45 -04:00
hinto-janai
fe1d5faac9
contributing: expand issue/PR sections, re-format (#186)
* contributing.md: expand tracking issue/pr section

* add tracking issue section

* re-format sections

* typos

* fix links

* add `.github/pull_request_template.md`

* add `Pull request title and description` section

* wording
2024-06-22 01:33:29 +01:00
10aac8cbb2
P2P: Block downloader (#132)
* impl async buffer

* clippy

* p2p changes

* clippy

* a few more docs

* init cuprate-p2p

* remove some unrelated code and add some docs

* start documenting client_pool.rs

* add more docs

* typo

* fix docs

* use JoinSet in connection maintainer

* small changes

* add peer sync state svc

* add broadcast svc

* add more docs

* add some tests

* add a test

* fix merge

* add another test

* unify PeerDisconnectFut and add more docs

* start network init

* add an inbound connection server

* remove crate doc for now

* fix address book docs

* fix leak in client pool

* correct comment

* fix merge + add some docs

* review comments

* init block downloader

* fix doc

* initial chain search

* add chain_tracker

* move block downloader to struct

* spawn task when getting blocks

* check for free peers and handle batch response

* add test bin

* working block downloader

* dynamic batch sizes

* dandelion_tower -> dandelion-tower

* fix async-buffer builds

* check if incoming peers are banned

* add interface methods

* update docs

* use a JoinSet for background network tasks

* dynamic batch size changes

* Keep a longer queue of blocks to get

* more checks on incoming data

* fix merge

* fix imports

* add more docs

* add some limits on messages

* keep peers that don't have the currently needed data

* fix clippy

* fix .lock

* fix stopping the block downloader

* clean up API and add more docs

* tracing + bug fixes

* fix panic

* doc changes

* remove test_init

* remove spammy log

* fix previous merge

* add a test

* fix test

* remove test unwrap

* order imports correctly

* clean up test

* add a timeout

* fix tests

* review fixes

* make `BlockDownloader` pub

* make `initial_chain_search` pub

* make `block_downloader` private

* Apply suggestions from code review

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>

* split some sections into separate modules

* split chain requests

* sort imports

* check previous ID is correct

* fix typos

* Apply suggestions from code review

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>

---------

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>
2024-06-22 01:29:40 +01:00
hinto-janai
ff1172f2ab
epee: make {read,write}_varint public, create write_{bytes,container} (#185)
* make `{read,write}_varint` public, create `write_{container,bytes}`

* add doc tests to varint functions

* `write_container` -> `write_iterator`

* add `write_{iterator,bytes}` doc test

* fix `write_iterator()` doc
2024-06-22 01:25:21 +01:00
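For a concrete feel of the now-public API, here is a minimal sketch based on the doctests this PR adds (visible in the `varint.rs` and `lib.rs` diffs further down); it assumes only that `cuprate-epee-encoding` is a dependency:

```rust
use cuprate_epee_encoding::{read_varint, write_bytes, write_varint};

fn main() {
    // Varints: 63 fits in one byte, 64 needs two (see the doctests below).
    let mut buf = vec![];
    write_varint(64, &mut buf).unwrap();
    assert_eq!(buf, [1, 1]);
    assert_eq!(read_varint(&mut buf.as_slice()).unwrap(), 64);

    // `write_bytes` length-prefixes a byte string with a varint.
    let mut w = vec![];
    write_bytes([3u8, 0, 0, 0], &mut w).unwrap();
    assert_eq!(w.len(), 5); // 1 varint length byte + 4 data bytes
}
```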
hinto-janai
f6c4e4e9a8
repo: add Tracking Issue issue template (#182)
* add `tracking_issue.md`

* fix `{bug,proposal}.md`

* format `tracking_issue.md`
2024-06-20 23:27:09 +01:00
hinto-janai
bef2a2cbd4
epee: unseal trait EpeeValue (#184)
* unseal `trait EpeeValue`

* fix `container_as_blob.rs`

* clippy

* epee-encoding: remove `sealed`
2024-06-20 23:20:13 +01:00
b76042a4e4
Cargo update (#171)
cargo update
2024-06-18 22:43:35 +01:00
25 changed files with 2418 additions and 525 deletions


@ -24,7 +24,7 @@ Example:
## Bug
What is the bug?
### Expected behavior
## Expected behavior
What correct behavior was expected to happen?
## Steps to reproduce


@ -14,11 +14,11 @@ Note: Please search to see if an issue already exists for this proposal.
## What
Describe your proposal.
## Where
Describe where your proposal will cause changes to.
## Why
Describe why the proposal is needed.
## Where
Describe where your proposal will cause changes to.
## How
Describe how the proposal could be implemented.


@ -0,0 +1,41 @@
---
name: 🔍 Tracking Issue
about: Create an issue that tracks a wider effort
title: 'Tracking Issue for ...'
labels: ["C-tracking-issue"]
assignees: ''
---
<!-- Consider keeping the following section in the issue. -->
### About tracking issues
Tracking issues are used to record the overall progress of implementation.
They are also used as hubs connecting to other relevant issues, e.g., bugs or open design questions.
A tracking issue is, however, not meant for large-scale discussion, questions, or bug reports about a feature.
Instead, open a dedicated issue for the specific matter.
### What
This is a tracking issue for ...
### Steps
<!--
Describe the steps required to bring this effort to completion.
For larger features, more steps might be involved.
If the feature is changed later, please add those PRs here as well.
-->
- [ ] Initial implementation: #...
- [ ] Other code: #...
- [ ] Multi-PR effort
- #...
- #...
- #...
- [ ] Finalization PR: #...
### Related
<!-- Link any related issues/PRs here. -->
- #...
- #...
- #...

.github/pull_request_template.md (new file)

@ -0,0 +1,35 @@
<!--
PR titles should be:
<AREA>: <SHORT_DESCRIPTION>
For example:
books: fix typo
-->
<!--
If your pull request is long and/or has sections
that need clarifying, consider leaving a review on
your own PR with comments explaining the changes.
-->
### What
<!--
If applicable, close a related issue with:
Fixes #<BUG_ISSUE_NUMBER>
...or...
Closes #<FEATURE_ISSUE_NUMBER>
-->
<!-- Describe the pull request in detail. -->
### Why
<!-- If applicable, describe why this pull request exists. -->
### Where
<!-- If applicable, describe the places this pull request affects. -->
### How
<!-- If applicable, describe how this pull request works. -->


@ -4,37 +4,87 @@ Thank you for wanting to help out!
Cuprate is in the stage where things are likely to change quickly, so it's recommended
you ask questions in our public [Matrix room](https://matrix.to/#/#cuprate:monero.social).
- [1. Submitting a pull request](#1-submitting-a-pull-request)
- [1.1 Rust toolchain](#11-rust-toolchain)
- [1.2 Draft PR](#12-draft-pr)
- [1.3 Passing CI](#13-passing-ci)
- [1.4 Ready for review](#14-ready-for-review)
- [2. Crate names](#2-crate-names)
- [3. Coding guidelines](#3-coding-guidelines)
- [4. Keeping track of issues and PRs](#4-keeping-track-of-issues-and-prs)
- [1. Submitting an issue](#1-submitting-an-issue)
- [1.1 Discussion](#11-discussion)
- [1.2 Proposal](#12-proposal)
- [1.3 Tracking issue](#13-tracking-issue)
- [2. Submitting a pull request](#2-submitting-a-pull-request)
- [2.1 Rust toolchain](#21-rust-toolchain)
- [2.2 Draft PR](#22-draft-pr)
- [2.3 Passing CI](#23-passing-ci)
- [2.4 Ready for review](#24-ready-for-review)
- [3. Keeping track of issues and PRs](#3-keeping-track-of-issues-and-prs)
- [3.1 Labels](#31-labels)
- [3.2 Tracking issues](#32-tracking-issues)
- [4. Coding guidelines](#4-coding-guidelines)
- [4.1 General guidelines](#41-general-guidelines)
- [4.2 Crate names](#42-crate-names)
- [4.3 Pull request title and description](#43-pull-request-title-and-description)
- [5. Documentation](#5-documentation)
- [6. Books](#6-books)
- [6.1 Architecture book](#61-architecture-book)
- [6.2 Protocol book](#62-protocol-book)
- [6.3 User book](#63-user-book)
## 1. Submitting a pull request
Once you have found something you would like to work on by:
## 1. Submitting an issue
Before starting work, consider opening an issue for discussion.
If you have a plan already, you can jump straight into [submitting a pull request](#2-submitting-a-pull-request).
Otherwise, see below for issue types and what they're used for.
### 1.1 Discussion
These are for general discussion of topics with questions that aren't fully answered yet.
If you would like to discuss a topic and get some feedback, consider [opening a discussion](https://github.com/Cuprate/cuprate/issues/new/choose).
Examples:
- https://github.com/Cuprate/cuprate/issues/40
- https://github.com/Cuprate/cuprate/issues/53
- https://github.com/Cuprate/cuprate/issues/163
### 1.2 Proposal
These are formal issues that specify changes that are _almost_ ready for implementation.
These should answer some basic questions:
- **What** is this proposal for?
- **Why** is this proposal needed?
- **Where** will this proposal make changes to?
- **How** will this proposal be implemented?
If you have an idea that is close to fully fleshed out, consider [opening a proposal](https://github.com/Cuprate/cuprate/issues/new/choose).
Opening a PR and writing the proposal in the PR description is also viable.
Examples:
- https://github.com/Cuprate/cuprate/pull/146
- https://github.com/Cuprate/cuprate/issues/106
- https://github.com/Cuprate/cuprate/issues/153
- https://github.com/Cuprate/cuprate/issues/181
### 1.3 Tracking issue
These are meta-issues that track an in-progress implementation.
See [`Tracking issues`](#32-tracking-issues) for more info.
## 2. Submitting a pull request
Once you have found something you would like to work on after:
- Discussing an idea on an [issue](#1-submitting-an-issue)
- Looking at the [open issues](https://github.com/Cuprate/cuprate/issues)
- Looking at issues with the [`A-help-wanted`](https://github.com/Cuprate/cuprate/issues?q=is%3Aissue+is%3Aopen+label%3AE-help-wanted) label
- or joining Cuprate's [Matrix room](https://matrix.to/#/#cuprate:monero.social) and asking
- Joining Cuprate's [Matrix room](https://matrix.to/#/#cuprate:monero.social) and asking
it is recommended to make your interest in working on it known so people don't duplicate work.
Before starting, consider reading/using Cuprate's:
- [`Documentation`](#5-documentation) (practical `cargo` docs)
- [`Books`](#6-books) (Cuprate's architecture and protocol)
- [`Documentation`](#5-documentation)
- [`Books`](#6-books)
These may answer some questions you have, or may confirm an issue you would like to fix.
_Note: Cuprate is currently a work-in-progress; documentation will be changing/unfinished._
### 1.1 Rust toolchain
### 2.1 Rust toolchain
Cuprate is written in [Rust](https://rust-lang.org).
If you are editing code, you will need Rust's toolchain and package manager,
@ -42,12 +92,12 @@ If you are editing code, you will need Rust's toolchain and package manager,
Get started with Rust here: <https://www.rust-lang.org/learn/get-started>.
### 1.2 Draft PR
### 2.2 Draft PR
Consider opening a draft PR until you have passed all CI.
This is also the stage where you can ask for feedback from others. Keep in mind that feedback may take time especially if the change is large.
### 1.3 Passing CI
### 2.3 Passing CI
Each commit pushed in a PR will trigger our [lovely, yet pedantic CI](https://github.com/Cuprate/cuprate/blob/main/.github/workflows/ci.yml).
It currently:
@ -57,7 +107,7 @@ It currently:
- Runs [`clippy`](https://github.com/rust-lang/rust-clippy) (and fails on warnings)
- Runs all tests
- Builds all targets
- Automatically add appropriate [labels](#4-keeping-track-of-issues-and-prs) to your PR
- Automatically adds appropriate [labels](#31-labels) to your PR
Before pushing your code, please run the following at the root of the repository:
@ -79,45 +129,25 @@ After that, ensure all other CI passes by running:
**Note: in order for some tests to work, you will need to place a [`monerod`](https://www.getmonero.org/downloads/) binary at the root of the repository.**
### 1.4 Ready for review
### 2.4 Ready for review
Once your PR has passed all CI and is ready to go, open it for review. Others will leave their thoughts and may ask for changes to be made.
Finally, if everything looks good, we will merge your code! Thank you for contributing!
## 2. Crate names
All of Cuprate's crates (libraries) are prefixed with `cuprate-`. All directories containing crates, however, are not.
## 3. Keeping track of issues and PRs
The Cuprate GitHub repository has a lot of issues and PRs to keep track of.
For example:
This section documents tools used to help with this.
| Crate Directory | Crate Name |
|--------------------|--------------------|
| `storage/database` | `cuprate-database` |
| `net/levin` | `cuprate-levin` |
| `net/wire` | `cuprate-wire` |
## 3. Coding guidelines
This is a list of rules that are not mandated by any automation, but contributors generally follow.
You should keep these in mind when submitting code:
- Separate and sort imports as core, std, third-party, Cuprate crates, current crate
- Follow the [Rust API Guidelines](https://rust-lang.github.io/api-guidelines)
- `// Comment like this.` and not `//like this`
- Use `TODO` instead of `FIXME`
- Avoid `unsafe`
And the most important rule:
- Break any and all of the above rules when it makes sense
## 4. Keeping track of issues and PRs
The Cuprate GitHub repository has a lot of issues and PRs to keep track of. Cuprate makes use of generic labels and labels grouped by prefixes to help with this.
### 3.1 Labels
Cuprate makes use of labels grouped by prefixes.
Some labels will be [automatically added/removed](https://github.com/Cuprate/cuprate/tree/main/.github/labeler.yml) if certain file paths have been changed in a PR.
The following section explains the meaning of various labels used.
This section is primarily targeted at maintainers. Most contributors aren't able to set these labels.
| Labels | Description | Example |
| Prefix | Description | Example |
|--------------|-------------|---------|
| [A-] | The **area** of the project an issue relates to. | `A-storage`, `A-rpc`, `A-docs`
| [C-] | The **category** of an issue. | `C-cleanup`, `C-optimization`
@ -135,6 +165,56 @@ This section is primarily targeted at maintainers. Most contributors aren't able
[O-]: https://github.com/Cuprate/cuprate/labels?q=O
[P-]: https://github.com/Cuprate/cuprate/labels?q=P
### 3.2 Tracking issues
If you are working on a larger effort, consider opening a [tracking issue](https://github.com/Cuprate/cuprate/issues/new/choose)!
The main purpose of these is to track efforts that may contain multiple PRs and/or are generally spread out. They don't usually contain the "why", and when they do, only briefly. They contain no implementation details or the "how", as those belong in the issues/PRs being tracked.
Examples:
- https://github.com/Cuprate/cuprate/issues/187
- https://github.com/Cuprate/cuprate/issues/183
## 4. Coding guidelines
These are some rules that are not mandated by any automation, but contributors generally follow.
### 4.1 General guidelines
General guidelines to keep in mind when submitting code:
- Separate and sort imports as `core`, `std`, `third-party`, Cuprate crates, current crate
- Follow the [Rust API Guidelines](https://rust-lang.github.io/api-guidelines)
- `// Comment like this.` and not `//like this`
- Use `TODO` instead of `FIXME`
- Avoid `unsafe`
And the most important rule:
- Break any and all of the above rules when it makes sense
### 4.2 Crate names
All of Cuprate's crates (libraries) are prefixed with `cuprate-`. All directories containing crates, however, are not.
For example:
| Crate Directory | Crate Name |
|--------------------|--------------------|
| `storage/database` | `cuprate-database` |
| `net/levin` | `cuprate-levin` |
| `net/wire` | `cuprate-wire` |
### 4.3 Pull request title and description
In general, pull request titles should follow this syntax:
```
<AREA>: <SHORT_DESCRIPTION>
```
For example:
```
books: fix typo
```
The description of pull requests should generally follow the template laid out in [`.github/pull_request_template.md`](.github/pull_request_template.md).
If your pull request is long and/or has sections that need clarifying, consider leaving a review on your own PR with comments explaining the changes.
## 5. Documentation
Cuprate's crates (libraries) have inline documentation.

Cargo.lock

@ -331,7 +331,7 @@ version = "4.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c780290ccf4fb26629baa7a1081e68ced113f1d3ec302fa5948f1c381ebf06c6"
dependencies = [
"heck 0.5.0",
"heck",
"proc-macro2",
"quote",
"syn 2.0.66",
@ -606,7 +606,6 @@ dependencies = [
"hex",
"paste",
"ref-cast",
"sealed",
"thiserror",
]
@ -684,6 +683,7 @@ version = "0.1.0"
dependencies = [
"bytes",
"cuprate-address-book",
"cuprate-async-buffer",
"cuprate-fixed-bytes",
"cuprate-helper",
"cuprate-p2p-core",
@ -696,16 +696,17 @@ dependencies = [
"indexmap 2.2.6",
"monero-serai",
"pin-project",
"proptest",
"rand",
"rand_distr",
"rayon",
"thiserror",
"tokio",
"tokio-stream",
"tokio-test",
"tokio-util",
"tower",
"tracing",
"tracing-subscriber",
]
[[package]]
@ -795,9 +796,9 @@ dependencies = [
[[package]]
name = "curve25519-dalek"
version = "4.1.2"
version = "4.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348"
checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be"
dependencies = [
"cfg-if",
"cpufeatures",
@ -805,7 +806,6 @@ dependencies = [
"digest",
"fiat-crypto",
"group",
"platforms",
"rand_core",
"rustc_version",
"subtle",
@ -903,17 +903,6 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "displaydoc"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "doxygen-rs"
version = "0.4.2"
@ -1164,12 +1153,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "heck"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "heck"
version = "0.5.0"
@ -1277,9 +1260,9 @@ dependencies = [
[[package]]
name = "httparse"
version = "1.9.3"
version = "1.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0e7a4dd27b9476dc40cb050d3632d3bba3a70ddbff012285f7f8559a1e7e545"
checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9"
[[package]]
name = "hyper"
@ -1361,134 +1344,14 @@ dependencies = [
"cc",
]
[[package]]
name = "icu_collections"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526"
dependencies = [
"displaydoc",
"yoke",
"zerofrom",
"zerovec",
]
[[package]]
name = "icu_locid"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637"
dependencies = [
"displaydoc",
"litemap",
"tinystr",
"writeable",
"zerovec",
]
[[package]]
name = "icu_locid_transform"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e"
dependencies = [
"displaydoc",
"icu_locid",
"icu_locid_transform_data",
"icu_provider",
"tinystr",
"zerovec",
]
[[package]]
name = "icu_locid_transform_data"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e"
[[package]]
name = "icu_normalizer"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f"
dependencies = [
"displaydoc",
"icu_collections",
"icu_normalizer_data",
"icu_properties",
"icu_provider",
"smallvec",
"utf16_iter",
"utf8_iter",
"write16",
"zerovec",
]
[[package]]
name = "icu_normalizer_data"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516"
[[package]]
name = "icu_properties"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f8ac670d7422d7f76b32e17a5db556510825b29ec9154f235977c9caba61036"
dependencies = [
"displaydoc",
"icu_collections",
"icu_locid_transform",
"icu_properties_data",
"icu_provider",
"tinystr",
"zerovec",
]
[[package]]
name = "icu_properties_data"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569"
[[package]]
name = "icu_provider"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9"
dependencies = [
"displaydoc",
"icu_locid",
"icu_provider_macros",
"stable_deref_trait",
"tinystr",
"writeable",
"yoke",
"zerofrom",
"zerovec",
]
[[package]]
name = "icu_provider_macros"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "idna"
version = "1.0.0"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4716a3a0933a1d01c2f72450e89596eb51dd34ef3c211ccd875acdf1f8fe47ed"
checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6"
dependencies = [
"icu_normalizer",
"icu_properties",
"smallvec",
"utf8_iter",
"unicode-bidi",
"unicode-normalization",
]
[[package]]
@ -1569,12 +1432,6 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "litemap"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704"
[[package]]
name = "lmdb-master-sys"
version = "0.2.1"
@ -1614,9 +1471,9 @@ dependencies = [
[[package]]
name = "memchr"
version = "2.7.2"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "merlin"
@ -1632,9 +1489,9 @@ dependencies = [
[[package]]
name = "miniz_oxide"
version = "0.7.3"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87dfd01fe195c66b572b37921ad8803d010623c0aca821bea2302239d155cdae"
checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08"
dependencies = [
"adler",
]
@ -1709,16 +1566,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-traits"
version = "0.2.19"
@ -1766,12 +1613,6 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "page_size"
version = "0.6.0"
@ -1920,12 +1761,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "platforms"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@ -2125,9 +1960,9 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.5.1"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e"
checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd"
dependencies = [
"bitflags 2.5.0",
]
@ -2305,18 +2140,6 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "sealed"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4a8caec23b7800fb97971a1c6ae365b6239aaeddfb934d6265f8505e795699d"
dependencies = [
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "security-framework"
version = "2.11.0"
@ -2398,15 +2221,6 @@ dependencies = [
"keccak",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
@ -2466,12 +2280,6 @@ version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "std-shims"
version = "0.1.1"
@ -2530,17 +2338,6 @@ dependencies = [
"crossbeam-queue",
]
[[package]]
name = "synstructure"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "tap"
version = "1.0.1"
@ -2599,15 +2396,20 @@ dependencies = [
]
[[package]]
name = "tinystr"
version = "0.7.6"
name = "tinyvec"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f"
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
dependencies = [
"displaydoc",
"zerovec",
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.38.0"
@ -2771,18 +2573,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
@ -2791,12 +2581,7 @@ version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
dependencies = [
"nu-ansi-term",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core",
"tracing-log",
]
[[package]]
@ -2817,12 +2602,27 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94"
[[package]]
name = "unicode-bidi"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75"
[[package]]
name = "unicode-ident"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
[[package]]
name = "unicode-normalization"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5"
dependencies = [
"tinyvec",
]
[[package]]
name = "untrusted"
version = "0.9.0"
@ -2831,33 +2631,15 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "url"
version = "2.5.1"
version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c25da092f0a868cdf09e8674cd3b7ef3a7d92a24253e663a2fb85e2496de56"
checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c"
dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
]
[[package]]
name = "utf16_iter"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246"
[[package]]
name = "utf8_iter"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "version_check"
version = "0.9.4"
@ -3174,18 +2956,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "write16"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936"
[[package]]
name = "writeable"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51"
[[package]]
name = "wyz"
version = "0.5.1"
@ -3201,30 +2971,6 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
[[package]]
name = "yoke"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5"
dependencies = [
"serde",
"stable_deref_trait",
"yoke-derive",
"zerofrom",
]
[[package]]
name = "yoke-derive"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
"synstructure",
]
[[package]]
name = "zerocopy"
version = "0.7.34"
@ -3245,27 +2991,6 @@ dependencies = [
"syn 2.0.66",
]
[[package]]
name = "zerofrom"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55"
dependencies = [
"zerofrom-derive",
]
[[package]]
name = "zerofrom-derive"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
"synstructure",
]
[[package]]
name = "zeroize"
version = "1.8.1"
@ -3285,25 +3010,3 @@ dependencies = [
"quote",
"syn 2.0.66",
]
[[package]]
name = "zerovec"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb2cc8827d6c0994478a15c53f374f46fbd41bea663d809b14744bc42e6b109c"
dependencies = [
"yoke",
"zerofrom",
"zerovec-derive",
]
[[package]]
name = "zerovec-derive"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97cf56601ee5052b4417d90c8755c6683473c926039908196cf35d99f893ebe7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
]


@ -89,6 +89,7 @@ tempfile = { version = "3" }
pretty_assertions = { version = "1.4.0" }
proptest = { version = "1" }
proptest-derive = { version = "0.4.0" }
tokio-test = { version = "0.4.4" }
## TODO:
## Potential dependencies.


@ -17,7 +17,6 @@ std = ["dep:thiserror", "bytes/std", "cuprate-fixed-bytes/std"]
[dependencies]
cuprate-fixed-bytes = { path = "../fixed-bytes", default-features = false }
sealed = "0.5.0"
paste = "1.0.14"
ref-cast = "1.0.22"
bytes = { workspace = true }


@ -1,8 +1,7 @@
use bytes::{Buf, BufMut, Bytes, BytesMut};
use ref_cast::RefCast;
use sealed::sealed;
use crate::{error::*, value::*, EpeeValue, InnerMarker, Marker};
use crate::{error::*, EpeeValue, InnerMarker, Marker};
#[derive(RefCast)]
#[repr(transparent)]
@ -26,7 +25,6 @@ impl<'a, T: Containerable + EpeeValue> From<&'a Vec<T>> for &'a ContainerAsBlob<
}
}
#[sealed]
impl<T: Containerable + EpeeValue> EpeeValue for ContainerAsBlob<T> {
const MARKER: Marker = Marker::new(InnerMarker::String);


@ -77,7 +77,7 @@ pub use error::*;
use io::*;
pub use marker::{InnerMarker, Marker};
pub use value::EpeeValue;
use varint::*;
pub use varint::{read_varint, write_varint};
/// Header that needs to be at the beginning of every binary blob that follows
/// this binary serialization format.
@ -212,6 +212,87 @@ fn write_epee_value<T: EpeeValue, B: BufMut>(val: T, w: &mut B) -> Result<()> {
val.write(w)
}
/// Write a byte array to `w` with [`write_varint`].
///
/// This function:
/// - Writes the length of `t`'s bytes into `w` using [`write_varint`]
/// - Writes `t`'s bytes into `w`
///
/// It is used as the internal [`EpeeValue::write`]
/// implementation of byte-like containers such as:
/// - [`EpeeValue::<Vec<u8>>::write`]
/// - [`EpeeValue::<String>::write`]
///
/// # Errors
/// This will error if:
/// - [`write_varint`] fails
/// - `w` does not have enough capacity
///
/// # Example
/// ```rust
/// let t: [u8; 8] = [3, 0, 0, 0, 1, 0, 0, 0];
/// let mut w = vec![];
///
/// cuprate_epee_encoding::write_bytes(t, &mut w).unwrap();
///
/// assert_eq!(w.len(), 9); // length of bytes + bytes
/// assert_eq!(w[1..], t);
/// ```
pub fn write_bytes<T: AsRef<[u8]>, B: BufMut>(t: T, w: &mut B) -> Result<()> {
let bytes = t.as_ref();
let len = bytes.len();
write_varint(len.try_into()?, w)?;
if w.remaining_mut() < len {
return Err(Error::IO("Not enough capacity to write bytes"));
}
w.put_slice(bytes);
Ok(())
}
/// Write an [`Iterator`] of [`EpeeValue`]s to `w` with [`write_varint`].
///
/// This function:
/// - Writes the length of the `iterator`, into `w` using [`write_varint`]
/// - [`EpeeValue::write`]s each `T` of the iterator into `w`
///
/// It is used as the internal [`EpeeValue::write`]
/// implementation of containers such as [`EpeeValue::<Vec<T>>::write`].
///
/// # Errors
/// This will error if:
/// - [`write_varint`] fails
/// - [`EpeeValue::<T>::write`] fails
///
/// # Example
/// ```rust
/// let t: u64 = 3;
/// let vec: Vec<u64> = vec![t, t];
/// let mut w = vec![];
///
/// let iter: std::vec::IntoIter<u64> = vec.into_iter();
/// cuprate_epee_encoding::write_iterator(iter, &mut w).unwrap();
///
/// assert_eq!(w.len(), 17);
/// assert_eq!(w[1..9], [3, 0, 0, 0, 0, 0, 0, 0]);
/// assert_eq!(w[9..], [3, 0, 0, 0, 0, 0, 0, 0]);
/// ```
pub fn write_iterator<T, I, B>(iterator: I, w: &mut B) -> Result<()>
where
T: EpeeValue,
I: Iterator<Item = T> + ExactSizeIterator,
B: BufMut,
{
write_varint(iterator.len().try_into()?, w)?;
for item in iterator.into_iter() {
item.write(w)?;
}
Ok(())
}
/// A helper object builder that just skips every field.
#[derive(Default)]
struct SkipObjectBuilder;


@ -1,21 +1,23 @@
//! This module contains a [`EpeeValue`] trait and
//! impls for some possible base epee values.
use alloc::{string::String, vec::Vec};
/// This module contains a `sealed` [`EpeeValue`] trait and different impls for
/// the different possible base epee values.
use core::fmt::Debug;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use sealed::sealed;
use cuprate_fixed_bytes::{ByteArray, ByteArrayVec};
use crate::{
io::*, varint::*, EpeeObject, Error, InnerMarker, Marker, Result, MAX_STRING_LEN_POSSIBLE,
io::{checked_read_primitive, checked_write_primitive},
varint::{read_varint, write_varint},
write_bytes, write_iterator, EpeeObject, Error, InnerMarker, Marker, Result,
MAX_STRING_LEN_POSSIBLE,
};
/// A trait for epee values, this trait is sealed as all possible epee values are
/// defined in the lib, to make an [`EpeeValue`] outside the lib you will need to
/// use the trait [`EpeeObject`].
#[sealed(pub(crate))]
/// A trait for epee values.
///
/// All [`EpeeObject`] objects automatically implement [`EpeeValue`].
pub trait EpeeValue: Sized {
const MARKER: Marker;
@ -37,7 +39,6 @@ pub trait EpeeValue: Sized {
fn write<B: BufMut>(self, w: &mut B) -> Result<()>;
}
#[sealed]
impl<T: EpeeObject> EpeeValue for T {
const MARKER: Marker = Marker::new(InnerMarker::Object);
@ -56,7 +57,6 @@ impl<T: EpeeObject> EpeeValue for T {
}
}
#[sealed]
impl<T: EpeeObject> EpeeValue for Vec<T> {
const MARKER: Marker = T::MARKER.into_seq();
@ -86,15 +86,10 @@ impl<T: EpeeObject> EpeeValue for Vec<T> {
}
fn write<B: BufMut>(self, w: &mut B) -> Result<()> {
write_varint(self.len().try_into()?, w)?;
for item in self.into_iter() {
item.write(w)?;
}
Ok(())
write_iterator(self.into_iter(), w)
}
}
#[sealed]
impl<T: EpeeObject + Debug, const N: usize> EpeeValue for [T; N] {
const MARKER: Marker = <T>::MARKER.into_seq();
@ -109,17 +104,12 @@ impl<T: EpeeObject + Debug, const N: usize> EpeeValue for [T; N] {
}
fn write<B: BufMut>(self, w: &mut B) -> Result<()> {
write_varint(self.len().try_into()?, w)?;
for item in self.into_iter() {
item.write(w)?;
}
Ok(())
write_iterator(self.into_iter(), w)
}
}
macro_rules! epee_numb {
($numb:ty, $marker:ident, $read_fn:ident, $write_fn:ident) => {
#[sealed]
impl EpeeValue for $numb {
const MARKER: Marker = Marker::new(InnerMarker::$marker);
@ -148,7 +138,6 @@ epee_numb!(u32, U32, get_u32_le, put_u32_le);
epee_numb!(u64, U64, get_u64_le, put_u64_le);
epee_numb!(f64, F64, get_f64_le, put_f64_le);
#[sealed]
impl EpeeValue for bool {
const MARKER: Marker = Marker::new(InnerMarker::Bool);
@ -165,7 +154,6 @@ impl EpeeValue for bool {
}
}
#[sealed]
impl EpeeValue for Vec<u8> {
const MARKER: Marker = Marker::new(InnerMarker::String);
@ -198,18 +186,10 @@ impl EpeeValue for Vec<u8> {
}
fn write<B: BufMut>(self, w: &mut B) -> Result<()> {
write_varint(self.len().try_into()?, w)?;
if w.remaining_mut() < self.len() {
return Err(Error::IO("Not enough capacity to write bytes"));
}
w.put_slice(&self);
Ok(())
write_bytes(self, w)
}
}
#[sealed::sealed]
impl EpeeValue for Bytes {
const MARKER: Marker = Marker::new(InnerMarker::String);
@ -239,18 +219,10 @@ impl EpeeValue for Bytes {
}
fn write<B: BufMut>(self, w: &mut B) -> Result<()> {
write_varint(self.len().try_into()?, w)?;
if w.remaining_mut() < self.len() {
return Err(Error::IO("Not enough capacity to write bytes"));
}
w.put(self);
Ok(())
write_bytes(self, w)
}
}
#[sealed::sealed]
impl EpeeValue for BytesMut {
const MARKER: Marker = Marker::new(InnerMarker::String);
@ -283,18 +255,10 @@ impl EpeeValue for BytesMut {
}
fn write<B: BufMut>(self, w: &mut B) -> Result<()> {
write_varint(self.len().try_into()?, w)?;
if w.remaining_mut() < self.len() {
return Err(Error::IO("Not enough capacity to write bytes"));
}
w.put(self);
Ok(())
write_bytes(self, w)
}
}
#[sealed::sealed]
impl<const N: usize> EpeeValue for ByteArrayVec<N> {
const MARKER: Marker = Marker::new(InnerMarker::String);
@ -326,19 +290,10 @@ impl<const N: usize> EpeeValue for ByteArrayVec<N> {
fn write<B: BufMut>(self, w: &mut B) -> Result<()> {
let bytes = self.take_bytes();
write_varint(bytes.len().try_into()?, w)?;
if w.remaining_mut() < bytes.len() {
return Err(Error::IO("Not enough capacity to write bytes"));
}
w.put(bytes);
Ok(())
write_bytes(bytes, w)
}
}
#[sealed::sealed]
impl<const N: usize> EpeeValue for ByteArray<N> {
const MARKER: Marker = Marker::new(InnerMarker::String);
@ -362,19 +317,10 @@ impl<const N: usize> EpeeValue for ByteArray<N> {
fn write<B: BufMut>(self, w: &mut B) -> Result<()> {
let bytes = self.take_bytes();
write_varint(N.try_into().unwrap(), w)?;
if w.remaining_mut() < N {
return Err(Error::IO("Not enough capacity to write bytes"));
}
w.put(bytes);
Ok(())
write_bytes(bytes, w)
}
}
#[sealed]
impl EpeeValue for String {
const MARKER: Marker = Marker::new(InnerMarker::String);
@ -392,18 +338,10 @@ impl EpeeValue for String {
}
fn write<B: BufMut>(self, w: &mut B) -> Result<()> {
write_varint(self.len().try_into()?, w)?;
if w.remaining_mut() < self.len() {
return Err(Error::IO("Not enough capacity to write bytes"));
}
w.put_slice(self.as_bytes());
Ok(())
write_bytes(self, w)
}
}
#[sealed]
impl<const N: usize> EpeeValue for [u8; N] {
const MARKER: Marker = Marker::new(InnerMarker::String);
@ -418,18 +356,10 @@ impl<const N: usize> EpeeValue for [u8; N] {
}
fn write<B: BufMut>(self, w: &mut B) -> Result<()> {
write_varint(self.len().try_into()?, w)?;
if w.remaining_mut() < self.len() {
return Err(Error::IO("Not enough capacity to write bytes"));
}
w.put_slice(&self);
Ok(())
write_bytes(self, w)
}
}
#[sealed]
impl<const N: usize> EpeeValue for Vec<[u8; N]> {
const MARKER: Marker = <[u8; N]>::MARKER.into_seq();
@ -460,17 +390,12 @@ impl<const N: usize> EpeeValue for Vec<[u8; N]> {
}
fn write<B: BufMut>(self, w: &mut B) -> Result<()> {
write_varint(self.len().try_into()?, w)?;
for item in self.into_iter() {
item.write(w)?;
}
Ok(())
write_iterator(self.into_iter(), w)
}
}
macro_rules! epee_seq {
($val:ty) => {
#[sealed]
impl EpeeValue for Vec<$val> {
const MARKER: Marker = <$val>::MARKER.into_seq();
@ -501,15 +426,10 @@ macro_rules! epee_seq {
}
fn write<B: BufMut>(self, w: &mut B) -> Result<()> {
write_varint(self.len().try_into()?, w)?;
for item in self.into_iter() {
item.write(w)?;
}
Ok(())
write_iterator(self.into_iter(), w)
}
}
#[sealed]
impl<const N: usize> EpeeValue for [$val; N] {
const MARKER: Marker = <$val>::MARKER.into_seq();
@ -524,11 +444,7 @@ macro_rules! epee_seq {
}
fn write<B: BufMut>(self, w: &mut B) -> Result<()> {
write_varint(self.len().try_into()?, w)?;
for item in self.into_iter() {
item.write(w)?;
}
Ok(())
write_iterator(self.into_iter(), w)
}
}
};
@ -548,7 +464,6 @@ epee_seq!(String);
epee_seq!(Bytes);
epee_seq!(BytesMut);
#[sealed]
impl<T: EpeeValue> EpeeValue for Option<T> {
const MARKER: Marker = T::MARKER;


@ -7,6 +7,18 @@ const FITS_IN_ONE_BYTE: u64 = 2_u64.pow(8 - SIZE_OF_SIZE_MARKER) - 1;
const FITS_IN_TWO_BYTES: u64 = 2_u64.pow(16 - SIZE_OF_SIZE_MARKER) - 1;
const FITS_IN_FOUR_BYTES: u64 = 2_u64.pow(32 - SIZE_OF_SIZE_MARKER) - 1;
/// Read an epee variable sized number from `r`.
///
/// ```rust
/// use cuprate_epee_encoding::read_varint;
///
/// assert_eq!(read_varint(&mut [252].as_slice()).unwrap(), 63);
/// assert_eq!(read_varint(&mut [1, 1].as_slice()).unwrap(), 64);
/// assert_eq!(read_varint(&mut [253, 255].as_slice()).unwrap(), 16_383);
/// assert_eq!(read_varint(&mut [2, 0, 1, 0].as_slice()).unwrap(), 16_384);
/// assert_eq!(read_varint(&mut [254, 255, 255, 255].as_slice()).unwrap(), 1_073_741_823);
/// assert_eq!(read_varint(&mut [3, 0, 0, 0, 1, 0, 0, 0].as_slice()).unwrap(), 1_073_741_824);
/// ```
pub fn read_varint<B: Buf>(r: &mut B) -> Result<u64> {
if !r.has_remaining() {
Err(Error::IO("Not enough bytes to build VarInt"))?
@ -26,6 +38,26 @@ pub fn read_varint<B: Buf>(r: &mut B) -> Result<u64> {
Ok(vi)
}
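// Worked example of the encoding (assuming SIZE_OF_SIZE_MARKER is 2, as the
// constants above imply): the low two bits of the first byte are the size
// marker (0 => 1 byte, 1 => 2, 2 => 4, 3 => 8) and the value is stored
// little-endian, shifted left by two. E.g. 16_383 is in the two-byte range:
// (16_383 << 2) | 1 == 65_533 == 0xFFFD => bytes [253, 255], matching the
// doctest above.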
/// Write an epee variable sized number into `w`.
///
/// ```rust
/// use cuprate_epee_encoding::write_varint;
///
/// let mut buf = vec![];
///
/// for (number, expected_bytes) in [
/// (63, [252].as_slice()),
/// (64, [1, 1].as_slice()),
/// (16_383, [253, 255].as_slice()),
/// (16_384, [2, 0, 1, 0].as_slice()),
/// (1_073_741_823, [254, 255, 255, 255].as_slice()),
/// (1_073_741_824, [3, 0, 0, 0, 1, 0, 0, 0].as_slice()),
/// ] {
/// buf.clear();
/// write_varint(number, &mut buf);
/// assert_eq!(buf.as_slice(), expected_bytes);
/// }
/// ```
pub fn write_varint<B: BufMut>(number: u64, w: &mut B) -> Result<()> {
let size_marker = match number {
0..=FITS_IN_ONE_BYTE => 0,


@ -10,7 +10,7 @@ default = ["borsh"]
borsh = ["dep:borsh", "cuprate-pruning/borsh"]
[dependencies]
cuprate-helper = { path = "../../helper" }
cuprate-helper = { path = "../../helper", features = ["asynch"], default-features = false }
cuprate-wire = { path = "../../net/wire", features = ["tracing"] }
cuprate-pruning = { path = "../../pruning" }


@ -10,13 +10,15 @@ use tokio::{
task::JoinHandle,
};
use tokio_util::sync::PollSemaphore;
use tower::Service;
use tower::{Service, ServiceExt};
use tracing::Instrument;
use cuprate_helper::asynch::InfallibleOneshotReceiver;
use cuprate_pruning::PruningSeed;
use crate::{
handles::ConnectionHandle, ConnectionDirection, NetworkZone, PeerError, PeerRequest,
PeerResponse, SharedError,
handles::{ConnectionGuard, ConnectionHandle},
ConnectionDirection, NetworkZone, PeerError, PeerRequest, PeerResponse, SharedError,
};
mod connection;
@ -25,7 +27,6 @@ pub mod handshaker;
mod timeout_monitor;
pub use connector::{ConnectRequest, Connector};
use cuprate_pruning::PruningSeed;
pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError};
/// An internal identifier for a given peer, will be their address if known
@ -158,11 +159,70 @@ impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> {
permit: Some(permit),
};
self.connection_tx
.try_send(req)
.map_err(|_| ())
.expect("poll_ready should have been called");
if let Err(e) = self.connection_tx.try_send(req) {
// The connection task could have closed between a call to `poll_ready` and the call to
// `call`, which means if we don't handle the error here the receiver would panic.
use mpsc::error::TrySendError;
match e {
TrySendError::Closed(req) | TrySendError::Full(req) => {
self.set_err(PeerError::ClientChannelClosed);
let _ = req
.response_channel
.send(Err(PeerError::ClientChannelClosed.into()));
}
}
}
rx.into()
}
}
/// Creates a mock [`Client`] for testing purposes.
///
/// `request_handler` will be used to handle requests sent to the [`Client`]
pub fn mock_client<Z: NetworkZone, S>(
info: PeerInformation<Z::Addr>,
connection_guard: ConnectionGuard,
mut request_handler: S,
) -> Client<Z>
where
S: crate::PeerRequestHandler,
{
let (tx, mut rx) = mpsc::channel(1);
let task_span = tracing::error_span!("mock_connection", addr = %info.id);
let task_handle = tokio::spawn(
async move {
let _guard = connection_guard;
loop {
let Some(req): Option<connection::ConnectionTaskRequest> = rx.recv().await else {
tracing::debug!("Channel closed, closing mock connection");
return;
};
tracing::debug!("Received new request: {:?}", req.request.id());
let res = request_handler
.ready()
.await
.unwrap()
.call(req.request)
.await
.unwrap();
tracing::debug!("Sending back response");
let _ = req.response_channel.send(Ok(res));
}
}
.instrument(task_span),
);
let timeout_task = tokio::spawn(futures::future::pending());
let semaphore = Arc::new(Semaphore::new(1));
let error_slot = SharedError::new();
Client::new(info, tx, task_handle, timeout_task, semaphore, error_slot)
}


@ -4,7 +4,6 @@
//! sure the connection is still active.
use std::sync::Arc;
use cuprate_wire::admin::TimedSyncRequest;
use futures::channel::oneshot;
use tokio::{
sync::{mpsc, Semaphore},
@ -13,6 +12,8 @@ use tokio::{
use tower::ServiceExt;
use tracing::instrument;
use cuprate_wire::admin::TimedSyncRequest;
use crate::{
client::{connection::ConnectionTaskRequest, InternalPeerID},
constants::{MAX_PEERS_IN_PEER_LIST_MESSAGE, TIMEOUT_INTERVAL},


@ -11,7 +11,8 @@ cuprate-wire = { path = "../../net/wire" }
cuprate-p2p-core = { path = "../p2p-core", features = ["borsh"] }
cuprate-address-book = { path = "../address-book" }
cuprate-pruning = { path = "../../pruning" }
cuprate-helper = { path = "../../helper", features = ["asynch"] }
cuprate-helper = { path = "../../helper", features = ["asynch"], default-features = false }
cuprate-async-buffer = { path = "../async-buffer" }
monero-serai = { workspace = true, features = ["std"] }
@ -26,13 +27,13 @@ dashmap = { workspace = true }
thiserror = { workspace = true }
bytes = { workspace = true, features = ["std"] }
indexmap = { workspace = true, features = ["std"] }
rand = { workspace = true, features = ["std", "std_rng"] }
rand_distr = { workspace = true, features = ["std"] }
hex = { workspace = true, features = ["std"] }
tracing = { workspace = true, features = ["std", "attributes"] }
tracing-subscriber = "0.3.18"
[dev-dependencies]
cuprate-test-utils = { path = "../../test-utils" }
indexmap = { workspace = true }
proptest = { workspace = true }
tokio-test = { workspace = true }


@ -0,0 +1,733 @@
//! # Block Downloader
//!
//! This module contains the [`BlockDownloader`], which finds a chain to
//! download from our connected peers and downloads it. See the actual
//! `struct` documentation for implementation details.
//!
//! The block downloader is started by [`download_blocks`].
use std::{
cmp::{max, min, Reverse},
collections::{BTreeMap, BinaryHeap},
sync::Arc,
time::Duration,
};
use futures::TryFutureExt;
use monero_serai::{block::Block, transaction::Transaction};
use tokio::{
task::JoinSet,
time::{interval, timeout, MissedTickBehavior},
};
use tower::{Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
use cuprate_async_buffer::{BufferAppender, BufferStream};
use cuprate_p2p_core::{
handles::ConnectionHandle,
services::{PeerSyncRequest, PeerSyncResponse},
NetworkZone, PeerSyncSvc,
};
use cuprate_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};
use crate::{
client_pool::{ClientPool, ClientPoolDropGuard},
constants::{
BLOCK_DOWNLOADER_REQUEST_TIMEOUT, EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED, LONG_BAN,
MAX_BLOCK_BATCH_LEN, MAX_DOWNLOAD_FAILURES,
},
};
mod block_queue;
mod chain_tracker;
mod download_batch;
mod request_chain;
#[cfg(test)]
mod tests;
use block_queue::{BlockQueue, ReadyQueueBatch};
use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker};
use download_batch::download_batch_task;
use request_chain::{initial_chain_search, request_chain_entry_from_peer};
/// A downloaded batch of blocks.
#[derive(Debug, Clone)]
pub struct BlockBatch {
/// The blocks.
pub blocks: Vec<(Block, Vec<Transaction>)>,
/// The size in bytes of this batch.
pub size: usize,
/// The peer that gave us this batch.
pub peer_handle: ConnectionHandle,
}
/// The block downloader config.
#[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Ord, Eq)]
pub struct BlockDownloaderConfig {
/// The size in bytes of the buffer between the block downloader and the place which
/// is consuming the downloaded blocks.
pub buffer_size: usize,
/// The size of the in progress queue (in bytes) at which we stop requesting more blocks.
pub in_progress_queue_size: usize,
/// The [`Duration`] between checking the client pool for free peers.
pub check_client_pool_interval: Duration,
/// The target size of a single batch of blocks (in bytes).
pub target_batch_size: usize,
/// The initial amount of blocks to request (in number of blocks)
pub initial_batch_size: usize,
}
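A hedged sketch of constructing this config; the values below are purely illustrative, not defaults prescribed by this crate:

```rust
use std::time::Duration;

// Illustrative values only.
let config = BlockDownloaderConfig {
    buffer_size: 50_000_000,            // 50 MB buffer towards the consumer
    in_progress_queue_size: 50_000_000, // stop requesting past 50 MB in flight
    check_client_pool_interval: Duration::from_secs(30),
    target_batch_size: 5_000_000,       // aim for ~5 MB batches
    initial_batch_size: 1,              // start with one block per batch
};
```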
/// An error that occurred in the [`BlockDownloader`].
#[derive(Debug, thiserror::Error)]
pub enum BlockDownloadError {
#[error("A request to a peer timed out.")]
TimedOut,
#[error("The block buffer was closed.")]
BufferWasClosed,
#[error("The peers we requested data from did not have all the data.")]
PeerDidNotHaveRequestedData,
#[error("The peers response to a request was invalid.")]
PeersResponseWasInvalid,
#[error("The chain we are following is invalid.")]
ChainInvalid,
#[error("Failed to find a more advanced chain to follow")]
FailedToFindAChainToFollow,
#[error("The peer did not send any overlapping blocks, unknown start height.")]
PeerSentNoOverlappingBlocks,
#[error("Service error: {0}")]
ServiceError(#[from] tower::BoxError),
}
/// The request type for the chain service.
pub enum ChainSvcRequest {
/// A request for the current chain history.
CompactHistory,
/// A request to find the first unknown block ID in a list of block IDs.
FindFirstUnknown(Vec<[u8; 32]>),
/// A request for our current cumulative difficulty.
CumulativeDifficulty,
}
/// The response type for the chain service.
pub enum ChainSvcResponse {
/// The response for [`ChainSvcRequest::CompactHistory`].
CompactHistory {
/// A list of block IDs in our chain, starting with the most recent block, all the way back to the genesis block.
///
/// These block IDs should be in reverse chronological order; not every block is needed.
block_ids: Vec<[u8; 32]>,
/// The current cumulative difficulty of the chain.
cumulative_difficulty: u128,
},
/// The response for [`ChainSvcRequest::FindFirstUnknown`].
///
/// Contains the index of the first unknown block and its expected height.
FindFirstUnknown(usize, u64),
/// The response for [`ChainSvcRequest::CumulativeDifficulty`].
///
/// The current cumulative difficulty of our chain.
CumulativeDifficulty(u128),
}
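To make the service contract concrete, here is a minimal sketch of a tower service answering these requests. `ToyChain` and its logic are hypothetical stand-ins, not code from this crate; in particular the expected-height computation is a placeholder:

```rust
use std::task::{Context, Poll};
use futures::future::{ready, Ready};
use tower::Service;

// Hypothetical in-memory chain state.
struct ToyChain {
    block_ids: Vec<[u8; 32]>, // index == height
    cumulative_difficulty: u128,
}

impl Service<ChainSvcRequest> for ToyChain {
    type Response = ChainSvcResponse;
    type Error = tower::BoxError;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: ChainSvcRequest) -> Self::Future {
        ready(Ok(match req {
            ChainSvcRequest::CompactHistory => ChainSvcResponse::CompactHistory {
                // Most recent first; a real implementation would send a
                // sparse history, not every ID.
                block_ids: self.block_ids.iter().rev().copied().collect(),
                cumulative_difficulty: self.cumulative_difficulty,
            },
            ChainSvcRequest::FindFirstUnknown(ids) => {
                // Index of the first ID we don't know, and the height we
                // expect it to have (placeholder arithmetic).
                let i = ids
                    .iter()
                    .position(|id| !self.block_ids.contains(id))
                    .unwrap_or(ids.len());
                ChainSvcResponse::FindFirstUnknown(i, i as u64)
            }
            ChainSvcRequest::CumulativeDifficulty => {
                ChainSvcResponse::CumulativeDifficulty(self.cumulative_difficulty)
            }
        }))
    }
}
```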
/// This function starts the block downloader and returns a [`BufferStream`] that will produce
/// a sequential stream of blocks.
///
/// The block downloader will pick the longest chain and will follow it for as long as possible,
/// the blocks given from the [`BufferStream`] will be in order.
///
/// The block downloader may fail before the whole chain is downloaded. If this is the case, you can
/// call this function again to restart the chain search.
#[instrument(level = "error", skip_all, name = "block_downloader")]
pub fn download_blocks<N: NetworkZone, S, C>(
client_pool: Arc<ClientPool<N>>,
peer_sync_svc: S,
our_chain_svc: C,
config: BlockDownloaderConfig,
) -> BufferStream<BlockBatch>
where
S: PeerSyncSvc<N> + Clone,
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+ Send
+ 'static,
C::Future: Send + 'static,
{
let (buffer_appender, buffer_stream) = cuprate_async_buffer::new_buffer(config.buffer_size);
let block_downloader = BlockDownloader::new(
client_pool,
peer_sync_svc,
our_chain_svc,
buffer_appender,
config,
);
tokio::spawn(
block_downloader
.run()
.inspect_err(|e| tracing::debug!("Error downloading blocks: {e}"))
.instrument(Span::current()),
);
buffer_stream
}
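A hedged usage sketch, assuming `BufferStream` implements `futures::Stream` (plus `Unpin`), as the "sequential stream of blocks" wording above suggests; `client_pool`, `peer_sync_svc`, `our_chain_svc`, and `config` are assumed to already exist:

```rust
use futures::StreamExt;

let mut batches = download_blocks(client_pool, peer_sync_svc, our_chain_svc, config);

while let Some(batch) = batches.next().await {
    // Batches arrive in order; verification/import of `batch.blocks`
    // would happen here.
    tracing::info!("batch: {} blocks, {} bytes", batch.blocks.len(), batch.size);
}
```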
/// # Block Downloader
///
/// This is the block downloader, which finds a chain to follow and attempts to follow it, adding the
/// downloaded blocks to an [`async_buffer`].
///
/// ## Implementation Details
///
/// The first step to downloading blocks is to find a chain to follow; this is done by [`initial_chain_search`],
/// whose docs detail how this is done.
///
/// With an initial list of block IDs to follow the block downloader will then look for available peers
/// to download blocks from.
///
/// For each peer we will then allocate a batch of blocks for them to retrieve. As these blocks come in,
/// we add them to the [`BlockQueue`] for pushing into the [`async_buffer`]; once the oldest block is downloaded,
/// we send it into the buffer, repeating this until the current oldest block is one that is still being downloaded.
///
/// When a peer has finished downloading blocks, we add it to our list of ready peers so it can be used to
/// request more data.
///
/// Ready peers will either:
/// - download the next batch of blocks
/// - request the next chain entry
/// - download an already requested batch of blocks (this might happen due to an error in the previous request
/// or because the queue of ready blocks is too large, so we need the oldest block to clear it).
struct BlockDownloader<N: NetworkZone, S, C> {
/// The client pool.
client_pool: Arc<ClientPool<N>>,
/// The service that holds the peer's sync states.
peer_sync_svc: S,
/// The service that holds our current chain state.
our_chain_svc: C,
/// The amount of blocks to request in the next batch.
amount_of_blocks_to_request: usize,
/// The height at which [`Self::amount_of_blocks_to_request`] was updated.
amount_of_blocks_to_request_updated_at: u64,
/// The amount of consecutive empty chain entries we received.
///
/// An empty chain entry means we reached the peer's chain tip.
amount_of_empty_chain_entries: usize,
/// The running block download tasks.
block_download_tasks: JoinSet<BlockDownloadTaskResponse<N>>,
/// The running chain entry tasks.
///
/// Returns a result of the chain entry or an error.
#[allow(clippy::type_complexity)]
chain_entry_task: JoinSet<Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError>>,
/// The current inflight requests.
///
/// This is a map of batch start heights to block IDs and related information of the batch.
inflight_requests: BTreeMap<u64, BlocksToRetrieve<N>>,
/// A queue of start heights from failed batches that should be retried.
///
/// Wrapped in [`Reverse`] so we prioritize early batches.
failed_batches: BinaryHeap<Reverse<u64>>,
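/// The queue of downloaded block batches, waiting to be pushed, in order, into the [`async_buffer`].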
block_queue: BlockQueue,
/// The [`BlockDownloaderConfig`].
config: BlockDownloaderConfig,
}
impl<N: NetworkZone, S, C> BlockDownloader<N, S, C>
where
S: PeerSyncSvc<N> + Clone,
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+ Send
+ 'static,
C::Future: Send + 'static,
{
/// Creates a new [`BlockDownloader`].
fn new(
client_pool: Arc<ClientPool<N>>,
peer_sync_svc: S,
our_chain_svc: C,
buffer_appender: BufferAppender<BlockBatch>,
config: BlockDownloaderConfig,
) -> Self {
Self {
client_pool,
peer_sync_svc,
our_chain_svc,
amount_of_blocks_to_request: config.initial_batch_size,
amount_of_blocks_to_request_updated_at: 0,
amount_of_empty_chain_entries: 0,
block_download_tasks: JoinSet::new(),
chain_entry_task: JoinSet::new(),
inflight_requests: BTreeMap::new(),
block_queue: BlockQueue::new(buffer_appender),
failed_batches: BinaryHeap::new(),
config,
}
}
/// Checks if we can make use of any peers that are currently pending requests.
async fn check_pending_peers(
&mut self,
chain_tracker: &mut ChainTracker<N>,
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
) {
tracing::debug!("Checking if we can give any work to pending peers.");
for (_, peers) in pending_peers.iter_mut() {
while let Some(peer) = peers.pop() {
if peer.info.handle.is_closed() {
// Peer has disconnected, drop it.
continue;
}
if let Some(peer) = self.try_handle_free_client(chain_tracker, peer).await {
// This peer is OK, however it does not have the data we currently need. This will only happen
// because of its pruning seed, so skip over all peers with the same pruning seed.
peers.push(peer);
break;
}
}
}
}
/// Attempts to send another request for an inflight batch.
///
/// This function finds the batch(es) that we are waiting on to clear our ready queue and sends another
/// request for them.
///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the batch according to its pruning seed.
async fn request_inflight_batch_again(
&mut self,
client: ClientPoolDropGuard<N>,
) -> Option<ClientPoolDropGuard<N>> {
tracing::debug!(
"Requesting an inflight batch, current ready queue size: {}",
self.block_queue.size()
);
assert!(
!self.inflight_requests.is_empty(),
"We need requests inflight to be able to send the request again",
);
let oldest_ready_batch = self.block_queue.oldest_ready_batch().unwrap();
for (_, in_flight_batch) in self.inflight_requests.range_mut(0..oldest_ready_batch) {
if in_flight_batch.requests_sent >= 2 {
continue;
}
if !client_has_block_in_range(
&client.info.pruning_seed,
in_flight_batch.start_height,
in_flight_batch.ids.len(),
) {
return Some(client);
}
self.block_download_tasks.spawn(download_batch_task(
client,
in_flight_batch.ids.clone(),
in_flight_batch.prev_id,
in_flight_batch.start_height,
in_flight_batch.requests_sent,
));
return None;
}
tracing::debug!("Could not find an inflight request applicable for this peer.");
Some(client)
}
/// Spawns a task to request blocks from the given peer.
///
/// The batch requested will depend on our current state, failed batches will be prioritised.
///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
/// to its pruning seed.
async fn request_block_batch(
&mut self,
chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>,
) -> Option<ClientPoolDropGuard<N>> {
tracing::trace!("Using peer to request a batch of blocks.");
// First look to see if we have any failed requests.
while let Some(failed_request) = self.failed_batches.peek() {
// Check if we still have the request that failed - another peer could have completed it after
// failure.
let Some(request) = self.inflight_requests.get_mut(&failed_request.0) else {
// We don't have the request in flight so remove the failure.
self.failed_batches.pop();
continue;
};
// Check if this peer has the blocks according to their pruning seed.
if client_has_block_in_range(
&client.info.pruning_seed,
request.start_height,
request.ids.len(),
) {
tracing::debug!("Using peer to request a failed batch");
// They should have the blocks so send the re-request to this peer.
request.requests_sent += 1;
self.block_download_tasks.spawn(download_batch_task(
client,
request.ids.clone(),
request.prev_id,
request.start_height,
request.requests_sent,
));
// Remove the failure, we have just handled it.
self.failed_batches.pop();
return None;
}
// The peer doesn't have the batch according to its pruning seed.
break;
}
// If our ready queue is too large send duplicate requests for the blocks we are waiting on.
if self.block_queue.size() >= self.config.in_progress_queue_size {
return self.request_inflight_batch_again(client).await;
}
// No failed requests that we can handle, request some new blocks.
let Some(mut block_entry_to_get) = chain_tracker
.blocks_to_get(&client.info.pruning_seed, self.amount_of_blocks_to_request)
else {
return Some(client);
};
tracing::debug!("Requesting a new batch of blocks");
block_entry_to_get.requests_sent = 1;
self.inflight_requests
.insert(block_entry_to_get.start_height, block_entry_to_get.clone());
self.block_download_tasks.spawn(download_batch_task(
client,
block_entry_to_get.ids.clone(),
block_entry_to_get.prev_id,
block_entry_to_get.start_height,
block_entry_to_get.requests_sent,
));
None
}
/// Attempts to give work to a free client.
///
/// This function will use our current state to decide if we should send a request for a chain entry
/// or if we should request a batch of blocks.
///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
/// to its pruning seed.
async fn try_handle_free_client(
&mut self,
chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>,
) -> Option<ClientPoolDropGuard<N>> {
// We send 2 requests, so if one of them is slow or doesn't have the next chain entry, we still have a backup.
if self.chain_entry_task.len() < 2
// If we have had too many failures then assume the tip has been found so no more chain entries.
&& self.amount_of_empty_chain_entries <= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED
// Make sure we keep a big buffer of pending block IDs to retrieve, so we aren't left waiting
// around for a chain entry.
&& chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 500
// Make sure this peer actually has the chain.
&& chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed)
{
tracing::debug!("Requesting next chain entry");
let history = chain_tracker.get_simple_history();
self.chain_entry_task.spawn(
async move {
timeout(
BLOCK_DOWNLOADER_REQUEST_TIMEOUT,
request_chain_entry_from_peer(client, history),
)
.await
.map_err(|_| BlockDownloadError::TimedOut)?
}
.instrument(tracing::debug_span!(
"request_chain_entry",
current_height = chain_tracker.top_height()
)),
);
return None;
}
// Request a batch of blocks instead.
self.request_block_batch(chain_tracker, client).await
}
/// Checks the [`ClientPool`] for free peers.
async fn check_for_free_clients(
&mut self,
chain_tracker: &mut ChainTracker<N>,
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
) -> Result<(), BlockDownloadError> {
tracing::debug!("Checking for free peers");
// This value might be slightly behind but that's ok.
let ChainSvcResponse::CumulativeDifficulty(current_cumulative_difficulty) = self
.our_chain_svc
.ready()
.await?
.call(ChainSvcRequest::CumulativeDifficulty)
.await?
else {
panic!("Chain service returned wrong response.");
};
let PeerSyncResponse::PeersToSyncFrom(peers) = self
.peer_sync_svc
.ready()
.await?
.call(PeerSyncRequest::PeersToSyncFrom {
current_cumulative_difficulty,
block_needed: None,
})
.await?
else {
panic!("Peer sync service returned wrong response.");
};
tracing::debug!("Response received from peer sync service");
for client in self.client_pool.borrow_clients(&peers) {
pending_peers
.entry(client.info.pruning_seed)
.or_default()
.push(client);
}
self.check_pending_peers(chain_tracker, pending_peers).await;
Ok(())
}
/// Handles a response to a request to get blocks from a peer.
async fn handle_download_batch_res(
&mut self,
start_height: u64,
res: Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>,
chain_tracker: &mut ChainTracker<N>,
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
) -> Result<(), BlockDownloadError> {
tracing::debug!("Handling block download response");
match res {
Err(e) => {
if matches!(e, BlockDownloadError::ChainInvalid) {
// If the chain was invalid ban the peer who told us about it and error here to stop the
// block downloader.
self.inflight_requests.get(&start_height).inspect(|entry| {
tracing::warn!(
"Received an invalid chain from peer: {}, exiting block downloader (it should be restarted).",
entry.peer_who_told_us
);
entry.peer_who_told_us_handle.ban_peer(LONG_BAN);
});
return Err(e);
}
// Add the request to the failed list.
if let Some(batch) = self.inflight_requests.get_mut(&start_height) {
tracing::debug!("Error downloading batch: {e}");
batch.failures += 1;
if batch.failures > MAX_DOWNLOAD_FAILURES {
tracing::debug!(
"Too many errors downloading blocks, stopping the block downloader."
);
return Err(BlockDownloadError::TimedOut);
}
self.failed_batches.push(Reverse(start_height));
}
Ok(())
}
Ok((client, block_batch)) => {
// Remove the batch from the inflight batches.
if self.inflight_requests.remove(&start_height).is_none() {
tracing::debug!("Already retrieved batch");
// If it was already retrieved then there is nothing else to do.
pending_peers
.entry(client.info.pruning_seed)
.or_default()
.push(client);
self.check_pending_peers(chain_tracker, pending_peers).await;
return Ok(());
};
// If the batch's start height is higher than the height at which we last updated
// `amount_of_blocks_to_request`, update it again.
if start_height > self.amount_of_blocks_to_request_updated_at {
self.amount_of_blocks_to_request = calculate_next_block_batch_size(
block_batch.size,
block_batch.blocks.len(),
self.config.target_batch_size,
);
tracing::debug!(
"Updating batch size of new batches, new size: {}",
self.amount_of_blocks_to_request
);
self.amount_of_blocks_to_request_updated_at = start_height;
}
self.block_queue
.add_incoming_batch(
ReadyQueueBatch {
start_height,
block_batch,
},
self.inflight_requests.first_key_value().map(|(k, _)| *k),
)
.await?;
pending_peers
.entry(client.info.pruning_seed)
.or_default()
.push(client);
self.check_pending_peers(chain_tracker, pending_peers).await;
Ok(())
}
}
}
/// Starts the main loop of the block downloader.
async fn run(mut self) -> Result<(), BlockDownloadError> {
let mut chain_tracker = initial_chain_search(
&self.client_pool,
self.peer_sync_svc.clone(),
&mut self.our_chain_svc,
)
.await?;
let mut pending_peers = BTreeMap::new();
tracing::info!("Attempting to download blocks from peers, this may take a while.");
let mut check_client_pool_interval = interval(self.config.check_client_pool_interval);
check_client_pool_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
self.check_for_free_clients(&mut chain_tracker, &mut pending_peers)
.await?;
loop {
tokio::select! {
_ = check_client_pool_interval.tick() => {
tracing::debug!("Checking client pool for free peers, timer fired.");
self.check_for_free_clients(&mut chain_tracker, &mut pending_peers).await?;
// If we have no inflight requests, and we have had too many empty chain entries in a row assume the top has been found.
if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED {
tracing::debug!("Failed to find any more chain entries, probably fround the top");
return Ok(());
}
}
Some(res) = self.block_download_tasks.join_next() => {
let BlockDownloadTaskResponse {
start_height,
result
} = res.expect("Download batch future panicked");
self.handle_download_batch_res(start_height, result, &mut chain_tracker, &mut pending_peers).await?;
// If we have no inflight requests, and we have had too many empty chain entries in a row assume the top has been found.
if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED {
tracing::debug!("Failed to find any more chain entries, probably fround the top");
return Ok(());
}
}
Some(Ok(res)) = self.chain_entry_task.join_next() => {
match res {
Ok((client, entry)) => {
if chain_tracker.add_entry(entry).is_ok() {
tracing::debug!("Successfully added chain entry to chain tracker.");
self.amount_of_empty_chain_entries = 0;
} else {
tracing::debug!("Failed to add incoming chain entry to chain tracker.");
self.amount_of_empty_chain_entries += 1;
}
pending_peers
.entry(client.info.pruning_seed)
.or_default()
.push(client);
self.check_pending_peers(&mut chain_tracker, &mut pending_peers).await;
}
Err(_) => self.amount_of_empty_chain_entries += 1
}
}
}
}
}
}
/// The return value from the block download tasks.
struct BlockDownloadTaskResponse<N: NetworkZone> {
/// The start height of the batch.
start_height: u64,
/// A result containing the batch or an error.
result: Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>,
}
/// Returns `true` if a peer has all the blocks in a range, according to its [`PruningSeed`].
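///
/// Note: only the endpoints of the range are checked. We assume pruned peers keep blocks in
/// contiguous stripes far longer than [`MAX_BLOCK_BATCH_LEN`], so if both endpoints are
/// unpruned, every block in between should be as well.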
fn client_has_block_in_range(pruning_seed: &PruningSeed, start_height: u64, length: usize) -> bool {
pruning_seed.has_full_block(start_height, CRYPTONOTE_MAX_BLOCK_HEIGHT)
&& pruning_seed.has_full_block(
start_height + u64::try_from(length).unwrap(),
CRYPTONOTE_MAX_BLOCK_HEIGHT,
)
}
/// Calculates the next amount of blocks to request in a batch.
///
/// Parameters:
/// - `previous_batch_size` is the size, in bytes, of the last batch
/// - `previous_batch_len` is the amount of blocks in the last batch
/// - `target_batch_size` is the target size, in bytes, of a batch
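///
/// A worked example: with `previous_batch_size = 1_000_000` bytes, `previous_batch_len = 100`
/// blocks and `target_batch_size = 5_000_000` bytes, the adjusted average block size is
/// `(1_000_000 * 2) / 100 = 20_000` bytes, giving `5_000_000 / 20_000 = 250` blocks; this is
/// capped to `1.5x` the previous length (`150`) and then to [`MAX_BLOCK_BATCH_LEN`] (`100`),
/// so the next batch will request `100` blocks.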
fn calculate_next_block_batch_size(
previous_batch_size: usize,
previous_batch_len: usize,
target_batch_size: usize,
) -> usize {
// The average block size of the last batch of blocks, multiplied by 2 as a safety margin for
// future blocks.
let adjusted_average_block_size = max((previous_batch_size * 2) / previous_batch_len, 1);
// Set the amount of blocks to request equal to our target batch size divided by the adjusted_average_block_size.
let next_batch_len = max(target_batch_size / adjusted_average_block_size, 1);
// Cap the amount of growth to 1.5x the previous batch len, to prevent a small block causing us to request
// a huge amount of blocks.
let next_batch_len = min(next_batch_len, (previous_batch_len * 3).div_ceil(2));
// Cap the length to the maximum allowed.
min(next_batch_len, MAX_BLOCK_BATCH_LEN)
}

@@ -0,0 +1,172 @@
use std::{cmp::Ordering, collections::BinaryHeap};
use cuprate_async_buffer::BufferAppender;
use super::{BlockBatch, BlockDownloadError};
/// A batch of blocks in the ready queue, waiting for previous blocks to come in, so they can
/// be passed into the buffer.
///
/// The [`Eq`] and [`Ord`] impls on this type only take the `start_height` into account. This
/// is because the block downloader only downloads one chain at once, so no 2 batches can have
/// the same `start_height`.
///
/// Also, the [`Ord`] impl is reversed so older blocks (lower height) come first in a [`BinaryHeap`].
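///
/// A sketch of the resulting heap behaviour (batch construction elided):
///
/// ```rust,ignore
/// let mut heap = BinaryHeap::new();
/// heap.push(ReadyQueueBatch { start_height: 10, block_batch: batch_a });
/// heap.push(ReadyQueueBatch { start_height: 5, block_batch: batch_b });
/// // The reversed `Ord` impl makes this a min-heap on `start_height`.
/// assert_eq!(heap.pop().unwrap().start_height, 5);
/// ```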
#[derive(Debug, Clone)]
pub struct ReadyQueueBatch {
/// The start height of the batch.
pub start_height: u64,
/// The batch of blocks.
pub block_batch: BlockBatch,
}
impl Eq for ReadyQueueBatch {}
impl PartialEq<Self> for ReadyQueueBatch {
fn eq(&self, other: &Self) -> bool {
self.start_height.eq(&other.start_height)
}
}
impl PartialOrd<Self> for ReadyQueueBatch {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ReadyQueueBatch {
fn cmp(&self, other: &Self) -> Ordering {
// reverse the ordering so older blocks (lower height) come first in a [`BinaryHeap`]
self.start_height.cmp(&other.start_height).reverse()
}
}
/// The block queue that holds downloaded block batches, adding them to the [`async_buffer`] when the
/// oldest batch has been downloaded.
pub struct BlockQueue {
/// A queue of ready batches.
ready_batches: BinaryHeap<ReadyQueueBatch>,
/// The size, in bytes, of all the batches in [`Self::ready_batches`].
ready_batches_size: usize,
/// The [`BufferAppender`] that gives blocks to Cuprate.
buffer_appender: BufferAppender<BlockBatch>,
}
impl BlockQueue {
/// Creates a new [`BlockQueue`].
pub fn new(buffer_appender: BufferAppender<BlockBatch>) -> BlockQueue {
BlockQueue {
ready_batches: BinaryHeap::new(),
ready_batches_size: 0,
buffer_appender,
}
}
/// Returns the start height of the oldest batch that has not yet been put in the [`async_buffer`].
pub fn oldest_ready_batch(&self) -> Option<u64> {
self.ready_batches.peek().map(|batch| batch.start_height)
}
/// Returns the total size, in bytes, of all the batches that have not been put into the [`async_buffer`] yet.
pub fn size(&self) -> usize {
self.ready_batches_size
}
/// Adds an incoming batch to the queue and checks if we can push any batches into the [`async_buffer`].
///
/// `oldest_in_flight_start_height` should be the start height of the oldest batch that is still inflight;
/// if there are no batches inflight then this should be [`None`].
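///
/// For example, if batches starting at heights `0` and `10` are ready and the oldest inflight
/// batch starts at height `20`, both ready batches will be pushed into the buffer.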
pub async fn add_incoming_batch(
&mut self,
new_batch: ReadyQueueBatch,
oldest_in_flight_start_height: Option<u64>,
) -> Result<(), BlockDownloadError> {
self.ready_batches_size += new_batch.block_batch.size;
self.ready_batches.push(new_batch);
// The height to stop pushing batches into the buffer.
let height_to_stop_at = oldest_in_flight_start_height.unwrap_or(u64::MAX);
while self
.ready_batches
.peek()
.is_some_and(|batch| batch.start_height <= height_to_stop_at)
{
let batch = self
.ready_batches
.pop()
.expect("We just checked we have a batch in the buffer");
let batch_size = batch.block_batch.size;
self.ready_batches_size -= batch_size;
self.buffer_appender
.send(batch.block_batch, batch_size)
.await
.map_err(|_| BlockDownloadError::BufferWasClosed)?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use futures::StreamExt;
use std::{collections::BTreeSet, sync::Arc};
use proptest::{collection::vec, prelude::*};
use tokio::sync::Semaphore;
use tokio_test::block_on;
use cuprate_p2p_core::handles::HandleBuilder;
use super::*;
prop_compose! {
fn ready_batch_strategy()(start_height in 0_u64..500_000_000) -> ReadyQueueBatch {
// TODO: The permit will not be needed here when
let (_, peer_handle) = HandleBuilder::new().with_permit(Arc::new(Semaphore::new(1)).try_acquire_owned().unwrap()).build();
ReadyQueueBatch {
start_height,
block_batch: BlockBatch {
blocks: vec![],
size: start_height as usize,
peer_handle,
},
}
}
}
proptest! {
#[test]
fn block_queue_returns_items_in_order(batches in vec(ready_batch_strategy(), 0..10_000)) {
block_on(async move {
let (buffer_tx, mut buffer_rx) = cuprate_async_buffer::new_buffer(usize::MAX);
let mut queue = BlockQueue::new(buffer_tx);
let mut sorted_batches = BTreeSet::from_iter(batches.clone());
let mut sorted_batches_2 = sorted_batches.clone();
for batch in batches {
if sorted_batches.remove(&batch) {
queue.add_incoming_batch(batch, sorted_batches.last().map(|batch| batch.start_height)).await.unwrap();
}
}
assert_eq!(queue.size(), 0);
assert!(queue.oldest_ready_batch().is_none());
drop(queue);
while let Some(batch) = buffer_rx.next().await {
let last_batch = sorted_batches_2.pop_last().unwrap();
assert_eq!(batch.size, last_batch.block_batch.size);
}
});
}
}
}

@@ -0,0 +1,211 @@
use std::{cmp::min, collections::VecDeque};
use cuprate_fixed_bytes::ByteArrayVec;
use cuprate_p2p_core::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone};
use cuprate_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};
use crate::constants::MEDIUM_BAN;
/// A new chain entry to add to our chain tracker.
#[derive(Debug)]
pub(crate) struct ChainEntry<N: NetworkZone> {
/// A list of block IDs.
pub ids: Vec<[u8; 32]>,
/// The peer who told us about this chain entry.
pub peer: InternalPeerID<N::Addr>,
/// The connection handle of the peer who told us about this chain entry.
pub handle: ConnectionHandle,
}
/// A batch of blocks to retrieve.
#[derive(Clone)]
pub struct BlocksToRetrieve<N: NetworkZone> {
/// The block IDs to get.
pub ids: ByteArrayVec<32>,
/// The hash of the last block before this batch.
pub prev_id: [u8; 32],
/// The expected height of the first block in [`BlocksToRetrieve::ids`].
pub start_height: u64,
/// The peer who told us about this batch.
pub peer_who_told_us: InternalPeerID<N::Addr>,
/// The connection handle of the peer who told us about this batch.
pub peer_who_told_us_handle: ConnectionHandle,
/// The number of requests sent for this batch.
pub requests_sent: usize,
/// The number of times this batch has been requested from a peer and failed.
pub failures: usize,
}
/// An error returned from the [`ChainTracker`].
#[derive(Debug, Clone)]
pub enum ChainTrackerError {
/// The new chain entry is invalid.
NewEntryIsInvalid,
/// The new chain entry does not follow from the top of our chain tracker.
NewEntryDoesNotFollowChain,
}
/// # Chain Tracker
///
/// This struct allows following a single chain. It takes in [`ChainEntry`]s and
/// allows getting [`BlocksToRetrieve`].
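///
/// A rough usage sketch (the entries, heights and seed here are assumed for illustration):
///
/// ```rust,ignore
/// let mut tracker = ChainTracker::new(first_entry, first_height, our_genesis, previous_hash);
///
/// // Extend the tracked chain as new entries come in.
/// tracker.add_entry(next_entry)?;
///
/// // Hand out the next batch of (at most 100) block IDs to request from a peer.
/// if let Some(batch) = tracker.blocks_to_get(&pruning_seed, 100) {
///     // request `batch.ids` from the peer...
/// }
/// ```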
pub struct ChainTracker<N: NetworkZone> {
/// A list of [`ChainEntry`]s, in order.
entries: VecDeque<ChainEntry<N>>,
/// The height of the first block in the first entry of [`Self::entries`].
first_height: u64,
/// The hash of the last block in the last entry.
top_seen_hash: [u8; 32],
/// The hash of the block one below [`Self::first_height`].
previous_hash: [u8; 32],
/// The hash of the genesis block.
our_genesis: [u8; 32],
}
impl<N: NetworkZone> ChainTracker<N> {
/// Creates a new chain tracker.
pub fn new(
new_entry: ChainEntry<N>,
first_height: u64,
our_genesis: [u8; 32],
previous_hash: [u8; 32],
) -> Self {
let top_seen_hash = *new_entry.ids.last().unwrap();
let mut entries = VecDeque::with_capacity(1);
entries.push_back(new_entry);
Self {
top_seen_hash,
entries,
first_height,
previous_hash,
our_genesis,
}
}
/// Returns `true` if the peer is expected to have the next block after our highest seen block
/// according to their pruning seed.
pub fn should_ask_for_next_chain_entry(&self, seed: &PruningSeed) -> bool {
seed.has_full_block(self.top_height(), CRYPTONOTE_MAX_BLOCK_HEIGHT)
}
/// Returns the simple history: the highest seen block and the genesis block.
pub fn get_simple_history(&self) -> [[u8; 32]; 2] {
[self.top_seen_hash, self.our_genesis]
}
/// Returns the height of the next block after the highest block we are tracking.
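/// For example, with `first_height = 100` and `50` block IDs queued across the entries, this
/// returns `150`: the height of the next block we expect after the tracked chain.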
pub fn top_height(&self) -> u64 {
let top_block_idx = self
.entries
.iter()
.map(|entry| entry.ids.len())
.sum::<usize>();
self.first_height + u64::try_from(top_block_idx).unwrap()
}
/// Returns the total number of queued batches for a certain `batch_size`.
///
/// # Panics
/// This function panics if `batch_size` is `0`.
pub fn block_requests_queued(&self, batch_size: usize) -> usize {
self.entries
.iter()
.map(|entry| entry.ids.len().div_ceil(batch_size))
.sum()
}
/// Attempts to add an incoming [`ChainEntry`] to the chain tracker.
pub fn add_entry(&mut self, mut chain_entry: ChainEntry<N>) -> Result<(), ChainTrackerError> {
if chain_entry.ids.is_empty() {
// The peer must send at least one overlapping block.
chain_entry.handle.ban_peer(MEDIUM_BAN);
return Err(ChainTrackerError::NewEntryIsInvalid);
}
if chain_entry.ids.len() == 1 {
return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
}
if self
.entries
.back()
.is_some_and(|last_entry| last_entry.ids.last().unwrap() != &chain_entry.ids[0])
{
return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
}
let new_entry = ChainEntry {
// ignore the first block - we already know it.
ids: chain_entry.ids.split_off(1),
peer: chain_entry.peer,
handle: chain_entry.handle,
};
self.top_seen_hash = *new_entry.ids.last().unwrap();
self.entries.push_back(new_entry);
Ok(())
}
/// Returns a batch of blocks to request.
///
/// The returned batch's length will be less than or equal to `max_blocks`.
pub fn blocks_to_get(
&mut self,
pruning_seed: &PruningSeed,
max_blocks: usize,
) -> Option<BlocksToRetrieve<N>> {
if !pruning_seed.has_full_block(self.first_height, CRYPTONOTE_MAX_BLOCK_HEIGHT) {
return None;
}
let entry = self.entries.front_mut()?;
// Calculate the end index for this batch; it will be the smallest of:
// - `max_blocks`
// - the length of this entry
// - the index of the next pruned block for this seed
let end_idx = min(
min(entry.ids.len(), max_blocks),
usize::try_from(
pruning_seed
.get_next_pruned_block(self.first_height, CRYPTONOTE_MAX_BLOCK_HEIGHT)
.expect("We use local values to calculate height which should be below the sanity limit")
// Use a big value as a fallback if the seed does no pruning.
.unwrap_or(CRYPTONOTE_MAX_BLOCK_HEIGHT)
- self.first_height,
)
.unwrap(),
);
if end_idx == 0 {
return None;
}
let ids_to_get = entry.ids.drain(0..end_idx).collect::<Vec<_>>();
let blocks = BlocksToRetrieve {
ids: ids_to_get.into(),
prev_id: self.previous_hash,
start_height: self.first_height,
peer_who_told_us: entry.peer,
peer_who_told_us_handle: entry.handle.clone(),
requests_sent: 0,
failures: 0,
};
self.first_height += u64::try_from(end_idx).unwrap();
// TODO: improve ByteArrayVec API.
self.previous_hash = blocks.ids[blocks.ids.len() - 1];
if entry.ids.is_empty() {
self.entries.pop_front();
}
Some(blocks)
}
}

@@ -0,0 +1,199 @@
use std::collections::HashSet;
use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use tokio::time::timeout;
use tower::{Service, ServiceExt};
use tracing::instrument;
use cuprate_fixed_bytes::ByteArrayVec;
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_p2p_core::{handles::ConnectionHandle, NetworkZone, PeerRequest, PeerResponse};
use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse};
use crate::{
block_downloader::{BlockBatch, BlockDownloadError, BlockDownloadTaskResponse},
client_pool::ClientPoolDropGuard,
constants::{BLOCK_DOWNLOADER_REQUEST_TIMEOUT, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN},
};
/// Attempts to request a batch of blocks from a peer, returning [`BlockDownloadTaskResponse`].
#[instrument(
level = "debug",
name = "download_batch",
skip_all,
fields(
start_height = expected_start_height,
attempt = _attempt
)
)]
pub async fn download_batch_task<N: NetworkZone>(
client: ClientPoolDropGuard<N>,
ids: ByteArrayVec<32>,
previous_id: [u8; 32],
expected_start_height: u64,
_attempt: usize,
) -> BlockDownloadTaskResponse<N> {
BlockDownloadTaskResponse {
start_height: expected_start_height,
result: request_batch_from_peer(client, ids, previous_id, expected_start_height).await,
}
}
/// Requests a sequential batch of blocks from a peer.
///
/// This function validates that the downloaded blocks are the ones that were asked for and that
/// they match the expected heights.
async fn request_batch_from_peer<N: NetworkZone>(
mut client: ClientPoolDropGuard<N>,
ids: ByteArrayVec<32>,
previous_id: [u8; 32],
expected_start_height: u64,
) -> Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError> {
// Request the blocks.
let blocks_response = timeout(BLOCK_DOWNLOADER_REQUEST_TIMEOUT, async {
let PeerResponse::GetObjects(blocks_response) = client
.ready()
.await?
.call(PeerRequest::GetObjects(GetObjectsRequest {
blocks: ids.clone(),
pruned: false,
}))
.await?
else {
panic!("Connection task returned wrong response.");
};
Ok::<_, BlockDownloadError>(blocks_response)
})
.await
.map_err(|_| BlockDownloadError::TimedOut)??;
// Initial sanity checks
if blocks_response.blocks.len() > ids.len() {
client.info.handle.ban_peer(MEDIUM_BAN);
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
if blocks_response.blocks.len() != ids.len() {
return Err(BlockDownloadError::PeerDidNotHaveRequestedData);
}
let peer_handle = client.info.handle.clone();
let blocks = rayon_spawn_async(move || {
deserialize_batch(
blocks_response,
expected_start_height,
ids,
previous_id,
peer_handle,
)
})
.await;
let batch = blocks.inspect_err(|e| {
// If the peer's response was invalid, ban it.
if matches!(e, BlockDownloadError::PeersResponseWasInvalid) {
client.info.handle.ban_peer(MEDIUM_BAN);
}
})?;
Ok((client, batch))
}
fn deserialize_batch(
blocks_response: GetObjectsResponse,
expected_start_height: u64,
requested_ids: ByteArrayVec<32>,
previous_id: [u8; 32],
peer_handle: ConnectionHandle,
) -> Result<BlockBatch, BlockDownloadError> {
let blocks = blocks_response
.blocks
.into_par_iter()
.enumerate()
.map(|(i, block_entry)| {
let expected_height = u64::try_from(i).unwrap() + expected_start_height;
let mut size = block_entry.block.len();
let block = Block::read(&mut block_entry.block.as_ref())
.map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?;
let block_hash = block.hash();
// Check the block matches the one requested and the peer sent enough transactions.
if requested_ids[i] != block_hash || block.txs.len() != block_entry.txs.len() {
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
// Check that the previous ID is correct for the first block.
// This is to protect us against banning the wrong peer.
// This must happen after the hash check.
if i == 0 && block.header.previous != previous_id {
tracing::warn!(
"Invalid chain, peer told us a block follows the chain when it doesn't."
);
// This peer probably did nothing wrong; it was the peer that told us this block ID
// that is misbehaving.
return Err(BlockDownloadError::ChainInvalid);
}
// Check the height lines up as expected.
// This must happen after the hash check.
if !block
.number()
.is_some_and(|height| height == expected_height)
{
tracing::warn!(
"Invalid chain, expected height: {expected_height}, got height: {:?}",
block.number()
);
// This peer probably did nothing wrong; it was the peer that told us this block ID
// that is misbehaving.
return Err(BlockDownloadError::ChainInvalid);
}
// Deserialize the transactions.
let txs = block_entry
.txs
.take_normal()
.ok_or(BlockDownloadError::PeersResponseWasInvalid)?
.into_iter()
.map(|tx_blob| {
size += tx_blob.len();
if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE {
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
Transaction::read(&mut tx_blob.as_ref())
.map_err(|_| BlockDownloadError::PeersResponseWasInvalid)
})
.collect::<Result<Vec<_>, _>>()?;
// Make sure the transactions in the block were the ones the peer sent.
let mut expected_txs = block.txs.iter().collect::<HashSet<_>>();
for tx in &txs {
if !expected_txs.remove(&tx.hash()) {
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
}
if !expected_txs.is_empty() {
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
Ok(((block, txs), size))
})
.collect::<Result<(Vec<_>, Vec<_>), _>>()?;
Ok(BlockBatch {
blocks: blocks.0,
size: blocks.1.into_iter().sum(),
peer_handle,
})
}

@@ -0,0 +1,238 @@
use std::{mem, sync::Arc};
use rand::prelude::SliceRandom;
use rand::thread_rng;
use tokio::{task::JoinSet, time::timeout};
use tower::{Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
use cuprate_p2p_core::{
client::InternalPeerID,
handles::ConnectionHandle,
services::{PeerSyncRequest, PeerSyncResponse},
NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc,
};
use cuprate_wire::protocol::{ChainRequest, ChainResponse};
use crate::{
block_downloader::{
chain_tracker::{ChainEntry, ChainTracker},
BlockDownloadError, ChainSvcRequest, ChainSvcResponse,
},
client_pool::{ClientPool, ClientPoolDropGuard},
constants::{
BLOCK_DOWNLOADER_REQUEST_TIMEOUT, INITIAL_CHAIN_REQUESTS_TO_SEND,
MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MEDIUM_BAN,
},
};
/// Request a chain entry from a peer.
///
/// Because the block downloader only follows and downloads one chain, we only have to send the block
/// hash of the top block we have found and the genesis block; this pair is the `short_history`.
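///
/// For example, if our top tracked block hash is `T` and the genesis hash is `G`, the
/// `short_history` sent is `[T, G]`; a valid response must start with either `T` (the peer
/// follows our tracked chain) or `G` (the peer does not have `T` in its chain).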
pub async fn request_chain_entry_from_peer<N: NetworkZone>(
mut client: ClientPoolDropGuard<N>,
short_history: [[u8; 32]; 2],
) -> Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError> {
let PeerResponse::GetChain(chain_res) = client
.ready()
.await?
.call(PeerRequest::GetChain(ChainRequest {
block_ids: short_history.into(),
prune: true,
}))
.await?
else {
panic!("Connection task returned wrong response!");
};
if chain_res.m_block_ids.is_empty()
|| chain_res.m_block_ids.len() > MAX_BLOCKS_IDS_IN_CHAIN_ENTRY
{
client.info.handle.ban_peer(MEDIUM_BAN);
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
// We must have at least one overlapping block.
if !(chain_res.m_block_ids[0] == short_history[0]
|| chain_res.m_block_ids[0] == short_history[1])
{
client.info.handle.ban_peer(MEDIUM_BAN);
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
// If the genesis is the overlapping block then this peer does not have our top tracked block in
// its chain.
if chain_res.m_block_ids[0] == short_history[1] {
return Err(BlockDownloadError::PeerDidNotHaveRequestedData);
}
let entry = ChainEntry {
ids: (&chain_res.m_block_ids).into(),
peer: client.info.id,
handle: client.info.handle.clone(),
};
Ok((client, entry))
}
/// Initial chain search, this function pulls [`INITIAL_CHAIN_REQUESTS_TO_SEND`] peers from the [`ClientPool`]
/// and sends chain requests to all of them.
///
/// We then wait for their response and choose the peer who claims the highest cumulative difficulty.
#[instrument(level = "error", skip_all)]
pub async fn initial_chain_search<N: NetworkZone, S, C>(
client_pool: &Arc<ClientPool<N>>,
mut peer_sync_svc: S,
mut our_chain_svc: C,
) -> Result<ChainTracker<N>, BlockDownloadError>
where
S: PeerSyncSvc<N>,
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>,
{
tracing::debug!("Getting our chain history");
// Get our history.
let ChainSvcResponse::CompactHistory {
block_ids,
cumulative_difficulty,
} = our_chain_svc
.ready()
.await?
.call(ChainSvcRequest::CompactHistory)
.await?
else {
panic!("chain service sent wrong response.");
};
let our_genesis = *block_ids.last().expect("Blockchain had no genesis block.");
tracing::debug!("Getting a list of peers with higher cumulative difficulty");
let PeerSyncResponse::PeersToSyncFrom(mut peers) = peer_sync_svc
.ready()
.await?
.call(PeerSyncRequest::PeersToSyncFrom {
block_needed: None,
current_cumulative_difficulty: cumulative_difficulty,
})
.await?
else {
panic!("peer sync service sent wrong response.");
};
tracing::debug!(
"{} peers claim they have a higher cumulative difficulty",
peers.len()
);
// Shuffle the list to remove any possibility of peers being able to influence which of them gets picked.
peers.shuffle(&mut thread_rng());
let mut peers = client_pool.borrow_clients(&peers);
let mut futs = JoinSet::new();
let req = PeerRequest::GetChain(ChainRequest {
block_ids: block_ids.into(),
prune: false,
});
tracing::debug!("Sending requests for chain entries.");
// Send the requests.
while futs.len() < INITIAL_CHAIN_REQUESTS_TO_SEND {
let Some(mut next_peer) = peers.next() else {
break;
};
let cloned_req = req.clone();
futs.spawn(timeout(
BLOCK_DOWNLOADER_REQUEST_TIMEOUT,
async move {
let PeerResponse::GetChain(chain_res) =
next_peer.ready().await?.call(cloned_req).await?
else {
panic!("connection task returned wrong response!");
};
Ok::<_, tower::BoxError>((
chain_res,
next_peer.info.id,
next_peer.info.handle.clone(),
))
}
.instrument(Span::current()),
));
}
let mut res: Option<(ChainResponse, InternalPeerID<_>, ConnectionHandle)> = None;
// Wait for the peers' responses.
while let Some(task_res) = futs.join_next().await {
let Ok(Ok(task_res)) = task_res.unwrap() else {
continue;
};
match &mut res {
Some(res) => {
// `res` has already been set; replace it if this peer claims a higher cumulative difficulty.
if res.0.cumulative_difficulty() < task_res.0.cumulative_difficulty() {
let _ = mem::replace(res, task_res);
}
}
None => {
// `res` has not been set, so set it now.
res = Some(task_res);
}
}
}
let Some((chain_res, peer_id, peer_handle)) = res else {
return Err(BlockDownloadError::FailedToFindAChainToFollow);
};
let hashes: Vec<[u8; 32]> = (&chain_res.m_block_ids).into();
// drop this to deallocate the [`Bytes`].
drop(chain_res);
tracing::debug!("Highest chin entry contained {} block Ids", hashes.len());
// Find the first unknown block in the batch.
let ChainSvcResponse::FindFirstUnknown(first_unknown, expected_height) = our_chain_svc
.ready()
.await?
.call(ChainSvcRequest::FindFirstUnknown(hashes.clone()))
.await?
else {
panic!("chain service sent wrong response.");
};
// The peer must send at least one block we already know.
if first_unknown == 0 {
peer_handle.ban_peer(MEDIUM_BAN);
return Err(BlockDownloadError::PeerSentNoOverlappingBlocks);
}
// We know all the blocks already.
// TODO: The peer could still be on a different chain, however the chain might just be too far split.
if first_unknown == hashes.len() {
return Err(BlockDownloadError::FailedToFindAChainToFollow);
}
let previous_id = hashes[first_unknown - 1];
let first_entry = ChainEntry {
ids: hashes[first_unknown..].to_vec(),
peer: peer_id,
handle: peer_handle,
};
tracing::debug!(
"Creating chain tracker with {} new block Ids",
first_entry.ids.len()
);
let tracker = ChainTracker::new(first_entry, expected_height, our_genesis, previous_id);
Ok(tracker)
}

@@ -0,0 +1,323 @@
use std::{
fmt::{Debug, Formatter},
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use futures::{FutureExt, StreamExt};
use indexmap::IndexMap;
use monero_serai::{
block::{Block, BlockHeader},
ringct::{RctBase, RctPrunable, RctSignatures},
transaction::{Input, Timelock, Transaction, TransactionPrefix},
};
use proptest::{collection::vec, prelude::*};
use tokio::{sync::Semaphore, time::timeout};
use tower::{service_fn, Service};
use cuprate_fixed_bytes::ByteArrayVec;
use cuprate_p2p_core::{
client::{mock_client, Client, InternalPeerID, PeerInformation},
network_zones::ClearNet,
services::{PeerSyncRequest, PeerSyncResponse},
ConnectionDirection, NetworkZone, PeerRequest, PeerResponse,
};
use cuprate_pruning::PruningSeed;
use cuprate_wire::{
common::{BlockCompleteEntry, TransactionBlobs},
protocol::{ChainResponse, GetObjectsResponse},
};
use crate::{
block_downloader::{download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
client_pool::ClientPool,
};
proptest! {
#![proptest_config(ProptestConfig {
cases: 4,
max_shrink_iters: 10,
timeout: 60 * 1000,
.. ProptestConfig::default()
})]
#[test]
fn test_block_downloader(blockchain in dummy_blockchain_strategy(), peers in 1_usize..128) {
let blockchain = Arc::new(blockchain);
let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
tokio_pool.block_on(async move {
timeout(Duration::from_secs(600), async move {
let client_pool = ClientPool::new();
let mut peer_ids = Vec::with_capacity(peers);
for _ in 0..peers {
let client = mock_block_downloader_client(blockchain.clone());
peer_ids.push(client.info.id);
client_pool.add_new_client(client);
}
let stream = download_blocks(
client_pool,
SyncStateSvc(peer_ids),
OurChainSvc {
genesis: *blockchain.blocks.first().unwrap().0
},
BlockDownloaderConfig {
buffer_size: 1_000,
in_progress_queue_size: 10_000,
check_client_pool_interval: Duration::from_secs(5),
target_batch_size: 5_000,
initial_batch_size: 1,
});
let blocks = stream.map(|blocks| blocks.blocks).concat().await;
assert_eq!(blocks.len() + 1, blockchain.blocks.len());
for (i, block) in blocks.into_iter().enumerate() {
assert_eq!(&block, blockchain.blocks.get_index(i + 1).unwrap().1);
}
}).await
}).unwrap();
}
}
prop_compose! {
/// Returns a strategy to generate a [`Transaction`] that is valid for the block downloader.
fn dummy_transaction_strategy(height: u64)
(
extra in vec(any::<u8>(), 0..1_000),
timelock in 0_usize..50_000_000,
)
-> Transaction {
Transaction {
prefix: TransactionPrefix {
version: 1,
timelock: Timelock::Block(timelock),
inputs: vec![Input::Gen(height)],
outputs: vec![],
extra,
},
signatures: vec![],
rct_signatures: RctSignatures {
base: RctBase {
fee: 0,
pseudo_outs: vec![],
encrypted_amounts: vec![],
commitments: vec![],
},
prunable: RctPrunable::Null
},
}
}
}
prop_compose! {
/// Returns a strategy to generate a [`Block`] that is valid for the block downloader.
fn dummy_block_strategy(
height: u64,
previous: [u8; 32],
)
(
miner_tx in dummy_transaction_strategy(height),
txs in vec(dummy_transaction_strategy(height), 0..25)
)
-> (Block, Vec<Transaction>) {
(
Block {
header: BlockHeader {
major_version: 0,
minor_version: 0,
timestamp: 0,
previous,
nonce: 0,
},
miner_tx,
txs: txs.iter().map(Transaction::hash).collect(),
},
txs
)
}
}
/// A mock blockchain.
struct MockBlockchain {
blocks: IndexMap<[u8; 32], (Block, Vec<Transaction>)>,
}
impl Debug for MockBlockchain {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("MockBlockchain")
}
}
prop_compose! {
/// Returns a strategy to generate a [`MockBlockchain`].
fn dummy_blockchain_strategy()(
blocks in vec(dummy_block_stragtegy(0, [0; 32]), 1..50_000),
) -> MockBlockchain {
let mut blockchain = IndexMap::new();
for (height, mut block) in blocks.into_iter().enumerate() {
if let Some(last) = blockchain.last() {
block.0.header.previous = *last.0;
block.0.miner_tx.prefix.inputs = vec![Input::Gen(height as u64)]
}
blockchain.insert(block.0.hash(), block);
}
MockBlockchain {
blocks: blockchain
}
}
}
fn mock_block_downloader_client(blockchain: Arc<MockBlockchain>) -> Client<ClearNet> {
let semaphore = Arc::new(Semaphore::new(1));
let (connection_guard, connection_handle) = cuprate_p2p_core::handles::HandleBuilder::new()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
let request_handler = service_fn(move |req: PeerRequest| {
let bc = blockchain.clone();
async move {
match req {
PeerRequest::GetChain(chain_req) => {
let mut i = 0;
while !bc.blocks.contains_key(&chain_req.block_ids[i]) {
i += 1;
if i == chain_req.block_ids.len() {
i -= 1;
break;
}
}
let block_index = bc.blocks.get_index_of(&chain_req.block_ids[i]).unwrap();
let block_ids = bc
.blocks
.get_range(block_index..)
.unwrap()
.iter()
.map(|(id, _)| *id)
.take(200)
.collect::<Vec<_>>();
Ok(PeerResponse::GetChain(ChainResponse {
start_height: 0,
total_height: 0,
cumulative_difficulty_low64: 1,
cumulative_difficulty_top64: 0,
m_block_ids: block_ids.into(),
m_block_weights: vec![],
first_block: Default::default(),
}))
}
PeerRequest::GetObjects(obj) => {
let mut res = Vec::with_capacity(obj.blocks.len());
for i in 0..obj.blocks.len() {
let block = bc.blocks.get(&obj.blocks[i]).unwrap();
let block_entry = BlockCompleteEntry {
pruned: false,
block: block.0.serialize().into(),
txs: TransactionBlobs::Normal(
block
.1
.iter()
.map(Transaction::serialize)
.map(Into::into)
.collect(),
),
block_weight: 0,
};
res.push(block_entry);
}
Ok(PeerResponse::GetObjects(GetObjectsResponse {
blocks: res,
missed_ids: ByteArrayVec::from([]),
current_blockchain_height: 0,
}))
}
_ => panic!(),
}
}
.boxed()
});
let info = PeerInformation {
id: InternalPeerID::Unknown(rand::random()),
handle: connection_handle,
direction: ConnectionDirection::InBound,
pruning_seed: PruningSeed::NotPruned,
};
mock_client(info, connection_guard, request_handler)
}
#[derive(Clone)]
struct SyncStateSvc<Z: NetworkZone>(Vec<InternalPeerID<Z::Addr>>);
impl Service<PeerSyncRequest<ClearNet>> for SyncStateSvc<ClearNet> {
type Response = PeerSyncResponse<ClearNet>;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: PeerSyncRequest<ClearNet>) -> Self::Future {
let peers = self.0.clone();
async move { Ok(PeerSyncResponse::PeersToSyncFrom(peers)) }.boxed()
}
}
struct OurChainSvc {
genesis: [u8; 32],
}
impl Service<ChainSvcRequest> for OurChainSvc {
type Response = ChainSvcResponse;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: ChainSvcRequest) -> Self::Future {
let genesis = self.genesis;
async move {
Ok(match req {
ChainSvcRequest::CompactHistory => ChainSvcResponse::CompactHistory {
block_ids: vec![genesis],
cumulative_difficulty: 1,
},
ChainSvcRequest::FindFirstUnknown(_) => ChainSvcResponse::FindFirstUnknown(1, 1),
ChainSvcRequest::CumulativeDifficulty => ChainSvcResponse::CumulativeDifficulty(1),
})
}
.boxed()
}
}

@@ -126,13 +126,16 @@ impl<N: NetworkZone> ClientPool<N> {
pub fn borrow_clients<'a, 'b>(
self: &'a Arc<Self>,
peers: &'b [InternalPeerID<N::Addr>],
) -> impl Iterator<Item = ClientPoolDropGuard<N>> + Captures<(&'a (), &'b ())> {
) -> impl Iterator<Item = ClientPoolDropGuard<N>> + sealed::Captures<(&'a (), &'b ())> {
peers.iter().filter_map(|peer| self.borrow_client(peer))
}
}
/// TODO: Remove me when 2024 Rust
///
/// https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick
trait Captures<U> {}
impl<T: ?Sized, U> Captures<U> for T {}
mod sealed {
/// TODO: Remove me when 2024 Rust
///
/// https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick
pub trait Captures<U> {}
impl<T: ?Sized, U> Captures<U> for T {}
}

@@ -12,6 +12,12 @@ pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_
/// The duration of a short ban.
pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10);
/// The duration of a medium ban.
pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24);
/// The duration of a long ban.
pub(crate) const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7);
/// The default amount of time between inbound diffusion flushes.
pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5);
@@ -34,6 +40,35 @@ pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50;
/// TODO: it might be a good idea to make this configurable.
pub(crate) const INBOUND_CONNECTION_COOL_DOWN: Duration = Duration::from_millis(500);
/// The initial amount of chain requests to send to find the best chain to sync from.
pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3;
/// The enforced maximum amount of blocks to request in a batch.
///
/// Requesting more than this will cause the peer to disconnect and potentially lead to bans.
pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100;
/// The timeout that the block downloader will use for requests.
pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// The maximum size of a transaction, a sanity limit that all transactions across all hard-forks must
/// be less than.
///
/// ref: <https://monero-book.cuprate.org/consensus_rules/transactions.html#transaction-size>
pub(crate) const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000;
/// The maximum amount of block IDs allowed in a chain entry response.
///
/// ref: <https://github.com/monero-project/monero/blob/cc73fe71162d564ffda8e549b79a350bca53c454/src/cryptonote_config.h#L97>
// TODO: link to the protocol book when this section is added.
pub(crate) const MAX_BLOCKS_IDS_IN_CHAIN_ENTRY: usize = 25_000;
/// The amount of failures downloading a specific batch before we stop attempting to download it.
pub(crate) const MAX_DOWNLOAD_FAILURES: usize = 5;
/// The amount of empty chain entries to receive before we assume we have found the top of the chain.
pub(crate) const EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED: usize = 5;
#[cfg(test)]
mod tests {
use super::*;
@@ -44,4 +79,10 @@ mod tests {
fn outbound_diffusion_flush_shorter_than_inbound() {
assert!(DIFFUSION_FLUSH_AVERAGE_SECONDS_OUTBOUND < DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND);
}
/// Checks that the ban time increases from short to long.
#[test]
fn ban_times_sanity_check() {
assert!(SHORT_BAN < MEDIUM_BAN && MEDIUM_BAN < LONG_BAN);
}
}

@@ -4,22 +4,24 @@
//! a certain [`NetworkZone`]
use std::sync::Arc;
use cuprate_async_buffer::BufferStream;
use futures::FutureExt;
use tokio::{
sync::{mpsc, watch},
task::JoinSet,
};
use tokio_stream::wrappers::WatchStream;
use tower::{buffer::Buffer, util::BoxCloneService, ServiceExt};
use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
use cuprate_p2p_core::{
client::Connector,
client::InternalPeerID,
services::{AddressBookRequest, AddressBookResponse},
services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest},
CoreSyncSvc, NetworkZone, PeerRequestHandler,
};
mod block_downloader;
mod broadcast;
mod client_pool;
pub mod config;
@@ -28,6 +30,7 @@ mod constants;
mod inbound_server;
mod sync_states;
use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
pub use broadcast::{BroadcastRequest, BroadcastSvc};
use client_pool::ClientPoolDropGuard;
pub use config::P2PConfig;
@@ -87,7 +90,7 @@ where
let inbound_handshaker = cuprate_p2p_core::client::HandShaker::new(
address_book.clone(),
sync_states_svc,
sync_states_svc.clone(),
core_sync_svc.clone(),
peer_req_handler,
inbound_mkr,
@@ -136,6 +139,7 @@ where
broadcast_svc,
top_block_watch,
make_connection_tx,
sync_states_svc,
address_book: address_book.boxed_clone(),
_background_tasks: Arc::new(background_tasks),
})
@@ -156,6 +160,8 @@ pub struct NetworkInterface<N: NetworkZone> {
make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
/// The address book service.
address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
/// The peer's sync states service.
sync_states_svc: Buffer<sync_states::PeerSyncSvc<N>, PeerSyncRequest<N>>,
/// Background tasks that will be aborted when this interface is dropped.
_background_tasks: Arc<JoinSet<()>>,
}
@@ -166,6 +172,26 @@ impl<N: NetworkZone> NetworkInterface<N> {
self.broadcast_svc.clone()
}
/// Starts the block downloader and returns a stream that will yield sequentially downloaded blocks.
pub fn block_downloader<C>(
&self,
our_chain_service: C,
config: BlockDownloaderConfig,
) -> BufferStream<BlockBatch>
where
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+ Send
+ 'static,
C::Future: Send + 'static,
{
block_downloader::download_blocks(
self.pool.clone(),
self.sync_states_svc.clone(),
our_chain_service,
config,
)
}
/// Returns a stream which yields the highest seen sync state from a connected peer.
pub fn top_sync_stream(&self) -> WatchStream<sync_states::NewSyncInfo> {
WatchStream::from_changes(self.top_block_watch.clone())