0%

CS144实验记录

启程

配环境

vscode+remote-ssh+云服务器centos8开发

本想写2023年的新版lab,但是看了看要用g++20编译,试了试更新g++太难,放弃了,期间还删掉了g++,还好服务器有备份可以回滚。

推荐使用 https://kangyupl.gitee.io/cs144.github.io/ 网站镜像备份,g++8编译就可以。

代码也是采用 https://gitee.com/kangyupl/sponge git clone 下来后直接切换分支就可以写。

编译 make
测试 make check_lab0

lab0

中规中矩,热身实验。耗时1.5h左右。

实现 get_url函数,注意端口号部分写 http 代表服务类型就行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

//! Construct by resolving a hostname and servicename. 函数重载采用这个函数
Address(const std::string &hostname, const std::string &service);


void get_URL(const string &host, const string &path) {
// Your code here.

TCPSocket mysock;
mysock.connect(Address(host,"http"));

mysock.write("GET "+path+" HTTP/1.1\r\n");
mysock.write("Host: "+host+"\r\n\r\n");

mysock.shutdown(SHUT_WR);

while(!mysock.eof()){// eof代表读取完毕 且发送端也停止发送
auto rec_message=mysock.read();
cout<<rec_message;
}

mysock.closed();

return;

}

内存中的通讯,采取了list的写法。注意下eof() 函数,头文件注意定义变量,别定义少了。

就是一个可以读取和写入的有序字节流,先写入的先被读取到。实现这个类的一些方法。

1
2
3
4
5
6
7
8
9
10
private:
// Your code here -- add private members as necessary.

bool _error{}; //!< Flag indicating that the stream suffered an error.
std::list<char> stream_content;
const size_t capacity= 0;
size_t stream_size=0;// 最后一个写入的字节的序号,其实没用,当初写多了就一直保留了
bool flag_finish=0;
size_t read_size=0;// 已经读取的长度
size_t writen_size=0;// 已经写的长度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
ByteStream::ByteStream(const size_t mycapacity) : stream_content{}, capacity(mycapacity), stream_size(0) {}

size_t ByteStream::write(const string &data) {
if(flag_finish || _error) return 0;

size_t temp_size = writen_size;
for (auto& temp : data) {
if ((writen_size-read_size ) >= capacity)
break;// list中的数据量满了 就退出
stream_size++;
writen_size++;
stream_content.emplace_back(temp);
}
return (writen_size - temp_size);// 返回实际写入的数据长度
}

//! \param[in] len bytes will be copied from the output side of the buffer
string ByteStream::peek_output(const size_t len) const {
if(_error) return nullptr;
size_t temp_len = 0;
string ans(len, '0');

for (auto temp_char : stream_content) {
if (temp_len == len)
break;
temp_len++;
ans[temp_len - 1] = temp_char;// 因为实际内容的长度可能不够 所以还需要一个 temp_len
}
return ans.substr(0, temp_len);
}

//! \param[in] len bytes will be removed from the output side of the buffer
void ByteStream::pop_output(const size_t len) {
if(_error) return ;
size_t cur_size=buffer_size();
for (size_t i = 0; i < len; i++) {
if (cur_size<= 0)
break;// 防止数据量并没有len那么多
cur_size--;
stream_content.pop_front();
read_size++;
}

return;
}

void ByteStream::end_input() {
flag_finish = 1;
return;
}

bool ByteStream::input_ended() const { return flag_finish; }

size_t ByteStream::buffer_size() const { return writen_size-read_size; }

bool ByteStream::buffer_empty() const { return writen_size-read_size==0 ; }

bool ByteStream::eof() const {
return buffer_empty() && flag_finish; // 数据读取完毕 并且 输入端停止输入
}

size_t ByteStream::bytes_written() const {
return writen_size;
}

size_t ByteStream::bytes_read() const { return read_size; }

size_t ByteStream::remaining_capacity() const { return capacity - (writen_size-read_size); }

lab1

实现一个类,把无序可能重复到达的数据组合成有序的并且写入到_output中,_output就是lab0实现的Bytestream。耗时3.5h左右。

