From 6e7099866f4700e6975408aff08f715ce0bf654d Mon Sep 17 00:00:00 2001 From: frekky Date: Tue, 4 Jul 2017 21:44:12 +0200 Subject: [PATCH] fix reassembly, ticking and sliding --- src/window.c | 70 +++++++++++++++++++++++++++++----------------------- src/window.h | 2 +- 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/src/window.c b/src/window.c index 1e5708f..c771afe 100644 --- a/src/window.c +++ b/src/window.c @@ -158,8 +158,7 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f) ssize_t dest = WRAP(w->chunk_start + offset); if (offset > w->length - w->windowsize) { - w->oos++; - WDEBUG("incoming frag far ahead: offs %u > %u, cs %u[%" L "u], id %u[%" L "u]", + WDEBUG("incoming frag ahead: offs %u > %u, cs %u[%" L "u], id %u[%" L "u]", offset, w->length - w->windowsize, w->start_seq_id, w->chunk_start, f->seqID, dest); offset -= w->length - w->windowsize; @@ -204,6 +203,7 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_ { size_t woffs, start; size_t maxlen = *len; + size_t fraglen = 0; uint8_t *dest; //, *fdata_start; *len = 0; dest = data; @@ -246,18 +246,17 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_ woffs, f->seqID, curseq); } if (f->start || consecutive_frags >= 1) { - found_frags++; consecutive_frags++; if (drop == 0) { /* Copy next fragment to buffer if not going to drop */ memcpy(dest, f->data, MIN(f->len, maxlen)); } if (f->len > maxlen) { - WDEBUG("Data buffer too small: drop packet! Reassembled %" L "u bytes.", *len); + WDEBUG("Data buffer too small: drop packet! Reassembled %" L "u bytes.", fraglen); drop = 1; } dest += f->len; - *len += f->len; + fraglen += f->len; maxlen -= f->len; if (compression) { @@ -267,9 +266,9 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_ } } - WDEBUG("reassemble: id %u, len %" L "u, offs %" \ - L "u, total %" L "u, maxlen %" L "u, found %" L "u, consecutive %" L "u", - f->seqID, f->len, dest - data, *len, maxlen, found_frags, consecutive_frags); + WDEBUG("reassemble: id %u [%" L "u], len %" L "u, offs %" \ + L "u, total %" L "u, maxlen %" L "u, found %" L "u/%" L "u, consecutive %" L "u", + woffs, f->seqID, f->len, dest - data, *len, maxlen, found_frags, w->numitems, consecutive_frags); if (f->end == 1) { WDEBUG("Found end of chunk! (seqID %u, chunk len %" L "u, datalen %" L "u)", @@ -286,6 +285,9 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_ return 0; } + if (!drop) + *len = fraglen; + WDEBUG("Reassembled %" L "ub from %" L "u frags; comp=%u; holes=%u", *len, consecutive_frags, *compression, holes); /* Clear all used fragments, going backwards from last processed */ @@ -312,11 +314,12 @@ window_sending(struct frag_buffer *w, struct timeval *nextresend) oldest.tv_sec = 0; oldest.tv_usec = 0; + if (nextresend) { + nextresend->tv_sec = 0; + nextresend->tv_usec = 0; + } + if (w->numitems == 0) { - if (nextresend) { - nextresend->tv_sec = 0; - nextresend->tv_usec = 0; - } return 0; } @@ -324,18 +327,18 @@ window_sending(struct frag_buffer *w, struct timeval *nextresend) for (size_t i = 0; i < w->windowsize; i++) { f = &w->frags[WRAP(w->window_start + i)]; - if (f->len == 0 || f->acks >= 1) continue; + if (f->len == 0 || f->acks >= 1 || f->retries > w->max_retries) + continue; - if (f->retries < 1 || f->lastsent.tv_sec == 0) { - /* Sending frag for first time - * Note: if retries==0 then lastsent MUST also be 0 */ + if (f->retries < 1) { + /* Sending frag for first time */ tosend++; } else { /* Frag has been sent before so lastsent is a valid timestamp */ timersub(&now, &f->lastsent, &age); - if (!timercmp(&age, &w->timeout, <) && f->retries < w->max_retries) { - /* ACK timeout: Frag will be resent */ + if (!timercmp(&age, &w->timeout, <)) { + /* ACK timeout: Frag will be resent if not to be dropped */ tosend++; } else if (timercmp(&age, &oldest, >)) { /* Hasn't timed out yet and is oldest so far */ @@ -344,7 +347,7 @@ window_sending(struct frag_buffer *w, struct timeval *nextresend) } } - if (nextresend) { + if (nextresend && w->max_retries > 0) { /* nextresend = time before oldest fragment (not being sent now) * will be re-sent = timeout - age */ timersub(&w->timeout, &oldest, nextresend); @@ -368,11 +371,12 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) for (size_t i = 0; i < w->windowsize; i++) { f = &w->frags[WRAP(w->window_start + i)]; - if (f->acks >= 1 || f->len == 0) continue; + if (f->acks >= 1 || f->len == 0 || f->retries > w->max_retries) + continue; timersub(&now, &f->lastsent, &age); - if (f->retries >= 1 && f->retries < w->max_retries && !timercmp(&age, &w->timeout, <)) { + if (f->retries >= 1 && !timercmp(&age, &w->timeout, <)) { /* Resending fragment due to ACK timeout */ WDEBUG("Retrying frag %u (%ld ms old/timeout %ld ms), retries: %u/max %u/total %u", f->seqID, timeval_to_ms(&age), timeval_to_ms(&w->timeout), f->retries, w->max_retries, w->resends); @@ -396,7 +400,8 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) f->start &= 1; f->end &= 1; f->retries++; - gettimeofday(&f->lastsent, NULL); + f->lastsent.tv_sec = now.tv_sec; + f->lastsent.tv_usec = now.tv_usec; return f; } @@ -469,12 +474,14 @@ window_tick(struct frag_buffer *w) { unsigned slide = 0; for (size_t i = 0; i < w->windowsize; i++) { - // TODO are ACKs required for reduced arrival guarantee? fragment *f = &w->frags[WRAP(w->window_start + i)]; - if (f->len > 0 && (f->acks >= 1 || (w->max_retries == 0 && f->lastsent.tv_sec != 0))) { - /* count consecutive fragments from start of window that are ACK'd */ + if (f->len > 0 && (f->acks >= 1 || f->retries > w->max_retries)) { + /* count consecutive fragments from start of window that are ACK'd + * or that have been sent/retried maximum times */ slide++; - } else break; + } else { + break; + } } if (slide > 0) window_slide(w, slide, w->direction == WINDOW_SENDING); } @@ -492,29 +499,30 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, uint8 } compressed &= 1; size_t offset = 0; - fragment *f = &w->frags[w->last_write]; + fragment *f; WDEBUG("add_outgoing_data: chunk len %" L "u -> %" L "u frags, max fragsize %u", len, n, w->maxfraglen); for (size_t i = 0; i < n; i++) { + f = &w->frags[w->last_write]; /* copy in new data and reset frag stats */ f->len = MIN(len - offset, w->maxfraglen); f->seqID = w->cur_seq_id; f->compressed = compressed; f->start = (i == 0) ? 1 : 0; f->end = (i == n - 1) ? 1 : 0; - f->retries = 0; f->acks = 0; f->ack_other = -1; f->lastsent.tv_sec = 0; f->lastsent.tv_usec = 0; + WDEBUG(" frags[%" L "u]: len %" L "u, seqID %u, s %u, end %u, dOffs %" L "u", + w->last_write, f->len, f->seqID, f->start, f->end, offset); + + memcpy(f->data, data + offset, f->len); w->last_write = WRAP(w->last_write + 1); w->cur_seq_id = WRAPSEQ(w->cur_seq_id + 1); w->numitems ++; - WDEBUG(" fragment len %" L "u, seqID %u, s %u, end %u, dOffs %" L "u", - f->len, f->seqID, f->start, f->end, offset); - memcpy(f->data, data + offset, f->len); offset += f->len; } return n; diff --git a/src/window.h b/src/window.h index 7de8c0c..3c3d14e 100644 --- a/src/window.h +++ b/src/window.h @@ -53,7 +53,7 @@ struct frag_buffer { unsigned maxfraglen; /* Max outgoing fragment data size */ unsigned cur_seq_id; /* Next unused sequence ID */ unsigned start_seq_id; /* lowest seqID that exists in buffer (at index chunk_start) */ - unsigned max_retries; /* max number of resends before dropping (-1 = never drop) */ + unsigned max_retries; /* max number of resends before dropping */ unsigned resends; /* number of fragments resent or number of dupes received */ unsigned oos; /* Number of out-of-sequence fragments received */ int direction; /* WINDOW_SENDING or WINDOW_RECVING */