mirror of
https://github.com/yarrick/iodine.git
synced 2025-04-10 04:21:01 +00:00
fix reassembly, ticking and sliding
This commit is contained in:
parent
eee0d14a69
commit
6e7099866f
2 changed files with 40 additions and 32 deletions
70
src/window.c
70
src/window.c
|
@ -158,8 +158,7 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
|
||||||
ssize_t dest = WRAP(w->chunk_start + offset);
|
ssize_t dest = WRAP(w->chunk_start + offset);
|
||||||
|
|
||||||
if (offset > w->length - w->windowsize) {
|
if (offset > w->length - w->windowsize) {
|
||||||
w->oos++;
|
WDEBUG("incoming frag ahead: offs %u > %u, cs %u[%" L "u], id %u[%" L "u]",
|
||||||
WDEBUG("incoming frag far ahead: offs %u > %u, cs %u[%" L "u], id %u[%" L "u]",
|
|
||||||
offset, w->length - w->windowsize, w->start_seq_id, w->chunk_start,
|
offset, w->length - w->windowsize, w->start_seq_id, w->chunk_start,
|
||||||
f->seqID, dest);
|
f->seqID, dest);
|
||||||
offset -= w->length - w->windowsize;
|
offset -= w->length - w->windowsize;
|
||||||
|
@ -204,6 +203,7 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_
|
||||||
{
|
{
|
||||||
size_t woffs, start;
|
size_t woffs, start;
|
||||||
size_t maxlen = *len;
|
size_t maxlen = *len;
|
||||||
|
size_t fraglen = 0;
|
||||||
uint8_t *dest; //, *fdata_start;
|
uint8_t *dest; //, *fdata_start;
|
||||||
*len = 0;
|
*len = 0;
|
||||||
dest = data;
|
dest = data;
|
||||||
|
@ -246,18 +246,17 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_
|
||||||
woffs, f->seqID, curseq);
|
woffs, f->seqID, curseq);
|
||||||
}
|
}
|
||||||
if (f->start || consecutive_frags >= 1) {
|
if (f->start || consecutive_frags >= 1) {
|
||||||
found_frags++;
|
|
||||||
consecutive_frags++;
|
consecutive_frags++;
|
||||||
if (drop == 0) {
|
if (drop == 0) {
|
||||||
/* Copy next fragment to buffer if not going to drop */
|
/* Copy next fragment to buffer if not going to drop */
|
||||||
memcpy(dest, f->data, MIN(f->len, maxlen));
|
memcpy(dest, f->data, MIN(f->len, maxlen));
|
||||||
}
|
}
|
||||||
if (f->len > maxlen) {
|
if (f->len > maxlen) {
|
||||||
WDEBUG("Data buffer too small: drop packet! Reassembled %" L "u bytes.", *len);
|
WDEBUG("Data buffer too small: drop packet! Reassembled %" L "u bytes.", fraglen);
|
||||||
drop = 1;
|
drop = 1;
|
||||||
}
|
}
|
||||||
dest += f->len;
|
dest += f->len;
|
||||||
*len += f->len;
|
fraglen += f->len;
|
||||||
maxlen -= f->len;
|
maxlen -= f->len;
|
||||||
|
|
||||||
if (compression) {
|
if (compression) {
|
||||||
|
@ -267,9 +266,9 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
WDEBUG("reassemble: id %u, len %" L "u, offs %" \
|
WDEBUG("reassemble: id %u [%" L "u], len %" L "u, offs %" \
|
||||||
L "u, total %" L "u, maxlen %" L "u, found %" L "u, consecutive %" L "u",
|
L "u, total %" L "u, maxlen %" L "u, found %" L "u/%" L "u, consecutive %" L "u",
|
||||||
f->seqID, f->len, dest - data, *len, maxlen, found_frags, consecutive_frags);
|
woffs, f->seqID, f->len, dest - data, *len, maxlen, found_frags, w->numitems, consecutive_frags);
|
||||||
|
|
||||||
if (f->end == 1) {
|
if (f->end == 1) {
|
||||||
WDEBUG("Found end of chunk! (seqID %u, chunk len %" L "u, datalen %" L "u)",
|
WDEBUG("Found end of chunk! (seqID %u, chunk len %" L "u, datalen %" L "u)",
|
||||||
|
@ -286,6 +285,9 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!drop)
|
||||||
|
*len = fraglen;
|
||||||
|
|
||||||
WDEBUG("Reassembled %" L "ub from %" L "u frags; comp=%u; holes=%u",
|
WDEBUG("Reassembled %" L "ub from %" L "u frags; comp=%u; holes=%u",
|
||||||
*len, consecutive_frags, *compression, holes);
|
*len, consecutive_frags, *compression, holes);
|
||||||
/* Clear all used fragments, going backwards from last processed */
|
/* Clear all used fragments, going backwards from last processed */
|
||||||
|
@ -312,11 +314,12 @@ window_sending(struct frag_buffer *w, struct timeval *nextresend)
|
||||||
oldest.tv_sec = 0;
|
oldest.tv_sec = 0;
|
||||||
oldest.tv_usec = 0;
|
oldest.tv_usec = 0;
|
||||||
|
|
||||||
|
if (nextresend) {
|
||||||
|
nextresend->tv_sec = 0;
|
||||||
|
nextresend->tv_usec = 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (w->numitems == 0) {
|
if (w->numitems == 0) {
|
||||||
if (nextresend) {
|
|
||||||
nextresend->tv_sec = 0;
|
|
||||||
nextresend->tv_usec = 0;
|
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -324,18 +327,18 @@ window_sending(struct frag_buffer *w, struct timeval *nextresend)
|
||||||
|
|
||||||
for (size_t i = 0; i < w->windowsize; i++) {
|
for (size_t i = 0; i < w->windowsize; i++) {
|
||||||
f = &w->frags[WRAP(w->window_start + i)];
|
f = &w->frags[WRAP(w->window_start + i)];
|
||||||
if (f->len == 0 || f->acks >= 1) continue;
|
if (f->len == 0 || f->acks >= 1 || f->retries > w->max_retries)
|
||||||
|
continue;
|
||||||
|
|
||||||
if (f->retries < 1 || f->lastsent.tv_sec == 0) {
|
if (f->retries < 1) {
|
||||||
/* Sending frag for first time
|
/* Sending frag for first time */
|
||||||
* Note: if retries==0 then lastsent MUST also be 0 */
|
|
||||||
tosend++;
|
tosend++;
|
||||||
} else {
|
} else {
|
||||||
/* Frag has been sent before so lastsent is a valid timestamp */
|
/* Frag has been sent before so lastsent is a valid timestamp */
|
||||||
timersub(&now, &f->lastsent, &age);
|
timersub(&now, &f->lastsent, &age);
|
||||||
|
|
||||||
if (!timercmp(&age, &w->timeout, <) && f->retries < w->max_retries) {
|
if (!timercmp(&age, &w->timeout, <)) {
|
||||||
/* ACK timeout: Frag will be resent */
|
/* ACK timeout: Frag will be resent if not to be dropped */
|
||||||
tosend++;
|
tosend++;
|
||||||
} else if (timercmp(&age, &oldest, >)) {
|
} else if (timercmp(&age, &oldest, >)) {
|
||||||
/* Hasn't timed out yet and is oldest so far */
|
/* Hasn't timed out yet and is oldest so far */
|
||||||
|
@ -344,7 +347,7 @@ window_sending(struct frag_buffer *w, struct timeval *nextresend)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nextresend) {
|
if (nextresend && w->max_retries > 0) {
|
||||||
/* nextresend = time before oldest fragment (not being sent now)
|
/* nextresend = time before oldest fragment (not being sent now)
|
||||||
* will be re-sent = timeout - age */
|
* will be re-sent = timeout - age */
|
||||||
timersub(&w->timeout, &oldest, nextresend);
|
timersub(&w->timeout, &oldest, nextresend);
|
||||||
|
@ -368,11 +371,12 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack)
|
||||||
|
|
||||||
for (size_t i = 0; i < w->windowsize; i++) {
|
for (size_t i = 0; i < w->windowsize; i++) {
|
||||||
f = &w->frags[WRAP(w->window_start + i)];
|
f = &w->frags[WRAP(w->window_start + i)];
|
||||||
if (f->acks >= 1 || f->len == 0) continue;
|
if (f->acks >= 1 || f->len == 0 || f->retries > w->max_retries)
|
||||||
|
continue;
|
||||||
|
|
||||||
timersub(&now, &f->lastsent, &age);
|
timersub(&now, &f->lastsent, &age);
|
||||||
|
|
||||||
if (f->retries >= 1 && f->retries < w->max_retries && !timercmp(&age, &w->timeout, <)) {
|
if (f->retries >= 1 && !timercmp(&age, &w->timeout, <)) {
|
||||||
/* Resending fragment due to ACK timeout */
|
/* Resending fragment due to ACK timeout */
|
||||||
WDEBUG("Retrying frag %u (%ld ms old/timeout %ld ms), retries: %u/max %u/total %u",
|
WDEBUG("Retrying frag %u (%ld ms old/timeout %ld ms), retries: %u/max %u/total %u",
|
||||||
f->seqID, timeval_to_ms(&age), timeval_to_ms(&w->timeout), f->retries, w->max_retries, w->resends);
|
f->seqID, timeval_to_ms(&age), timeval_to_ms(&w->timeout), f->retries, w->max_retries, w->resends);
|
||||||
|
@ -396,7 +400,8 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack)
|
||||||
f->start &= 1;
|
f->start &= 1;
|
||||||
f->end &= 1;
|
f->end &= 1;
|
||||||
f->retries++;
|
f->retries++;
|
||||||
gettimeofday(&f->lastsent, NULL);
|
f->lastsent.tv_sec = now.tv_sec;
|
||||||
|
f->lastsent.tv_usec = now.tv_usec;
|
||||||
return f;
|
return f;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -469,12 +474,14 @@ window_tick(struct frag_buffer *w)
|
||||||
{
|
{
|
||||||
unsigned slide = 0;
|
unsigned slide = 0;
|
||||||
for (size_t i = 0; i < w->windowsize; i++) {
|
for (size_t i = 0; i < w->windowsize; i++) {
|
||||||
// TODO are ACKs required for reduced arrival guarantee?
|
|
||||||
fragment *f = &w->frags[WRAP(w->window_start + i)];
|
fragment *f = &w->frags[WRAP(w->window_start + i)];
|
||||||
if (f->len > 0 && (f->acks >= 1 || (w->max_retries == 0 && f->lastsent.tv_sec != 0))) {
|
if (f->len > 0 && (f->acks >= 1 || f->retries > w->max_retries)) {
|
||||||
/* count consecutive fragments from start of window that are ACK'd */
|
/* count consecutive fragments from start of window that are ACK'd
|
||||||
|
* or that have been sent/retried maximum times */
|
||||||
slide++;
|
slide++;
|
||||||
} else break;
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (slide > 0) window_slide(w, slide, w->direction == WINDOW_SENDING);
|
if (slide > 0) window_slide(w, slide, w->direction == WINDOW_SENDING);
|
||||||
}
|
}
|
||||||
|
@ -492,29 +499,30 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, uint8
|
||||||
}
|
}
|
||||||
compressed &= 1;
|
compressed &= 1;
|
||||||
size_t offset = 0;
|
size_t offset = 0;
|
||||||
fragment *f = &w->frags[w->last_write];
|
fragment *f;
|
||||||
WDEBUG("add_outgoing_data: chunk len %" L "u -> %" L "u frags, max fragsize %u",
|
WDEBUG("add_outgoing_data: chunk len %" L "u -> %" L "u frags, max fragsize %u",
|
||||||
len, n, w->maxfraglen);
|
len, n, w->maxfraglen);
|
||||||
for (size_t i = 0; i < n; i++) {
|
for (size_t i = 0; i < n; i++) {
|
||||||
|
f = &w->frags[w->last_write];
|
||||||
/* copy in new data and reset frag stats */
|
/* copy in new data and reset frag stats */
|
||||||
f->len = MIN(len - offset, w->maxfraglen);
|
f->len = MIN(len - offset, w->maxfraglen);
|
||||||
f->seqID = w->cur_seq_id;
|
f->seqID = w->cur_seq_id;
|
||||||
f->compressed = compressed;
|
f->compressed = compressed;
|
||||||
f->start = (i == 0) ? 1 : 0;
|
f->start = (i == 0) ? 1 : 0;
|
||||||
f->end = (i == n - 1) ? 1 : 0;
|
f->end = (i == n - 1) ? 1 : 0;
|
||||||
|
|
||||||
f->retries = 0;
|
f->retries = 0;
|
||||||
f->acks = 0;
|
f->acks = 0;
|
||||||
f->ack_other = -1;
|
f->ack_other = -1;
|
||||||
f->lastsent.tv_sec = 0;
|
f->lastsent.tv_sec = 0;
|
||||||
f->lastsent.tv_usec = 0;
|
f->lastsent.tv_usec = 0;
|
||||||
|
|
||||||
|
WDEBUG(" frags[%" L "u]: len %" L "u, seqID %u, s %u, end %u, dOffs %" L "u",
|
||||||
|
w->last_write, f->len, f->seqID, f->start, f->end, offset);
|
||||||
|
|
||||||
|
memcpy(f->data, data + offset, f->len);
|
||||||
w->last_write = WRAP(w->last_write + 1);
|
w->last_write = WRAP(w->last_write + 1);
|
||||||
w->cur_seq_id = WRAPSEQ(w->cur_seq_id + 1);
|
w->cur_seq_id = WRAPSEQ(w->cur_seq_id + 1);
|
||||||
w->numitems ++;
|
w->numitems ++;
|
||||||
WDEBUG(" fragment len %" L "u, seqID %u, s %u, end %u, dOffs %" L "u",
|
|
||||||
f->len, f->seqID, f->start, f->end, offset);
|
|
||||||
memcpy(f->data, data + offset, f->len);
|
|
||||||
offset += f->len;
|
offset += f->len;
|
||||||
}
|
}
|
||||||
return n;
|
return n;
|
||||||
|
|
|
@ -53,7 +53,7 @@ struct frag_buffer {
|
||||||
unsigned maxfraglen; /* Max outgoing fragment data size */
|
unsigned maxfraglen; /* Max outgoing fragment data size */
|
||||||
unsigned cur_seq_id; /* Next unused sequence ID */
|
unsigned cur_seq_id; /* Next unused sequence ID */
|
||||||
unsigned start_seq_id; /* lowest seqID that exists in buffer (at index chunk_start) */
|
unsigned start_seq_id; /* lowest seqID that exists in buffer (at index chunk_start) */
|
||||||
unsigned max_retries; /* max number of resends before dropping (-1 = never drop) */
|
unsigned max_retries; /* max number of resends before dropping */
|
||||||
unsigned resends; /* number of fragments resent or number of dupes received */
|
unsigned resends; /* number of fragments resent or number of dupes received */
|
||||||
unsigned oos; /* Number of out-of-sequence fragments received */
|
unsigned oos; /* Number of out-of-sequence fragments received */
|
||||||
int direction; /* WINDOW_SENDING or WINDOW_RECVING */
|
int direction; /* WINDOW_SENDING or WINDOW_RECVING */
|
||||||
|
|
Loading…
Add table
Reference in a new issue