写之前可以先make测试一下,看看测试程序的样例,不然理解可能会有些偏差,我盯着样例看了一会才明白具体细节。

注意点如下

  1. 到达数据会重叠
  2. 当到达数据全部被接受到(根据传入参数eof判断),且全部有序后,且全部写入output后,需要更新output的结束信号状态。
  3. output的容量可能比StreamReassembler的小,不过这个lab中不用考虑这些…不知道为什么,考虑的话,当output的可用容量小于要写入的字节时候,你还要自己做个判断,并且定义一个新的变量。
  4. 按照道理,流重组器的数据写入顺序字节流后,流重组器的数据就可以删除了,不过这个lab没要求,我也懒得写,也不知道后面lab需不需要考虑。

网上一个参考说明如下,不过这个lab其实并不用考虑

LAB0的顺序字节流和LAB1的流重组器各有各的容量限制。流重组器把字节流写满后,只有当字节流腾出空后才能继续写,相当于字节流满时流重组器出口被“堵住”了。同样当流重组器容量满了后自身也无法被写入新数据,此时到来的新碎片只能被丢弃掉。

选取合适数据结构很重要,写之前模拟半天,一个idea是两个链表,太复杂pass了,一个idea是双端队列+无序set,一个idea是string+无序set。

需要删除流重组器的数据的话用deque更好一些,不过我写的时候没考虑,直接用了string。 到时候看看再重构吧。

写完lab1 做lab2的时候我才知道

lab1的担心是对的,确实需要删除已经写入有序字节流的元素,把lab2代码拉下来后重新测试发现lab1出错了,lab2新增了对lab1的测试,也就是测试有没有删掉已经写入顺序字节流的元素,所以fix bug采用了deque数据结构。

以下是最新代码

1
2
3
4
5
6
7
8
9
10
11
12
private:
// Your code here -- add private members as necessary.

ByteStream _output; //!< The reassembled in-order byte stream
size_t _capacity; //!< The maximum number of bytes
std::deque<char> all_content_str;
std::unordered_set<int> hash_str_unorderset={};
size_t size_char_inorder=0; // 所有有序的数量,包括已经写入被pop出去的
size_t size_char_all=0; // // 所有数据的数量,包括已经写入被pop出去的
size_t size_char_writen=0;// 写入到有序字节流中的数量
size_t start_index=0;// deque中第一个字符的在字节流中的下标,其实和size_char_writen一样
bool eof_flag=false;

具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
StreamReassembler::StreamReassembler(const size_t capacity)
: _output(capacity),
_capacity(capacity),
all_content_str(capacity, '0')
{}

//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
size_t data_length = data.size();

size_t up_length = min(data_length , _capacity+start_index-index); // 实际可以写入的长度

for (size_t i = 0; i < up_length; i++) {
if (hash_str_unorderset.find(i+index) == hash_str_unorderset.end()) {
hash_str_unorderset.emplace(i+index);
all_content_str.at(index-start_index+i) = data[i];
size_char_all++;
}
}// 将收到的data写入缓冲区,同时更新hash表(hash表是判断对应序号的data是否被写入)


for (size_t i = size_char_inorder-start_index; i < _capacity; i++) {
if (hash_str_unorderset.find(i+start_index) == hash_str_unorderset.end())
break;// hash中找不到,也就是该位置对应的data未被写入

size_char_inorder++;
} // 寻找连续序列(也就是重组好的序列),直到出现断层


size_t size_writable=min(_output.remaining_capacity(),size_char_inorder-size_char_writen);
// 防止output内的缓冲长度不够,所以取最小值,同时还要定义一个size_char_written记录已经写入的数据

string temp(size_writable,'0');
for(size_t i=0;i<size_writable;i++) temp[i]=all_content_str.at(i);

_output.write(temp);

for(size_t i=0;i<size_writable;i++){
all_content_str.pop_front();
all_content_str.emplace_back('0'); // pop后还要在deque尾部添加,保持容量不变。
};

size_char_writen+=size_writable; // 更新写入的长度
start_index=size_char_writen;

if(eof) eof_flag=true;

if(eof_flag && this->empty()){
_output.end_input();
}// 接受到全部data后且全部写入到output后,output就可以更新结束信号


