#如何检测丢包

IP网络中数据包丢失是比较常见的,偶尔还会出现一些乱序的情况,可能的原因是链路的负载均衡。对于FEC恢复的包来说,乱序则是一种非常常见的现象了。要检查乱序,也就是说当发现包序号有间隔时,需要再等等,等后的包收到一些后,再看是乱序还是丢包。

先看几个例子,用1表示包收到,0表示包丢失,等待5个包的时间间隔,也就是说收到了5个包之后,第6个收到的包还不是之前期望收到的才认为是丢掉的,序号从1开始,”?”表示当前这个包的状态不确定:

11110 11111 - 这个序列,收到包6时,还不能立即判断包5就丢了,要等到收到包11才能认为之前的包丢了。所以检测的结果是1111? 11111

10101 00111 01 - 这个序列,包2的丢失要等到收到包12收到,后面的包4,6,7,11的状态是未定的。检测的结果是101?1??111?1

10000 01111 11 - 这个序列, 当收到包12时,才能判断前面的包2,3,4,5,6都丢掉了。检测的结果是10000 01111 11

算法实现

用m_seqExpct来存储当前期望收到的包序号,如果当前的包序号nowSeq比期望的m_seqExpct要大,那么[m_seqExpct, nowSeq)之间的包的状态都是未知的,需要等到后面的包收到才能确定。这里就需要对比期望的m_seqExpct要大的nowSeq做一个缓存,缓存的大小就是容忍的乱序大小,缓存内的序号按从小到大的顺序存储,当缓存满了的时候,就可以确定那些序号在[m_seqExpct, min(nowSeq, m_seqCache[0].seq))之间的包都是是丢掉的了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
void CLossDetection::RecvPkt(uint16_t seq)
{
PktRecvd(seq);

if(m_initialized) {
if( seq == m_seqExpct ) {
++m_seqExpct;
UpdateCache();
} else if(WrappingCompareLess16(m_seqExpct, seq)) {
if( m_cachedCnt < ARRAY_LENGTH(m_seqCache) ) {
CacheSeq(now, seq);
} else {
//cache is full, check seq less than the first one in cache
while(WrappingCompareLess16(m_seqExpct, m_seqCache[0].seq)) {
if( m_seqExpct != seq ) {
PktDropped(m_seqExpct);
++m_seqExpct;
} else {
++m_seqExpct;
break;
}
}

UpdateCache();

//recheck again because m_seqExpct may changed
if( m_seqExpct == seq ) {
++m_seqExpct;
} else if( WrappingCompareLess16(m_seqExpct, seq) ) {
CacheSeq(now, seq);
}
}
} else {
// duplicate or arrive too late
}
} else {
m_seqExpct = seq + 1;
m_initialized = true;
}
}

void CLossDetection::UpdateCache()
{
// clear recvd pkts and move remaining pkts to front
if(m_cachedCnt <= ARRAY_LENGTH(m_seqCache)) {
uint32_t i = 0;
for(; i< m_cachedCnt; ++i) {
if(m_seqExpct == m_seqCache[i].seq) {
++m_seqExpct;
} else {
break;
}
}

if( i!= 0 ) {
for(uint32_t j=i; j<m_cachedCnt; ++j) {
m_seqCache[j-i] = m_seqCache[j];
}
m_cachedCnt -= i;
}
}
}

void CLossDetection::CacheSeq(uint32_t now, uint16_t seq)
{
//at least one slot for seq
if( m_cachedCnt < ARRAY_LENGTH(m_seqCache) ) {

int32_t pivot = -1;//pos before insert
for(int32_t i=static_cast<int32_t>(m_cachedCnt)-1; i>=0 ; --i) {
if( WrappingCompareLess16(m_seqCache[i].seq, seq) ) {
pivot = i;
break;
} else if( m_seqCache[i].seq == seq ) {
return;
}
}

for(int32_t j=m_cachedCnt-1; j>pivot; --j) {
if(j < static_cast<int32_t>(ARRAY_LENGTH(m_seqCache)-1) && j>=0) {
m_seqCache[j+1] = m_seqCache[j];
} else {
assert(false);
}
}

if(pivot+1 < static_cast<int32_t>(ARRAY_LENGTH(m_seqCache)) ) {
m_seqCache[pivot+1] = PktItem(now, seq);
} else {
assert(false);
}

++m_cachedCnt;
}
}

bool WrappingCompareLess(uint16_t l, uint16_t r)
{

uint16_t distDown = l - r;
uint16_t distUp = r - l;
return distUp < distDown;
}