1
0
Fork 0
mirror of https://github.com/yarrick/iodine.git synced 2025-04-04 13:53:34 +03:00

add window_slide function and adjustments to receiver code

This commit is contained in:
frekky 2017-07-03 08:41:50 +02:00
parent d58dd3185e
commit 0761223a65
2 changed files with 128 additions and 109 deletions

View file

@ -38,20 +38,20 @@ int window_debug = 0;
struct frag_buffer *
window_buffer_init(size_t length, unsigned windowsize, unsigned maxfraglen, int dir)
{
struct frag_buffer *buf;
buf = calloc(1, sizeof(struct frag_buffer));
if (!buf) {
struct frag_buffer *w;
w = calloc(1, sizeof(struct frag_buffer));
if (!w) {
errx(1, "Failed to allocate window buffer memory!");
}
if (dir != WINDOW_RECVING && dir != WINDOW_SENDING) {
errx(1, "Invalid window direction!");
}
window_buffer_resize(buf, length, maxfraglen);
window_buffer_resize(w, length, maxfraglen);
buf->windowsize = windowsize;
buf->direction = dir;
return buf;
w->windowsize = windowsize;
w->direction = dir;
return w;
}
void
@ -127,29 +127,30 @@ ssize_t
window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
/* Handles fragment received from the sending side (RECV)
* Returns index of fragment in window or <0 if dropped
* The next ACK MUST be for this fragment */
* The next ACK MUST be for this fragment
* Slides window forward if fragment received which is just above end seqID */
/* XXX Use whole buffer to receive and reassemble fragments
* Old frags are "cleared" by being overwritten by newly received frags. (TODO)
* Reassemble just starts at oldest slot (chunk_start) in window and continues until all frags
* in buffer have been found. chunk_start incremented only if no holes found (tick).
*/
{
/* Check if packet is in window */
unsigned startid, endid, offset;
int future = 0;
fragment *fd;
startid = w->start_seq_id;
endid = (w->start_seq_id + w->windowsize) % MAX_SEQ_ID;
endid = WRAPSEQ(startid + w->length);
offset = SEQ_OFFSET(startid, f->seqID);
if (!INWINDOW_SEQ(startid, endid, f->seqID)) {
WDEBUG("Drop frag ID %u: cannot fit in window buffer (%u-%u)",
f->seqID, startid, endid);
w->oos++;
if (offset > MIN(w->length - w->numitems, MAX_SEQ_ID / 2)) {
/* Only drop the fragment if it is ancient */
WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)", f->seqID, startid, endid);
return -1;
} else {
/* Save "new" fragments to avoid causing other end to advance
* when this fragment is ACK'd despite being dropped */
WDEBUG("WARNING: Got future fragment (%u), offset %u from start %u (wsize %u).",
f->seqID, offset, startid, w->windowsize);
}
return -1;
}
/* Place fragment into correct location in buffer */
/* Place fragment into correct location in buffer, possibly overwriting
* older and not-yet-reassembled fragment */
ssize_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID));
WDEBUG(" Putting frag seq %u into frags[%" L "u + %u = %" L "u]",
f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest);
@ -187,87 +188,73 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
int
window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_t *compression)
{
if (!len) {
errx(1, "window_reassemble_data: len pointer is NULL!");
}
size_t woffs, fraglen, start;
size_t woffs, start;
size_t maxlen = *len;
uint8_t *dest; //, *fdata_start;
*len = 0;
dest = data;
if (w->direction != WINDOW_RECVING)
return 0;
/* start fragment may be missing, so only stop if w is empty */
if (w->frags[w->chunk_start].start == 0 && w->numitems == 0) {
WDEBUG("chunk_start (%" L "u) != start and w empty (seq %u, len %" L "u)!",
w->chunk_start, w->frags[w->chunk_start].seqID, w->frags[w->chunk_start].len);
/* nothing to try reassembling if w is empty */
if (w->numitems == 0) {
WDEBUG("window buffer empty, nothing to reassemble");
return 0;
}
if (compression) *compression = 1;
fragment *f;
size_t i;
unsigned curseq, consecutive_frags = 0, holes = 0, found_frags = 0;
int end = 0, drop = 0; /* if packet is dropped */
curseq = w->start_seq_id;
for (i = 0; i < w->numitems; ++i) {
for (size_t i = 0; found_frags < w->numitems; i++) {
woffs = WRAP(w->chunk_start + i);
curseq = WRAPSEQ(w->cur_seq_id + i);
f = &w->frags[woffs];
fraglen = f->len;
/* TODO Drop packets if some fragments are missing after reaching max retries
* or packet timeout
* Note: this lowers the guaranteed arrival constraint
* Note: Continue reassembling full packets until none left in buffer;
* Note: this lowers the guaranteed arrival constraint */
/* Note: Continue reassembling full packets until none left in buffer;
* several full packets are sometimes left in buffer unprocessed
* so we must not just taking the oldest full packet and ignore newer ones */
/* Process:
* if buffer contains >0 frags
* for frag in buffer from start {
* if frag empty: skip; else
* if frag.start {
* attempt reassembly as normal;
* continue from end of full packet;
* } else skip;
* endif
* }
*/
if (fraglen == 0) { /* Empty fragment */
WDEBUG("reassemble: hole at frag %u [%" L "u]",
curseq, woffs, f->seqID, fraglen);
if (f->len == 0) { /* Empty fragment */
WDEBUG("reassemble: hole at frag id %u [%" L "u]", curseq, woffs);
/* reset reassembly things to start over */
consecutive_frags = 0;
holes++;
continue;
}
} else if (f->start || consecutive_frags >= 1) {
found_frags++;
if (f->seqID != curseq) {
/* this is a serious bug. exit nastily */
errx(1, "reassemble: frag [%" L "u] seqID mismatch: f=%u, cur=%u",
woffs, f->seqID, curseq);
}
if (f->start || consecutive_frags >= 1) {
found_frags++;
consecutive_frags++;
if (drop == 0) {
/* Copy next fragment to buffer if not going to drop */
memcpy(dest, f->data, MIN(fraglen, maxlen));
memcpy(dest, f->data, MIN(f->len, maxlen));
}
dest += fraglen;
*len += fraglen;
if (f->len > maxlen) {
WDEBUG("Data buffer too small: drop packet! Reassembled %" L "u bytes.", *len);
drop = 1;
}
dest += f->len;
*len += f->len;
maxlen -= f->len;
if (compression) {
*compression &= f->compressed & 1;
if (f->compressed != *compression) {
WDEBUG("Inconsistent compression flags in chunk. Will reassemble anyway!");
}
}
if (fraglen > maxlen) {
WDEBUG("Data buffer too small: drop packet! Reassembled %" L "u bytes.", *len);
drop = 1;
}
WDEBUG("reassemble: id %u, len %" L "u, offs %" \
L "u, total %" L "u, maxlen %" L "u, found %" L "u, consecutive %" L "u",
f->seqID, fraglen, dest - data, *len, maxlen, found_frags, consecutive_frags);
/* Move window along to avoid weird issues */
window_tick(w);
f->seqID, f->len, dest - data, *len, maxlen, found_frags, consecutive_frags);
if (f->end == 1) {
WDEBUG("Found end of chunk! (seqID %u, chunk len %" L "u, datalen %" L "u)",
@ -275,16 +262,7 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_
end = 1;
break;
}
if (found_frags >= w->numitems) {
/* no point continuing if no full packets yet and no other action */
return 0;
}
}
/* Move position counters and expected next seqID */
maxlen -= fraglen;
curseq = (curseq + 1) % MAX_SEQ_ID;
}
if (end == 0 && drop == 0) {
@ -302,10 +280,18 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_
p = (p <= 0) ? w->length - 1 : p - 1;
}
if (holes == 0) {
/* move start of window forwards only if there are no pending fragments (holes)
/* slide chunk_start++ only if there are no missing fragments (holes)
* or incomplete packets that we might have skipped */
w->chunk_start = WRAP(woffs + 1);
w->cur_seq_id = WRAPSEQ(curseq + 1);
}
unsigned csdist = DISTF(w->length, w->window_start, w->chunk_start);
if (csdist <= w->length / 2) {
/* chunk_start is most likely ahead of window_start, so slide window to catch up */
window_slide(w, csdist, 1);
}
w->numitems -= consecutive_frags;
return found_frags >= consecutive_frags;
}
@ -444,40 +430,64 @@ window_ack(struct frag_buffer *w, int seqid)
}
}
/* Slide window forwards by given number of frags, clearing out old frags */
void
window_slide(struct frag_buffer *w, unsigned slide, int delete)
{
WDEBUG("moving window forwards by %u; %" L "u-%" L "u (%u) to %" L "u-%" L "u (%u) len=%" L "u",
slide, w->window_start, w->window_end, w->start_seq_id, AFTER(w, slide),
AFTER(w, w->windowsize + slide), AFTERSEQ(w, slide), w->length);
/* Requirements for fragment being cleared (SENDING):
* (must have been sent) AND
* (((must be received) AND (must be acknowledged)) OR
* (not acknowledged if ACK not required))
*
* Fragments (or holes) cleared on RECEIVING must:
* ((be received) AND (be ACK'd)) OR (... see window_reassemble_data)
*/
if (delete) {
/* Clear old frags or holes */
unsigned nfrags = 0;
for (unsigned i = 0; i < slide; i++) {
size_t woffs = WRAP(w->window_start + i);
fragment *f = &w->frags[woffs];
if (f->len != 0) {
WDEBUG(" clear frag id %u, len %" L "u at index %" L "u",
f->seqID, f->len, woffs);
f->len = 0;
nfrags ++;
} else {
WDEBUG(" clear hole at index %" L "u", woffs);
}
}
WDEBUG(" chunk_start: %" L "u -> %" L "u", w->chunk_start, w->window_start);
w->numitems -= nfrags;
w->chunk_start = AFTER(w, slide);
}
/* Update window status */
w->window_start = AFTER(w, slide);
w->window_end = AFTER(w, w->windowsize);
w->start_seq_id = AFTERSEQ(w, slide);
}
/* Function to be called after all other processing has been done
* when anything happens (moves window etc) (SEND/RECV) */
void
window_tick(struct frag_buffer *w)
{
unsigned slide = 0;
for (size_t i = 0; i < w->windowsize; i++) {
// TODO are ACKs required for reduced arrival guarantee?
/* Requirements for fragment being cleared (SENDING):
* (must have been sent) AND
* (((must be received) AND (must be acknowledged)) OR
* (not acknowledged if ACK not required))
*
* Fragments (or holes) cleared on RECEIVING must:
* ((be received) AND (be ACK'd)) OR (... see window_reassemble_data)
*/
fragment *f = &w->frags[w->window_start];
fragment *f = &w->frags[WRAP(w->window_start + i)];
if (f->len > 0 && f->acks >= 1) {
#ifdef DEBUG_BUILD
unsigned old_start_id = w->start_seq_id;
#endif
w->start_seq_id = (w->start_seq_id + 1) % MAX_SEQ_ID;
WDEBUG("moving window forwards; %" L "u-%" L "u (%u) to %" L "u-%" L "u (%u) len=%" L "u",
w->window_start, w->window_end, old_start_id, AFTER(w, 1),
AFTER(w, w->windowsize + 1), w->start_seq_id, w->length);
if (w->direction == WINDOW_SENDING) {
WDEBUG("Clearing old fragments in SENDING window.");
w->numitems --; /* Clear old fragments */
f->len = 0;
}
w->window_start = AFTER(w, 1);
w->window_end = AFTER(w, w->windowsize);
/* count consecutive fragments from start of window that are ACK'd */
slide++;
} else break;
}
if (slide > 0) window_slide(w, slide, w->direction == WINDOW_SENDING);
}
/* Splits data into fragments and adds to the end of the window buffer for sending
@ -512,8 +522,8 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, uint8
f->lastsent.tv_usec = 0;
w->last_write = WRAP(w->last_write + 1);
w->cur_seq_id = WRAPSEQ(w->cur_seq_id + 1);
w->numitems ++;
w->cur_seq_id = (w->cur_seq_id + 1) % MAX_SEQ_ID;
WDEBUG(" fragment len %" L "u, seqID %u, s %u, end %u, dOffs %" L "u",
f->len, f->seqID, f->start, f->end, offset);
offset += f->len;

View file

@ -21,6 +21,7 @@
* These should match the limitations of the protocol. */
#define MAX_SEQ_ID 256
#define MAX_FRAGSIZE 2048
#define MAX_SEQ_AHEAD (MAX_SEQ_ID / 2)
/* Window function definitions. */
#define WINDOW_SENDING 1
@ -64,7 +65,7 @@ extern int window_debug;
/* Window debugging macro */
#ifdef DEBUG_BUILD
#define WDEBUG(...) if (window_debug) {\
TIMEPRINT("[WINDOW-DEBUG] (%s:%d) ", __FILE__, __LINE__);\
TIMEPRINT("[WDEBUG:%s] (%s:%d) ", w->direction == WINDOW_SENDING ? "S" : "R", __FILE__, __LINE__);\
fprintf(stderr, __VA_ARGS__);\
fprintf(stderr, "\n");\
}
@ -78,23 +79,25 @@ extern int window_debug;
/* Gets index of fragment o fragments after window start */
#define AFTER(w, o) ((w->window_start + o) % w->length)
/* Gets seqID of fragment o fragments after window start seqID */
#define AFTERSEQ(w, o) ((w->start_seq_id + o) % MAX_SEQ_ID)
/* Distance (going forwards) between a and b in window of length l */
#define DISTF(l, a, b) (((a > b) ? a-b : l-a+b-1) % l)
#define DISTF(l, a, b) ((a <= b) ? b-a : l-a+b)
/* Distance backwards between a and b in window of length l */
#define DISTB(l, a, b) (((a < b) ? l-b+a-1 : a-b) % l)
#define DISTB(l, a, b) (l-DISTF(l, a, b))
/* Check if fragment index a is within window_buffer *w */
#define INWINDOW_INDEX(w, a) ((w->window_start < w->window_end) ? \
(a >= w->window_start && a <= w->window_end) : \
((a >= w->window_start && a <= w->length - 1) || \
(a >= 0 && a <= w->window_end)))
(a >= w->window_start && a < w->window_end) : \
((a >= w->window_start && a < w->length) || \
(a >= 0 && a < w->window_end)))
/* Check if sequence ID a is within sequence range start to end */
#define INWINDOW_SEQ(start, end, a) ((start < end) ? \
(a >= start && a <= end) : \
((a >= start && a <= MAX_SEQ_ID - 1) || \
(a <= end)))
(a >= start && a < end) : \
((a >= start && a < MAX_SEQ_ID) || (a < end)))
/* Find the wrapped offset between sequence IDs start and a
* Note: the maximum possible offset is MAX_SEQ_ID - 1 */
@ -103,6 +106,10 @@ extern int window_debug;
/* Wrap index x to a value within the window buffer length */
#define WRAP(x) ((x) % w->length)
/* Wrap index x to a value within the seqID range */
#define WRAPSEQ(x) ((x) % MAX_SEQ_ID)
/* Perform wrapped iteration of statement with pos = (begin to end) wrapped at
* max, executing statement f for every value of pos. */
#define ITER_FORWARD(begin, end, max, pos, f) { \
@ -148,6 +155,8 @@ int window_get_next_ack(struct frag_buffer *w);
/* Sets the fragment with seqid to be ACK'd (SEND) */
void window_ack(struct frag_buffer *w, int seqid);
void window_slide(struct frag_buffer *w, unsigned slide, int delete);
/* To be called after all other processing has been done
* when anything happens (moves window etc) (SEND/RECV) */
void window_tick(struct frag_buffer *w);