return;
}

size_t StreamReassembler::unassembled_bytes() const { return size_char_all - size_char_inorder; }

bool StreamReassembler::empty() const { return size_char_all == size_char_inorder && size_char_inorder==size_char_writen; }
// 不仅全部要有序,所有的还必须被写入output ,可以直接写出 size_char_all==size_char_written

lab2 花费比较多,8h左右。

tcp receiver

part1

对我挺折磨的,主要是看不懂文档在说啥…,真正理解还是自己测试的时候,看测试代码才知道究竟让干啥,一些细节究竟是什么。

注释比较全面了,不过我的逻辑可能不太好理解,有问题可以给我提issue。

cout是debug时候看的,留着可能下个lab还要用,悲,测试样例不全就是这样的,不能相信之前自己写的lab。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
WrappingInt32 wrap(uint64_t n, WrappingInt32 isn) {
uint64_t t_num = UINT32_MAX;
t_num++;

n = n % (t_num); // 先回退到第一个循环。
n += isn.raw_value();

n = n % (t_num);

return WrappingInt32(static_cast<uint32_t>(n));
}

uint64_t unwrap(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint) {
// n的绝对序列和checkpoint的差的绝对值一定小于(2^32)-1;
cout << " n isn check " << n << " " << isn << " " << checkpoint << endl;
WrappingInt32 check_in_wrap32 = wrap(checkpoint, isn);
cout << " check in warp32 " << check_in_wrap32 << " " << endl;
uint64_t ans = 0;
uint64_t gap_1 = 0; // 记录n的绝对序列比checkpoint大的情况,当然我们并不知道究竟是大还是小,比较取得最合适的
uint64_t gap_2 = 0; // 记录n的绝对序列比checkpoint小的情况
uint64_t n_raw_value = n.raw_value();
uint64_t check_in_wrap32_raw_value = check_in_wrap32.raw_value();

// gap均为正
// 假设n的绝对序列比check大的情况,比check大的时候,有可能没有多一个循环,有可能多了一个循环
// 比如 checkpoint的相对序列为 3,n的相对序列可能为1(此时n的绝对序列比 check大了 2^32-2) 也可能为 10,但每种情况
// n的绝对序列都要比checkpoint大
if (n_raw_value < check_in_wrap32_raw_value) {
gap_1 =
1 + n_raw_value + (uint64_t)UINT32_MAX -
check_in_wrap32_raw_value; // 这里要加个一,因为跳入一个新的循环,要加1,想不通自己设个小点的数字,自己算一下

} else
gap_1 = n_raw_value - check_in_wrap32_raw_value;

if (n_raw_value > check_in_wrap32_raw_value) {
gap_2 = 1 + (uint64_t)UINT32_MAX + check_in_wrap32_raw_value - n_raw_value; // 这里要加个一
} else
gap_2 = check_in_wrap32_raw_value - n_raw_value; // 假设n的绝对序列比check小的情况

if (gap_1 < gap_2) {
ans = checkpoint + gap_1;
} // 第一种情况更接近checkpoint
else if (checkpoint >= gap_2)
ans = checkpoint - gap_2; // 第二种情况更接近checkpoint
else
ans = checkpoint + gap_1; // 虽然第二种情况更接近checkpoint,也就是n的绝对序列比checkpoint小,但是此时
// checkpoint<gap_2,相减会溢出(而 两者差距不可能那么大),因此其实是第一种情况

// 上面三个if else 判断条件的 等号 要不要加 也要自己琢磨思考一下,不然会出错。
return ans;
}

part2

认真看文档,先理解让干啥再写,我漏看了后面的详细介绍,懵逼了半天,不知道从何下手,然后又看文档才发现后面有详细的介绍。

逻辑不难,本质上就是维护一个滑动窗口,然后确认什么时候接收数据,什么时候不接受。需要注意的是,这里的窗口大小被有序字节流和重组器的共同限制,这下知道我怎么发现lab0的bug了吧。需要注意的细节有点多,慢慢对着测试代码改吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private:
//! Our data structure for re-assembling bytes.
StreamReassembler _reassembler;

