CS144 Study Notes 01

RayAlto OP

These are my personal takes, not necessarily the best solution.

0. Goal

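As everyone knows, TCP has to provide a reliable byte stream to the layer above it. The sender splits the data into several segments, and these datagrams may be lost, duplicated, or reordered in transit (corrupted datagrams are dropped, so corruption is out of scope this time). The goal this time is to reassemble those datagrams into the original contiguous data.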
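To make the goal concrete, here is a toy sketch of what "reassembly" means. It is not the approach used in this post: it stores every byte in a std::map keyed by its absolute index, ignores capacity limits, and assumes no gaps remain once all segments have arrived. The name reassemble_all is made up for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy illustration only (hypothetical helper, not the lab's API):
// rebuild a stream from (first_index, data) segments by storing each byte
// under its absolute stream index. Assumes the segments leave no gaps.
std::string reassemble_all( const std::vector<std::pair<uint64_t, std::string>>& segments )
{
  std::map<uint64_t, char> bytes;
  for ( const auto& segment : segments ) {
    for ( std::size_t i = 0; i < segment.second.size(); ++i ) {
      // Duplicated or overlapping bytes simply overwrite the same key.
      bytes[segment.first + i] = segment.second[i];
    }
  }
  std::string out;
  for ( const auto& entry : bytes ) {
    out.push_back( entry.second ); // std::map iterates in index order
  }
  return out;
}
```

Feeding it "world" at index 6, "hello " at index 0, and an overlapping "lo wo" at index 3 yields the original "hello world", whatever the arrival order.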

1. Considerations

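The main interface to implement this time is:

void insert( uint64_t first_index, std::string data, bool is_last_substring );

Parameter: Purpose
first_index: the index, relative to the whole stream, of this segment's first byte; stream indices start at 0.
data: the segment itself, a run of binary data that may contain \0.
is_last_substring: marks this segment as the last one in the stream, i.e. the total length of the stream is first_index + data.length().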
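As a quick check of how these parameters relate, the sketch below computes the stream length implied by the last segment and shows that length() counts embedded \0 bytes. Both function names are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper (not part of the lab's API): the total stream length
// implied by the segment that arrives with is_last_substring == true.
uint64_t total_stream_length( uint64_t first_index, const std::string& last_data )
{
  return first_index + last_data.length();
}

// Builds a 5-byte string with a '\0' in the middle, to show that
// std::string::length() counts embedded NUL bytes like any other byte.
std::string with_embedded_nul()
{
  return std::string( "ab\0cd", 5 );
}
```

For "hello world" split into "hello " at index 0 and "world" at index 6, the last segment gives 6 + 5 = 11, which is the length of the whole stream.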

Problems I ran into:

1.a. Buffer design

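The buffer this time is much like last time's: both resemble a circular_buffer<char>. I chose std::string buf_ as the underlying storage because I assumed the interface would again take std::string_view, in which case a std::string would avoid constructing yet another std::string. Looking back, though, ByteStream's void push(std::string) does not accept a std::string_view, so a new std::string has to be constructed either way. Perhaps std::deque<char> would have been more convenient?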

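There is one catch: ByteStream's public interface offers no way to learn its total capacity. available_capacity returns only the capacity that is currently free, and Reassembler is constructed from a ByteStream&& that may already have been used, so the required buffer size cannot be known directly. I went with an ugly workaround: on every insert, check the buffer length and, if it is smaller than ByteStream's available_capacity, pad it with \0 up to the same size.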
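The padding workaround can be sketched in isolation roughly as follows. The Window struct and grow_to are hypothetical stand-ins for the Reassembler's members, and the capacity is passed in as a plain number instead of being read from a ByteStream.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical stand-in for the Reassembler's two parallel buffers.
struct Window
{
  std::string buf {};       // cached bytes
  std::string indicator {}; // same length as buf; marks which slots are filled

  // Grow both strings to `available_capacity` if they are shorter.
  // They are never shrunk, so earlier cached bytes stay in place.
  void grow_to( uint64_t available_capacity )
  {
    if ( buf.length() < available_capacity ) {
      const std::size_t n = available_capacity - buf.length();
      buf.append( n, '\0' );
      indicator.append( n, '\0' );
    }
  }
};
```

Calling grow_to on every insert means the buffer ends up as large as the biggest available capacity seen so far, which is the behavior described above.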

1.b. Marking the buffer

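Incoming data may be non-contiguous, so it has to be cached until later data fills the gaps and a contiguous run can be handed to ByteStream. TCP is a byte stream and the data may contain \0, so marking empty slots inside the buffer itself is not feasible. Instead I added a second std::string indicator_ of the same size as the buffer, where '1' means the corresponding position holds data and \0 means it is empty.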
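A minimal sketch of writing into the buffer and flagging the same range in the indicator, assuming both strings already have equal length and the write fits. The free function store is a hypothetical name, not something from the lab skeleton.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>

// Hypothetical helper: copy `data` into `buf` at `offset` and flag the same
// range in `indicator` with '1'.
// Assumes buf.size() == indicator.size() and offset + data.size() <= buf.size().
void store( std::string& buf, std::string& indicator, std::size_t offset, std::string_view data )
{
  assert( buf.size() == indicator.size() );
  assert( offset + data.size() <= buf.size() );
  buf.replace( offset, data.size(), data.data(), data.size() );
  indicator.replace( offset, data.size(), data.size(), '1' );
}
```

Because the flags live in a separate string, a stored \0 byte is still distinguishable from an empty slot.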

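There is another small question here: how to implement uint64_t Reassembler::bytes_pending() const. Keeping the count up to date on every insert would mean scanning indicator_ over and over, so I simply call count(indicator_.cbegin(), indicator_.cend(), '1') from <algorithm> when the value is requested, which also spares insert from working out how many bytes it really added.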
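Pulled out of the class, the counting idea looks roughly like this. The function name pending_bytes is hypothetical, and the indicator is passed in explicitly rather than read from a member.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical free-function version of bytes_pending(): count the slots
// flagged with '1'. This is a linear scan each time it is called.
uint64_t pending_bytes( const std::string& indicator )
{
  return static_cast<uint64_t>( std::count( indicator.cbegin(), indicator.cend(), '1' ) );
}
```

The trade-off is an O(n) scan per call in exchange for not having to track overlaps between newly inserted data and data already cached.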

1.c. Logic for pushing data

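My design records the stream index of the next byte needed to extend the contiguous data. If the data passed to insert contains that byte, indicator_ is used to work out how many bytes can be pushed to ByteStream.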
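The "how many bytes can be pushed" step can be sketched on its own as below. This is a simplified variant, not my exact code: it shifts the buffers with erase and append instead of replace, and take_assembled_prefix is a hypothetical name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper: remove and return the contiguous filled prefix of `buf`.
// The remaining bytes shift left and both strings are padded with '\0' so
// their lengths stay unchanged. Assumes buf.size() == indicator.size().
std::string take_assembled_prefix( std::string& buf, std::string& indicator )
{
  assert( buf.size() == indicator.size() );
  // find_first_not_of returns npos when every slot is '1' (or the string is
  // empty), so clamp the result to the buffer length.
  const std::size_t n = std::min( indicator.find_first_not_of( '1' ), indicator.size() );
  std::string out = buf.substr( 0, n );
  buf.erase( 0, n );
  buf.append( n, '\0' );
  indicator.erase( 0, n );
  indicator.append( n, '\0' );
  return out;
}
```

The caller would push the returned string to the ByteStream and advance its next-needed index by the string's length.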

2. My answer

My solution:
diff --git a/src/reassembler.cc b/src/reassembler.cc
index 4e56527..f32bc7c 100644
--- a/src/reassembler.cc
+++ b/src/reassembler.cc
@@ -1,17 +1,62 @@
 #include "reassembler.hh"

+#include <algorithm>
+#include <cstdint>
+#include <string_view>
+
 using namespace std;

 void Reassembler::insert( uint64_t first_index, string data, bool is_last_substring )
 {
-  // Your code here.
-  (void)first_index;
-  (void)data;
-  (void)is_last_substring;
+  const size_t data_len = data.length();
+  if ( is_last_substring ) {
+    end_index_ = first_index + data_len;
+  }
+  const uint64_t cap_avai = writer().available_capacity();
+  const size_t buf_len = buf_.length();
+  if ( buf_len < cap_avai ) {
+    const size_t n = cap_avai - buf_len;
+    buf_.append( n, '\0' );
+    indicator_.append( n, '\0' );
+  }
+  const uint64_t buf_end = next_index_ + cap_avai;
+  const uint64_t data_end = first_index + data_len;
+  if ( data_end < next_index_ || buf_end < first_index ) {
+    check_eos_();
+    return;
+  }
+  const uint64_t substr_start = max( first_index, next_index_ );
+  const uint64_t substr_end = min( buf_end, first_index + data_len );
+  const uint64_t substr_len = substr_end - substr_start;
+  if ( substr_len == 0 ) {
+    check_eos_();
+    return;
+  }
+  const uint64_t buf_start = substr_start - next_index_;
+  const uint64_t data_start = substr_start - first_index;
+  buf_.replace( buf_start, substr_len, data, data_start, substr_len );
+  indicator_.replace( buf_start, substr_len, substr_len, '1' );
+  if ( substr_start == next_index_ ) {
+    const size_t n_assembled = min( indicator_.find_first_not_of( '1' ), indicator_.length() );
+    output_.writer().push( buf_.substr( 0, n_assembled ) );
+    const size_t n_left = cap_avai - n_assembled;
+    buf_.replace( 0, n_left, string_view { buf_ }.substr( n_assembled ), 0, n_left );
+    buf_.replace( n_left, n_assembled, n_assembled, '\0' );
+    indicator_.replace( 0, n_left, string_view { indicator_ }.substr( n_assembled ), 0, n_left );
+    indicator_.replace( n_left, n_assembled, n_assembled, '\0' );
+    next_index_ += n_assembled;
+  }
+  check_eos_();
 }

 uint64_t Reassembler::bytes_pending() const
 {
-  // Your code here.
-  return {};
+  return count( indicator_.cbegin(), indicator_.cend(), '1' );
+}
+
+void Reassembler::check_eos_()
+{
+  if ( next_index_ == end_index_ ) {
+    output_.writer().close();
+  }
 }
diff --git a/src/reassembler.hh b/src/reassembler.hh
index 7e988ff..7be0828 100644
--- a/src/reassembler.hh
+++ b/src/reassembler.hh
@@ -1,12 +1,21 @@
 #pragma once

+#include <string>
+
 #include "byte_stream.hh"

 class Reassembler
 {
 public:
   // Construct Reassembler to write into given ByteStream.
-  explicit Reassembler( ByteStream&& output ) : output_( std::move( output ) ) {}
+  explicit Reassembler( ByteStream&& output )
+    : output_( std::move( output ) )
+    , buf_ {}
+    , indicator_ {}
+    , nbuffed_ { 0 }
+    , next_index_ { 0 }
+    , end_index_( std::string::npos )
+  {}

   /*
    * Insert a new substring to be reassembled into a ByteStream.
@@ -42,4 +51,12 @@ public:

 private:
   ByteStream output_; // the Reassembler writes to this ByteStream
+  std::string buf_;
+  std::string indicator_;
+  uint64_t nbuffed_;
+  uint64_t next_index_;
+  uint64_t end_index_;
+
+  // close the ByteStream if the end of the stream is reached
+  void check_eos_();
 };

/* reassembler.hh */

#pragma once

#include <string>

#include "byte_stream.hh"

class Reassembler
{
public:
  // Construct Reassembler to write into given ByteStream.
  explicit Reassembler( ByteStream&& output )
    : output_( std::move( output ) )
    , buf_ {}
    , indicator_ {}
    , nbuffed_ { 0 }
    , next_index_ { 0 }
    , end_index_( std::string::npos )
  {}

  /*
   * Insert a new substring to be reassembled into a ByteStream.
   *   `first_index`: the index of the first byte of the substring
   *   `data`: the substring itself
   *   `is_last_substring`: this substring represents the end of the stream
   *   `output`: a mutable reference to the Writer
   *
   * The Reassembler's job is to reassemble the indexed substrings (possibly out-of-order
   * and possibly overlapping) back into the original ByteStream. As soon as the Reassembler
   * learns the next byte in the stream, it should write it to the output.
   *
   * If the Reassembler learns about bytes that fit within the stream's available capacity
   * but can't yet be written (because earlier bytes remain unknown), it should store them
   * internally until the gaps are filled in.
   *
   * The Reassembler should discard any bytes that lie beyond the stream's available capacity
   * (i.e., bytes that couldn't be written even if earlier gaps get filled in).
   *
   * The Reassembler should close the stream after writing the last byte.
   */
  void insert( uint64_t first_index, std::string data, bool is_last_substring );

  // How many bytes are stored in the Reassembler itself?
  uint64_t bytes_pending() const;

  // Access output stream reader
  Reader& reader() { return output_.reader(); }
  const Reader& reader() const { return output_.reader(); }

  // Access output stream writer, but const-only (can't write from outside)
  const Writer& writer() const { return output_.writer(); }

private:
  ByteStream output_; // the Reassembler writes to this ByteStream
  std::string buf_;
  std::string indicator_;
  uint64_t nbuffed_;
  uint64_t next_index_;
  uint64_t end_index_;

  // close the ByteStream if the end of the stream is reached
  void check_eos_();
};

/* reassembler.cc */

#include "reassembler.hh"

#include <algorithm>
#include <cstdint>
#include <string_view>

using namespace std;

void Reassembler::insert( uint64_t first_index, string data, bool is_last_substring )
{
  const size_t data_len = data.length();
  if ( is_last_substring ) {
    end_index_ = first_index + data_len;
  }
  const uint64_t cap_avai = writer().available_capacity();
  const size_t buf_len = buf_.length();
  if ( buf_len < cap_avai ) {
    const size_t n = cap_avai - buf_len;
    buf_.append( n, '\0' );
    indicator_.append( n, '\0' );
  }
  const uint64_t buf_end = next_index_ + cap_avai;
  const uint64_t data_end = first_index + data_len;
  if ( data_end < next_index_ || buf_end < first_index ) {
    check_eos_();
    return;
  }
  const uint64_t substr_start = max( first_index, next_index_ );
  const uint64_t substr_end = min( buf_end, first_index + data_len );
  const uint64_t substr_len = substr_end - substr_start;
  if ( substr_len == 0 ) {
    check_eos_();
    return;
  }
  const uint64_t buf_start = substr_start - next_index_;
  const uint64_t data_start = substr_start - first_index;
  buf_.replace( buf_start, substr_len, data, data_start, substr_len );
  indicator_.replace( buf_start, substr_len, substr_len, '1' );
  if ( substr_start == next_index_ ) {
    const size_t n_assembled = min( indicator_.find_first_not_of( '1' ), indicator_.length() );
    output_.writer().push( buf_.substr( 0, n_assembled ) );
    const size_t n_left = cap_avai - n_assembled;
    buf_.replace( 0, n_left, string_view { buf_ }.substr( n_assembled ), 0, n_left );
    buf_.replace( n_left, n_assembled, n_assembled, '\0' );
    indicator_.replace( 0, n_left, string_view { indicator_ }.substr( n_assembled ), 0, n_left );
    indicator_.replace( n_left, n_assembled, n_assembled, '\0' );
    next_index_ += n_assembled;
  }
  check_eos_();
}

uint64_t Reassembler::bytes_pending() const
{
  return count( indicator_.cbegin(), indicator_.cend(), '1' );
}

void Reassembler::check_eos_()
{
  if ( next_index_ == end_index_ ) {
    output_.writer().close();
  }
}

Test results:

$ cmake --build ./build --target check1
Test project /home/rayalto/Projects/cpp/minnow/build
Connected to MAKE jobserver
Start 1: compile with bug-checkers
1/17 Test #1: compile with bug-checkers ........ Passed 0.29 sec
Start 3: byte_stream_basics
2/17 Test #3: byte_stream_basics ............... Passed 0.01 sec
Start 4: byte_stream_capacity
3/17 Test #4: byte_stream_capacity ............. Passed 0.02 sec
Start 5: byte_stream_one_write
4/17 Test #5: byte_stream_one_write ............ Passed 0.01 sec
Start 6: byte_stream_two_writes
5/17 Test #6: byte_stream_two_writes ........... Passed 0.01 sec
Start 7: byte_stream_many_writes
6/17 Test #7: byte_stream_many_writes .......... Passed 0.05 sec
Start 8: byte_stream_stress_test
7/17 Test #8: byte_stream_stress_test .......... Passed 0.03 sec
Start 9: reassembler_single
8/17 Test #9: reassembler_single ............... Passed 0.01 sec
Start 10: reassembler_cap
9/17 Test #10: reassembler_cap .................. Passed 0.01 sec
Start 11: reassembler_seq
10/17 Test #11: reassembler_seq .................. Passed 0.03 sec
Start 12: reassembler_dup
11/17 Test #12: reassembler_dup .................. Passed 0.04 sec
Start 13: reassembler_holes
12/17 Test #13: reassembler_holes ................ Passed 0.01 sec
Start 14: reassembler_overlapping
13/17 Test #14: reassembler_overlapping .......... Passed 0.01 sec
Start 15: reassembler_win
14/17 Test #15: reassembler_win .................. Passed 0.32 sec
Start 37: compile with optimization
15/17 Test #37: compile with optimization ........ Passed 0.12 sec
Start 38: byte_stream_speed_test
ByteStream throughput: 18.26 Gbit/s
16/17 Test #38: byte_stream_speed_test ........... Passed 0.19 sec
Start 39: reassembler_speed_test
Reassembler throughput: 8.87 Gbit/s
17/17 Test #39: reassembler_speed_test ........... Passed 0.31 sec

100% tests passed, 0 tests failed out of 17

Total Test time (real) = 1.50 sec
Built target check1