diff --git a/src/window.c b/src/window.c index 9408e21..cfc8184 100644 --- a/src/window.c +++ b/src/window.c @@ -38,20 +38,20 @@ int window_debug = 0; struct frag_buffer * window_buffer_init(size_t length, unsigned windowsize, unsigned maxfraglen, int dir) { - struct frag_buffer *buf; - buf = calloc(1, sizeof(struct frag_buffer)); - if (!buf) { + struct frag_buffer *w; + w = calloc(1, sizeof(struct frag_buffer)); + if (!w) { errx(1, "Failed to allocate window buffer memory!"); } if (dir != WINDOW_RECVING && dir != WINDOW_SENDING) { errx(1, "Invalid window direction!"); } - window_buffer_resize(buf, length, maxfraglen); + window_buffer_resize(w, length, maxfraglen); - buf->windowsize = windowsize; - buf->direction = dir; - return buf; + w->windowsize = windowsize; + w->direction = dir; + return w; } void @@ -127,29 +127,30 @@ ssize_t window_process_incoming_fragment(struct frag_buffer *w, fragment *f) /* Handles fragment received from the sending side (RECV) * Returns index of fragment in window or <0 if dropped - * The next ACK MUST be for this fragment */ + * The next ACK MUST be for this fragment + * Slides window forward if fragment received which is just above end seqID */ +/* XXX Use whole buffer to receive and reassemble fragments + * Old frags are "cleared" by being overwritten by newly received frags. (TODO) + * Reassemble just starts at oldest slot (chunk_start) in window and continues until all frags + * in buffer have been found. chunk_start incremented only if no holes found (tick). + */ { /* Check if packet is in window */ unsigned startid, endid, offset; + int future = 0; fragment *fd; startid = w->start_seq_id; - endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID; + endid = WRAPSEQ(startid + w->length); offset = SEQ_OFFSET(startid, f->seqID); if (!INWINDOW_SEQ(startid, endid, f->seqID)) { + WDEBUG("Drop frag ID %u: cannot fit in window buffer (%u-%u)", + f->seqID, startid, endid); w->oos++; - if (offset > MIN(w->length - w->numitems, MAX_SEQ_ID / 2)) { - /* Only drop the fragment if it is ancient */ - WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)", f->seqID, startid, endid); - return -1; - } else { - /* Save "new" fragments to avoid causing other end to advance - * when this fragment is ACK'd despite being dropped */ - WDEBUG("WARNING: Got future fragment (%u), offset %u from start %u (wsize %u).", - f->seqID, offset, startid, w->windowsize); - } + return -1; } - /* Place fragment into correct location in buffer */ + /* Place fragment into correct location in buffer, possibly overwriting + * older and not-yet-reassembled fragment */ ssize_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID)); WDEBUG(" Putting frag seq %u into frags[%" L "u + %u = %" L "u]", f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest); @@ -187,87 +188,73 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f) int window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_t *compression) { - if (!len) { - errx(1, "window_reassemble_data: len pointer is NULL!"); - } - - size_t woffs, fraglen, start; + size_t woffs, start; size_t maxlen = *len; uint8_t *dest; //, *fdata_start; *len = 0; dest = data; - if (w->direction != WINDOW_RECVING) - return 0; - /* start fragment may be missing, so only stop if w is empty */ - if (w->frags[w->chunk_start].start == 0 && w->numitems == 0) { - WDEBUG("chunk_start (%" L "u) != start and w empty (seq %u, len %" L "u)!", - w->chunk_start, w->frags[w->chunk_start].seqID, w->frags[w->chunk_start].len); + /* nothing to try reassembling if w is empty */ + if (w->numitems == 0) { + WDEBUG("window buffer empty, nothing to reassemble"); return 0; } if (compression) *compression = 1; fragment *f; - size_t i; unsigned curseq, consecutive_frags = 0, holes = 0, found_frags = 0; int end = 0, drop = 0; /* if packet is dropped */ - curseq = w->start_seq_id; - - for (i = 0; i < w->numitems; ++i) { + for (size_t i = 0; found_frags < w->numitems; i++) { woffs = WRAP(w->chunk_start + i); + curseq = WRAPSEQ(w->cur_seq_id + i); f = &w->frags[woffs]; - fraglen = f->len; /* TODO Drop packets if some fragments are missing after reaching max retries * or packet timeout - * Note: this lowers the guaranteed arrival constraint - * Note: Continue reassembling full packets until none left in buffer; + * Note: this lowers the guaranteed arrival constraint */ + + /* Note: Continue reassembling full packets until none left in buffer; * several full packets are sometimes left in buffer unprocessed * so we must not just taking the oldest full packet and ignore newer ones */ - /* Process: - * if buffer contains >0 frags - * for frag in buffer from start { - * if frag empty: skip; else - * if frag.start { - * attempt reassembly as normal; - * continue from end of full packet; - * } else skip; - * endif - * } - */ - if (fraglen == 0) { /* Empty fragment */ - WDEBUG("reassemble: hole at frag %u [%" L "u]", - curseq, woffs, f->seqID, fraglen); - + if (f->len == 0) { /* Empty fragment */ + WDEBUG("reassemble: hole at frag id %u [%" L "u]", curseq, woffs); /* reset reassembly things to start over */ consecutive_frags = 0; holes++; + continue; + } - } else if (f->start || consecutive_frags >= 1) { + found_frags++; + if (f->seqID != curseq) { + /* this is a serious bug. exit nastily */ + errx(1, "reassemble: frag [%" L "u] seqID mismatch: f=%u, cur=%u", + woffs, f->seqID, curseq); + } + if (f->start || consecutive_frags >= 1) { found_frags++; consecutive_frags++; if (drop == 0) { /* Copy next fragment to buffer if not going to drop */ - memcpy(dest, f->data, MIN(fraglen, maxlen)); + memcpy(dest, f->data, MIN(f->len, maxlen)); } - dest += fraglen; - *len += fraglen; + if (f->len > maxlen) { + WDEBUG("Data buffer too small: drop packet! Reassembled %" L "u bytes.", *len); + drop = 1; + } + dest += f->len; + *len += f->len; + maxlen -= f->len; + if (compression) { *compression &= f->compressed & 1; if (f->compressed != *compression) { WDEBUG("Inconsistent compression flags in chunk. Will reassemble anyway!"); } } - if (fraglen > maxlen) { - WDEBUG("Data buffer too small: drop packet! Reassembled %" L "u bytes.", *len); - drop = 1; - } + WDEBUG("reassemble: id %u, len %" L "u, offs %" \ L "u, total %" L "u, maxlen %" L "u, found %" L "u, consecutive %" L "u", - f->seqID, fraglen, dest - data, *len, maxlen, found_frags, consecutive_frags); - - /* Move window along to avoid weird issues */ - window_tick(w); + f->seqID, f->len, dest - data, *len, maxlen, found_frags, consecutive_frags); if (f->end == 1) { WDEBUG("Found end of chunk! (seqID %u, chunk len %" L "u, datalen %" L "u)", @@ -275,16 +262,7 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_ end = 1; break; } - - if (found_frags >= w->numitems) { - /* no point continuing if no full packets yet and no other action */ - return 0; - } } - - /* Move position counters and expected next seqID */ - maxlen -= fraglen; - curseq = (curseq + 1) % MAX_SEQ_ID; } if (end == 0 && drop == 0) { @@ -302,10 +280,18 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_ p = (p <= 0) ? w->length - 1 : p - 1; } if (holes == 0) { - /* move start of window forwards only if there are no pending fragments (holes) + /* slide chunk_start++ only if there are no missing fragments (holes) * or incomplete packets that we might have skipped */ w->chunk_start = WRAP(woffs + 1); + w->cur_seq_id = WRAPSEQ(curseq + 1); } + + unsigned csdist = DISTF(w->length, w->window_start, w->chunk_start); + if (csdist <= w->length / 2) { + /* chunk_start is most likely ahead of window_start, so slide window to catch up */ + window_slide(w, csdist, 1); + } + w->numitems -= consecutive_frags; return found_frags >= consecutive_frags; } @@ -444,40 +430,64 @@ window_ack(struct frag_buffer *w, int seqid) } } +/* Slide window forwards by given number of frags, clearing out old frags */ +void +window_slide(struct frag_buffer *w, unsigned slide, int delete) +{ + WDEBUG("moving window forwards by %u; %" L "u-%" L "u (%u) to %" L "u-%" L "u (%u) len=%" L "u", + slide, w->window_start, w->window_end, w->start_seq_id, AFTER(w, slide), + AFTER(w, w->windowsize + slide), AFTERSEQ(w, slide), w->length); + + /* Requirements for fragment being cleared (SENDING): + * (must have been sent) AND + * (((must be received) AND (must be acknowledged)) OR + * (not acknowledged if ACK not required)) + * + * Fragments (or holes) cleared on RECEIVING must: + * ((be received) AND (be ACK'd)) OR (... see window_reassemble_data) + */ + if (delete) { + /* Clear old frags or holes */ + unsigned nfrags = 0; + for (unsigned i = 0; i < slide; i++) { + size_t woffs = WRAP(w->window_start + i); + fragment *f = &w->frags[woffs]; + if (f->len != 0) { + WDEBUG(" clear frag id %u, len %" L "u at index %" L "u", + f->seqID, f->len, woffs); + f->len = 0; + nfrags ++; + } else { + WDEBUG(" clear hole at index %" L "u", woffs); + } + } + + WDEBUG(" chunk_start: %" L "u -> %" L "u", w->chunk_start, w->window_start); + w->numitems -= nfrags; + w->chunk_start = AFTER(w, slide); + } + + /* Update window status */ + w->window_start = AFTER(w, slide); + w->window_end = AFTER(w, w->windowsize); + w->start_seq_id = AFTERSEQ(w, slide); +} + /* Function to be called after all other processing has been done * when anything happens (moves window etc) (SEND/RECV) */ void window_tick(struct frag_buffer *w) { + unsigned slide = 0; for (size_t i = 0; i < w->windowsize; i++) { // TODO are ACKs required for reduced arrival guarantee? - /* Requirements for fragment being cleared (SENDING): - * (must have been sent) AND - * (((must be received) AND (must be acknowledged)) OR - * (not acknowledged if ACK not required)) - * - * Fragments (or holes) cleared on RECEIVING must: - * ((be received) AND (be ACK'd)) OR (... see window_reassemble_data) - */ - fragment *f = &w->frags[w->window_start]; + fragment *f = &w->frags[WRAP(w->window_start + i)]; if (f->len > 0 && f->acks >= 1) { -#ifdef DEBUG_BUILD - unsigned old_start_id = w->start_seq_id; -#endif - w->start_seq_id = (w->start_seq_id + 1) % MAX_SEQ_ID; - WDEBUG("moving window forwards; %" L "u-%" L "u (%u) to %" L "u-%" L "u (%u) len=%" L "u", - w->window_start, w->window_end, old_start_id, AFTER(w, 1), - AFTER(w, w->windowsize + 1), w->start_seq_id, w->length); - if (w->direction == WINDOW_SENDING) { - WDEBUG("Clearing old fragments in SENDING window."); - w->numitems --; /* Clear old fragments */ - f->len = 0; - } - w->window_start = AFTER(w, 1); - - w->window_end = AFTER(w, w->windowsize); + /* count consecutive fragments from start of window that are ACK'd */ + slide++; } else break; } + if (slide > 0) window_slide(w, slide, w->direction == WINDOW_SENDING); } /* Splits data into fragments and adds to the end of the window buffer for sending @@ -512,8 +522,8 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, uint8 f->lastsent.tv_usec = 0; w->last_write = WRAP(w->last_write + 1); + w->cur_seq_id = WRAPSEQ(w->cur_seq_id + 1); w->numitems ++; - w->cur_seq_id = (w->cur_seq_id + 1) % MAX_SEQ_ID; WDEBUG(" fragment len %" L "u, seqID %u, s %u, end %u, dOffs %" L "u", f->len, f->seqID, f->start, f->end, offset); offset += f->len; diff --git a/src/window.h b/src/window.h index 38e4f27..0482b33 100644 --- a/src/window.h +++ b/src/window.h @@ -21,6 +21,7 @@ * These should match the limitations of the protocol. */ #define MAX_SEQ_ID 256 #define MAX_FRAGSIZE 2048 +#define MAX_SEQ_AHEAD (MAX_SEQ_ID / 2) /* Window function definitions. */ #define WINDOW_SENDING 1 @@ -64,7 +65,7 @@ extern int window_debug; /* Window debugging macro */ #ifdef DEBUG_BUILD #define WDEBUG(...) if (window_debug) {\ - TIMEPRINT("[WINDOW-DEBUG] (%s:%d) ", __FILE__, __LINE__);\ + TIMEPRINT("[WDEBUG:%s] (%s:%d) ", w->direction == WINDOW_SENDING ? "S" : "R", __FILE__, __LINE__);\ fprintf(stderr, __VA_ARGS__);\ fprintf(stderr, "\n");\ } @@ -78,23 +79,25 @@ extern int window_debug; /* Gets index of fragment o fragments after window start */ #define AFTER(w, o) ((w->window_start + o) % w->length) +/* Gets seqID of fragment o fragments after window start seqID */ +#define AFTERSEQ(w, o) ((w->start_seq_id + o) % MAX_SEQ_ID) + /* Distance (going forwards) between a and b in window of length l */ -#define DISTF(l, a, b) (((a > b) ? a-b : l-a+b-1) % l) +#define DISTF(l, a, b) ((a <= b) ? b-a : l-a+b) /* Distance backwards between a and b in window of length l */ -#define DISTB(l, a, b) (((a < b) ? l-b+a-1 : a-b) % l) +#define DISTB(l, a, b) (l-DISTF(l, a, b)) /* Check if fragment index a is within window_buffer *w */ #define INWINDOW_INDEX(w, a) ((w->window_start < w->window_end) ? \ - (a >= w->window_start && a <= w->window_end) : \ - ((a >= w->window_start && a <= w->length - 1) || \ - (a >= 0 && a <= w->window_end))) + (a >= w->window_start && a < w->window_end) : \ + ((a >= w->window_start && a < w->length) || \ + (a >= 0 && a < w->window_end))) /* Check if sequence ID a is within sequence range start to end */ #define INWINDOW_SEQ(start, end, a) ((start < end) ? \ - (a >= start && a <= end) : \ - ((a >= start && a <= MAX_SEQ_ID - 1) || \ - (a <= end))) + (a >= start && a < end) : \ + ((a >= start && a < MAX_SEQ_ID) || (a < end))) /* Find the wrapped offset between sequence IDs start and a * Note: the maximum possible offset is MAX_SEQ_ID - 1 */ @@ -103,6 +106,10 @@ extern int window_debug; /* Wrap index x to a value within the window buffer length */ #define WRAP(x) ((x) % w->length) +/* Wrap index x to a value within the seqID range */ +#define WRAPSEQ(x) ((x) % MAX_SEQ_ID) + + /* Perform wrapped iteration of statement with pos = (begin to end) wrapped at * max, executing statement f for every value of pos. */ #define ITER_FORWARD(begin, end, max, pos, f) { \ @@ -148,6 +155,8 @@ int window_get_next_ack(struct frag_buffer *w); /* Sets the fragment with seqid to be ACK'd (SEND) */ void window_ack(struct frag_buffer *w, int seqid); +void window_slide(struct frag_buffer *w, unsigned slide, int delete); + /* To be called after all other processing has been done * when anything happens (moves window etc) (SEND/RECV) */ void window_tick(struct frag_buffer *w);