//! The maximum number of bytes we'll store.
size_t _capacity;
size_t seg_length = 0; //// 当前TCPSegment的length
bool seg_syn_flag = false;
bool seg_fin_flag = false;
uint64_t checkpoint{0}; // 检查号
uint32_t offset_syn = 0; // 如果当前段是syn的话,因为需要转换stream index 所以需要+1,syn是0 但是在stream
// index中不占位置,syn的下一位才是0

WrappingInt32 syn_num{0}; // 整个字节流的初始序列号 也就是isn
WrappingInt32 seq_num{0}; // 当前TCPSegment的序列号

具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
bool TCPReceiver::segment_received(const TCPSegment &seg) {
if (seg.header().syn) {
if (seg_syn_flag)
return false; // second SYN is rejected
offset_syn = 1;
seg_syn_flag = true;
syn_num = seg.header().seqno;
checkpoint = static_cast<uint64_t>(0); // 遇到syn的时候,设置初始序列号 和检查位,此时检查位应该为 0,因为此刻还没有开始转换,因此就选择0
}

if (!seg_syn_flag)
return false; // 没有开始的话就直接丢弃

if (seg.header().fin && seg_fin_flag)
return false; // second FIN is rejected
if (seg.header().fin)
seg_fin_flag = true;


WrappingInt32 temp_seq_num{seg.header().seqno};
uint64_t temp_cur_index = unwrap(temp_seq_num + offset_syn, syn_num, checkpoint); // 该segment的绝对序列号
cout<< " 收到的数据段的绝对序列号 "<<temp_cur_index<<endl;


//_reassembler.ack_n()+1 期望收到的下一个数据的绝对序列号
if (_reassembler.ack_n() + 1 + _reassembler.empty_length() <= temp_cur_index){
return false; // 当前段的序列超过窗口范围 丢弃
cout<<" 当前段的序列超过窗口范围 丢弃 "<<endl;
}

seg_length = seg.length_in_sequence_space()-seg.header().syn-seg.header().fin;




if(seg_length==0){
if(wrap(temp_cur_index, syn_num)-wrap(_reassembler.ack_n() + 1, syn_num) <0){
return false;
}// 重复空报文
}
else if (wrap(temp_cur_index, syn_num) + static_cast<uint32_t>(seg_length-1+seg.header().fin)
-wrap(_reassembler.ack_n() + 1, syn_num) <0){

cout<<" 判断接受到的 重叠且这个数据段没有新的信息 直接丢弃 "<<endl;
return false; // 判断接受到的 重叠且这个数据段没有新的信息 直接丢弃,fin报文也算数据 因此也要计算

}

seq_num = seg.header().seqno;
uint64_t cur_index = unwrap(seq_num + offset_syn, syn_num, checkpoint);
// 绝对序列号,syn不占流里面的下标位置 因此有syn的时候 算的是syn之后一位的位置。

checkpoint = cur_index + seg_length - 1; // 最近确认的绝对序列号

cout<<" 进入重组 "<<cur_index<<endl;
_reassembler.push_substring(seg.payload().copy(), cur_index - 1, seg_fin_flag);


offset_syn = 0;
return true;
}

optional<WrappingInt32> TCPReceiver::ackno() const {
if (seg_syn_flag == false)
return nullopt;
uint64_t n = _reassembler.ack_n() + 1; // 期望收到的绝对序列号,ack_n返回的是stream_index
cout<<" 期望收到的绝对序列号n "<<n<<endl;
return wrap(n, syn_num);
}

size_t TCPReceiver::window_size() const { return _reassembler.empty_length(); }

在lab1中的流重组器中加了两个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    //   可以被写入的数据长度,也就是有序字节流中还剩下的空间 和 重组器中剩下的空间减去已经有序但还未进入重组器的长度  的较小值。
uint64_t empty_length() const;

// 返回下一个期望收到的字符的stream index
uint64_t ack_n() const;

uint64_t StreamReassembler::ack_n() const {
if (eof_flag)
return size_char_inorder + 1;// +1是因为fin也被看做一个字节 但是不会被写进去
return size_char_inorder; // 返回下一个期望收到的字符的stream index
}

uint64_t StreamReassembler::empty_length() const {
return min(_output.remaining_capacity()-(size_char_inorder-size_char_writen), _capacity - (size_char_inorder - size_char_writen));
}

中期总结

lab0实现了内存中的顺序读取结构,也就是bytestream。

lab1在此基础上实现了重组器,把不按照顺序到达的数据重新组装起来,流重组器控制传入数据读取多少,需要读取哪些数据,下一个需要读取的数据序号(ack的序号)。

lab2在两者基础上维护了一个滑动窗口,根据窗口大小,还有数据的序号关系,各种特殊情况怎么处理,选择是否接受数据,把数据传入流重组器(数据接收并不一定被全部接收,可能被流重组器部分接收,不过这不需要管,只要有数据需要被接收就传入流重组器,receiver不需要考虑接收多少,这是流重组器需要处理的)。

lab3

实现tcp sender

根据远方接收者的窗口内可接收新数据的长度 以及 我们可以发送的数据长度 发送tcp segment给receiver。
我们要发送的数据都在bytestream里面,也就是有序字节流中。

当然也要实现最开始发送syn空报文,结束发送fin报文的功能。

重传机制:设置重传计时器和超时时间,收到新的报文ack就重新启动重传计时器和超时时间,超时就需要重传最小的未被确认的报文。

未被确认的报文都需要进行缓存,确认后再pop出去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private:
//! our initial sequence number, the number for our SYN.
WrappingInt32 _isn;

//! outbound queue of segments that the TCPSender wants sent,这个队列我们并不能控制
std::queue<TCPSegment> _segments_out{};

// 等待被确认的,因为可能需要重传 所以需要缓存下,
std::queue<TCPSegment> _segments_wait{};

//! retransmission timer for the connection rto的初始值
unsigned int _initial_retransmission_timeout;

//! outgoing stream of bytes that have not yet been sent
ByteStream _stream;

//! the (absolute) sequence number for the next byte to be sent
uint64_t _next_seqno{0};

bool syn_sent=false;// 是否发送最开始的syn
bool fin_sent=false;// 是否发送结束的fin
size_t remote_window_size=1;// 远方接收者的窗口,限制了我们发送数据的大小

size_t cur_all_time=0;// 当前已经过去的时间
size_t next_retran_time=0;// 下次需要重传的时间,也就是说 调用tick时候 假如当前时间超过了next_retran_time 就代表超时
bool tcp_timer_running=false; // 计时器是否正在运行
size_t cur_rto=0;// 当前的rto
size_t continuous_retran_num=0;// 连续重传的数量
uint64_t cur_bytes_length_in_flight=0;// 未被确认的序列号长度 syn和fin也算一个长度'

具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity)
, syn_sent{false} {}

uint64_t TCPSender::bytes_in_flight() const { return cur_bytes_length_in_flight; }

void TCPSender::fill_window() {

if(fin_sent) return;

if (syn_sent == false) {
TCPSegment syn_seg;
syn_seg.header().syn = true;
syn_seg.header().seqno = _isn;
_next_seqno=1; // 因为最开始的syn报文长度为1,所以+1就ok了
_segments_out.push(syn_seg);
_segments_wait.push(syn_seg);
syn_sent = true;

if (tcp_timer_running==false) {
tcp_timer_running = true;
cur_rto = _initial_retransmission_timeout;
next_retran_time = cur_all_time + cur_rto;
continuous_retran_num = 0;
}

cur_bytes_length_in_flight = static_cast<uint64_t>(syn_seg.length_in_sequence_space());

return;
}

size_t cur_send_size = min(_stream.buffer_size(), (remote_window_size - static_cast<size_t>(cur_bytes_length_in_flight)));

size_t temp_i = 1450; // 用temp_i 主要是为了使用min函数 不然类型不匹配
size_t seg_num = cur_send_size / temp_i;
size_t end_length=cur_send_size-seg_num*temp_i;
if (cur_send_size > 0)
seg_num++;// 当前能够发送的报文数量


// 发送fin报文
if(cur_bytes_length_in_flight<remote_window_size && _stream.eof()){// 结束输入 并且要保证接收窗口有位置接受fin报文
fin_sent = true;

TCPSegment fin_seg;
fin_seg.header().fin = true;
fin_seg.header().seqno = wrap(_next_seqno, _isn);
_next_seqno += static_cast<uint64_t>(fin_seg.length_in_sequence_space());
_segments_out.push(fin_seg);
_segments_wait.push(fin_seg);

if (tcp_timer_running==false) {
tcp_timer_running = true;
cur_rto = _initial_retransmission_timeout;
next_retran_time = cur_all_time + cur_rto;
continuous_retran_num = 0;
}

cur_bytes_length_in_flight += static_cast<uint64_t>(fin_seg.length_in_sequence_space());

return;
}


for (size_t i = 0; i < seg_num; i++) {

if (tcp_timer_running == false) {
tcp_timer_running = true;
cur_rto = _initial_retransmission_timeout;
next_retran_time = cur_all_time + cur_rto;
continuous_retran_num = 0;
}

string temp_seg_content;

if(i==seg_num-1) temp_seg_content = _stream.peek_output(end_length);
else temp_seg_content = _stream.peek_output(temp_i);


_stream.pop_output(min(temp_i, cur_send_size)); // 用temp_i 主要是为了使用min函数 不然类型不匹配
TCPSegment cur_seg;

cur_seg.payload() = Buffer(std::move(temp_seg_content)); // 发送的segment的内容
cur_seg.header().seqno = wrap(_next_seqno, _isn); // 绝对序列号转换为相对序列号

if (i == seg_num - 1 && _stream.eof())
cur_seg.header().fin = true; // 判断是否结束 fin标志位设置,只有最后输出的时候可能是fin报文

_next_seqno += static_cast<uint64_t>(cur_seg.length_in_sequence_space()); // 绝对序列号增加

_segments_out.push(cur_seg);
_segments_wait.push(cur_seg);

cur_bytes_length_in_flight += static_cast<uint64_t>(cur_seg.length_in_sequence_space());

}

return;
}

//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
//! \returns `false` if the ackno appears invalid (acknowledges something the TCPSender hasn't sent yet)
bool TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
if (ackno - next_seqno() > 0) {
return false; // 确认的是还没有发送的数据,说明出错了
}

size_t old_rece_free_length = remote_window_size - static_cast<size_t>(cur_bytes_length_in_flight);
// 我们认为的远方接收者的 可以接收我们发送的新数据的剩余空间(旧的数据远方接收者收到但未给我们确认的话,这是重传机制需要搞定的事情,这里不需要管)

while (!_segments_wait.empty()) {

if (_segments_wait.front().header().seqno - ackno < 0) { // 按照道理这里还应该加上数据长度,但tcp的segment不可能被分开(可能吗,我现在感觉不可能),所以不加也没问题吧

cur_rto = _initial_retransmission_timeout;
continuous_retran_num = 0;
tcp_timer_running = true;
next_retran_time = cur_all_time + cur_rto;
// 不为空就重启定时器,设置好重传时间 rto 重传次数

cur_bytes_length_in_flight -= static_cast<uint64_t>(_segments_wait.front().length_in_sequence_space());

_segments_wait.pop();
} else {
break;
}

} // 移除已经被确认的 segment

if (_segments_wait.empty()) tcp_timer_running = false; // 全部确认 计时器关闭

remote_window_size = static_cast<size_t>(window_size); // 更新窗口

size_t cur_rece_free_length=remote_window_size-static_cast<size_t>(cur_bytes_length_in_flight);

if (cur_rece_free_length>old_rece_free_length && _stream.buffer_size() > 0) {
fill_window();
} // rece有空间可以接收 且 有数据可以发送 就继续发送

return true;
}

//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
cur_all_time += ms_since_last_tick;

mytcp_timer();

return;
}

void TCPSender::mytcp_timer() {
if (tcp_timer_running == false)
return; // 实际上tick和计时器是两个东西 tick是用来获取时间的 计时器是计时器
// 不过计时器基于tick,因此写在tick里面也ok。

if (cur_all_time >= next_retran_time && !_segments_wait.empty()) {
_segments_out.push(_segments_wait.front());
continuous_retran_num++;
cur_rto *= 2;
next_retran_time = cur_all_time + cur_rto;
}

return;
}

unsigned int TCPSender::consecutive_retransmissions() const { return continuous_retran_num; }

void TCPSender::send_empty_segment() {
TCPSegment cur_seg;
cur_seg.header().seqno = wrap(_next_seqno, _isn); // 绝对序列号转换为相对序列号
_segments_out.push(cur_seg);

return;
}

bool& TCPSender::sender_syn_sent () {
return syn_sent;
}

lab4

tcp connection

把前面的4个lab内容结合起来,实现一个完整的tcp自动机。

需要处理的细节很多很多很多…….

主要就四个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
    void connect(); // 初始连接 发送syn报文

size_t write(const std::string &data); // 把需要被发送的数据写入有序字节流等待被发送 同时让sender类发送这些数据(假如根据窗口大小判断 能发送的话)

void tick(const size_t ms_since_last_tick);// 这个函数由操作系统调用,获取过去的时间,超时就进行重传操作,重传次数太多会发送rst报文结束连接。

void segment_received(const TCPSegment &seg); // 接收到了新的报文 就调用receiver更新信息,同时调用sender发送ack报文和数据。同时也要处理一些异常情况


using namespace std;

size_t TCPConnection::remaining_outbound_capacity() const {
return _sender.stream_in().remaining_capacity();
}

size_t TCPConnection::bytes_in_flight() const {
return _sender.bytes_in_flight();
}

size_t TCPConnection::unassembled_bytes() const {
return _receiver.unassembled_bytes();
}

size_t TCPConnection::time_since_last_segment_received() const {
return cur_time_since_last_segment_received;
}

void TCPConnection::segment_received(const TCPSegment &seg) {
if(!_sender.sender_syn_sent() && !seg.header().syn && !recv_start) return; // 如果我们没发送syn 对面发送的这个数据也不是syn 并且也没有启动(没这个标记的话 对面发送syn 之后的数据就没法接收了) 那就拒绝接收数据
recv_start=true;// 开始启动
if(rst_receved) return;

if(seg.header().rst){
_receiver.stream_out().set_error();
_sender.stream_in().set_error();
_linger_after_streams_finish=false;
rst_receved=true;
active_tcp=false;
}

cur_time_since_last_segment_received=0;// 重设间隔时间

_receiver.segment_received(seg);
_sender.ack_received(seg.header().ackno, seg.header().win); // 数据传输给接收类,发送类发送对应数据

if (_receiver.stream_out().input_ended() && !_sender.stream_in().eof()) {
// 如果入站流在TCPConnection到达其出站流的EOF之前结束,则需要将此变量设置为false
_linger_after_streams_finish = false;
// 被动关闭 对面fin之后 我们发送ack 然后接收类就处理数据发送完毕后 会发送fin给对方 后半部分的逻辑和主动关闭差不多
}


if (_sender.segments_out().empty()) {// 没有数据可以发送时候 收到了数据包 可能是三次握手的syn 四次挥手的fin 和 ack,这时候发送类不会不会产生包来发送
//需要产生一个空段进行ack
if (_receiver.stream_out().input_ended() && !seg.header().fin) {
// 已经关闭了 被动或者主动关闭 这时候收到不是fin的包就不产生空包回应

}
/*else if(seg.length_in_sequence_space() == 0 &&
_sender.next_seqno_absolute() > _sender.bytes_in_flight()
&& !_sender.stream_in().eof()){
// 连接已经建立时候 需要回复空包
_sender.send_empty_segment(); // 发送空包进行ack
}*/
else if (seg.length_in_sequence_space() == 0 ) {
// 连接未建立时候就不发送空包回复
// 不直接return是因为 可能是有其他数据需要发送
// cout<<" 没有建立连接的时候就不发送空包回复 "<<endl;
} else {
// cout<<" 发送空包 "<<endl;
_sender.send_empty_segment(); // 发送空包进行ack
}
}


send_segments();

}

bool TCPConnection::active() const {

if(_sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && _receiver.stream_out().input_ended() && (cur_time_since_last_segment_received >= _cfg.rt_timeout * 10 || _linger_after_streams_finish==false))
return false; // 数据全部接收处理 发送 被确认 结束 且 time_wait时间结束(或者是被动关闭 不需要time_wait)


if (rst_receved || !active_tcp) return false;


return true;
}

size_t TCPConnection::write(const string &data) {
size_t writen_num=_sender.stream_in().write(data);
_sender.fill_window();
send_segments();
return writen_num;
}

//! \param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {
_sender.tick(ms_since_last_tick);
cur_time_since_last_segment_received+=ms_since_last_tick;
//cout<<" 间隔时间 "<<cur_time_since_last_segment_received<<" "<<endl;
send_segments();// 时间流逝后 发送类可能会有新的数据需要发送 因此需要send_segments()

if (_time_wait && cur_time_since_last_segment_received >= _cfg.rt_timeout * 10) {
// 关闭连接
//cout<<" 关闭连接 "<<endl;
_time_wait = false;
recv_start=false;
active_tcp=false;
}

if (_sender.consecutive_retransmissions() > _cfg.MAX_RETX_ATTEMPTS) {
// 重传次数太多 就进入rst状态
_sender.stream_in().set_error();
_receiver.stream_out().set_error();
_linger_after_streams_finish = false;
while (!_sender.segments_out().empty()) {
// pop出数据
_sender.segments_out().pop();
}
_sender.send_empty_segment();
TCPSegment& seg = _sender.segments_out().front();
seg.header().rst = true;
active_tcp=false;
rst_receved=false;
send_segments();
}


return;
}

void TCPConnection::end_input_stream() {
_sender.stream_in().end_input(); //结束输出流,因此先关闭有序字节的写端
_sender.fill_window();// 然后输出剩余的所有 未被发送的 数据
send_segments();
return;
}

void TCPConnection::connect() {
if(!_sender.sender_syn_sent()){
_sender.fill_window();
TCPSegment& syn_seg = _sender.segments_out().front();
syn_seg.header().win = min(_receiver.window_size(), static_cast<size_t>(UINT16_MAX));// uint16_t win = 0; window size ,防止传递窗口过大
_segments_out.push(syn_seg);
_sender.segments_out().pop();
}
return;
}

void TCPConnection::send_segments() {
if (!active_tcp || rst_receved) return;
while (!_sender.segments_out().empty()) {
TCPSegment& seg = _sender.segments_out().front();
if (_receiver.ackno().has_value()) {
seg.header().ack = true;
seg.header().ackno = _receiver.ackno().value();
//cout<< " 发送报文 设置ack "<<seg.header().ackno<<endl;
}// 设置ack
seg.header().win = min(_receiver.window_size(),static_cast<size_t>(UINT16_MAX));
_segments_out.push(seg);
_sender.segments_out().pop();
//if(seg.header().fin) cout<<" 发送了fin 报文 "<<endl;
}

if (_sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && _receiver.stream_out().input_ended()) {
if (_linger_after_streams_finish) {// 接收类接收完毕 发送类发送完毕且发送的包也全部被确认 那就主动关闭
_time_wait = true;
// cout<<" 进入time_wait "<<endl;
}
}

return;
}

TCPConnection::~TCPConnection() {
try {
if (active()) {
//cerr << "Warning: Unclean shutdown of TCPConnection\n";
_sender.stream_in().set_error();
_receiver.stream_out().set_error();
rst_receved=true;
active_tcp=false;
_linger_after_streams_finish = false;
while (!_sender.segments_out().empty()) {
// pop all segments
_sender.segments_out().pop();
}
_sender.send_empty_segment();
TCPSegment& seg = _sender.segments_out().front();
seg.header().rst = true;
send_segments();
// Your code here: need to send a RST segment to the peer
}
} catch (const exception &e) {
//std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
}
}




优化

用官方提供的sponge/libsponge/util/buffer.cc中的 BufferList类来作为ByteStream的容器,使得原来基于内存拷贝的存储方法变为基于内存所有权转移。

用c++的 std::move assign 右值引用等新特性对 ByteStream进行改造。