diff --git a/src/client.c b/src/client.c index bfe0473..8838a46 100644 --- a/src/client.c +++ b/src/client.c @@ -137,7 +137,7 @@ client_set_hostname_maxlen(size_t i) this.hostname_maxlen = i; this.maxfragsize_up = get_raw_length_from_dns(this.hostname_maxlen - UPSTREAM_HDR, this.dataenc, this.topdomain); if (this.outbuf) - this.outbuf->maxfraglen = this.maxfragsize_up; + window_buffer_resize(this.outbuf, this.outbuf->length, this.maxfragsize_up); } } @@ -536,7 +536,6 @@ send_next_frag() * to get things back in order and keep the packets flowing */ send_ping(1, this.next_downstream_ack, 1, 0); this.next_downstream_ack = -1; - window_tick(this.outbuf); } return; /* nothing to send */ } @@ -572,8 +571,8 @@ send_next_frag() id = send_query(buf); /* Log query ID as being sent now */ query_sent_now(id); - window_tick(this.outbuf); + this.num_frags_sent++; } @@ -1218,9 +1217,6 @@ tunnel_dns() } } } - - /* Move window along after doing all data processing */ - window_tick(this.inbuf); } return read; diff --git a/src/iodine.c b/src/iodine.c index 8bdeafc..a94472e 100644 --- a/src/iodine.c +++ b/src/iodine.c @@ -268,10 +268,11 @@ help() fprintf(stderr, "Fine-tuning options:\n"); fprintf(stderr, " -w downstream fragment window size (default: 8 frags)\n"); fprintf(stderr, " -W upstream fragment window size (default: 8 frags)\n"); +// fprintf(stderr, " -k max retries for sending packets (default: 0, retries disabled)\n"); fprintf(stderr, " -i server-side request timeout in lazy mode (default: auto)\n"); fprintf(stderr, " -j downstream fragment ACK timeout, implies -i4 (default: 2 sec)\n"); fprintf(stderr, " -J downstream fragment ACK delay variance factor (default: 2.0), 0: auto\n"); - //fprintf(stderr, " --nodrop disable TCP packet-dropping optimisations\n"); + fprintf(stderr, " -c 1: use downstream compression (default), 0: disable\n"); fprintf(stderr, " -C 1: use upstream compression (default), 0: disable\n\n"); @@ -591,6 +592,7 @@ main(int argc, char **argv) this.send_interval_ms = atoi(optarg); if (this.send_interval_ms < 0) this.send_interval_ms = 0; + break; case 'w': this.windowsize_down = atoi(optarg); break; diff --git a/src/server.c b/src/server.c index 49775cc..b834bda 100644 --- a/src/server.c +++ b/src/server.c @@ -448,8 +448,6 @@ send_data_or_ping(int userid, struct query *q, int ping, int immediate, char *tc uint8_t pkt[out->maxfraglen + DOWNSTREAM_PING_HDR]; - window_tick(out); - if (!tcperror) { f = window_get_next_sending_fragment(out, &users[userid].next_upstream_ack); } else { @@ -529,7 +527,6 @@ user_process_incoming_data(int userid, int ack) while (can_reassemble == 1) { datalen = sizeof(pkt); can_reassemble = window_reassemble_data(users[userid].incoming, pkt, &datalen, &compressed); - window_tick(users[userid].incoming); /* Update time info */ users[userid].last_pkt = time(NULL); @@ -1287,8 +1284,8 @@ handle_dns_version(int dns_fd, struct query *q, uint8_t *domain, int domain_len) u->down_compression = 1; u->lazy = 0; u->next_upstream_ack = -1; - u->outgoing->maxfraglen = u->encoder->get_raw_length(u->fragsize) - DOWNSTREAM_PING_HDR; - window_buffer_clear(u->outgoing); + window_buffer_resize(u->outgoing, u->outgoing->length, + u->encoder->get_raw_length(u->fragsize) - DOWNSTREAM_PING_HDR); window_buffer_clear(u->incoming); qmem_init(userid); @@ -1669,7 +1666,8 @@ handle_dns_set_options(int dns_fd, struct query *q, int userid, } if (bits) { int f = users[userid].fragsize; - users[userid].outgoing->maxfraglen = (bits * f) / 8 - DOWNSTREAM_PING_HDR; + window_buffer_resize(users[userid].outgoing, users[userid].outgoing->length, + (bits * f) / 8 - DOWNSTREAM_PING_HDR); users[userid].downenc_bits = bits; } @@ -1724,8 +1722,8 @@ handle_dns_set_fragsize(int dns_fd, struct query *q, int userid, write_dns(dns_fd, q, "BADFRAG", 7, users[userid].downenc); } else { users[userid].fragsize = max_frag_size; - users[userid].outgoing->maxfraglen = (users[userid].downenc_bits * max_frag_size) / - 8 - DOWNSTREAM_PING_HDR; + window_buffer_resize(users[userid].outgoing, users[userid].outgoing->length, + (users[userid].downenc_bits * max_frag_size) / 8 - DOWNSTREAM_PING_HDR); write_dns(dns_fd, q, (char *)unpacked, 2, users[userid].downenc); DEBUG(1, "Setting max downstream data length to %u bytes for user %d; %d bits (%c)", diff --git a/src/window.c b/src/window.c index cfc8184..1e5708f 100644 --- a/src/window.c +++ b/src/window.c @@ -39,6 +39,11 @@ struct frag_buffer * window_buffer_init(size_t length, unsigned windowsize, unsigned maxfraglen, int dir) { struct frag_buffer *w; + + /* Note: window buffer DOES NOT WORK with length > MAX_SEQ_ID */ + if (length > MAX_SEQ_ID) + errx(1, "window_buffer_init: length (%" L "u) is greater than MAX_SEQ_ID (%d)!\n"); + w = calloc(1, sizeof(struct frag_buffer)); if (!w) { errx(1, "Failed to allocate window buffer memory!"); @@ -104,12 +109,11 @@ window_buffer_clear(struct frag_buffer *w) w->numitems = 0; w->window_start = 0; - w->window_end = AFTER(w, w->windowsize); w->last_write = 0; w->chunk_start = 0; w->cur_seq_id = 0; w->start_seq_id = 0; - w->max_retries = 5; + w->max_retries = 0; w->resends = 0; w->oos = 0; w->timeout.tv_sec = 5; @@ -143,21 +147,31 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f) endid = WRAPSEQ(startid + w->length); offset = SEQ_OFFSET(startid, f->seqID); - if (!INWINDOW_SEQ(startid, endid, f->seqID)) { - WDEBUG("Drop frag ID %u: cannot fit in window buffer (%u-%u)", - f->seqID, startid, endid); - w->oos++; + if (f->len == 0) { + WDEBUG("got incoming frag with len 0! id=%u", f->seqID); return -1; } + /* Place fragment into correct location in buffer, possibly overwriting - * older and not-yet-reassembled fragment */ - ssize_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID)); + * an older and not-yet-reassembled fragment + * Note: chunk_start != window_start */ + ssize_t dest = WRAP(w->chunk_start + offset); + + if (offset > w->length - w->windowsize) { + w->oos++; + WDEBUG("incoming frag far ahead: offs %u > %u, cs %u[%" L "u], id %u[%" L "u]", + offset, w->length - w->windowsize, w->start_seq_id, w->chunk_start, + f->seqID, dest); + offset -= w->length - w->windowsize; + window_slide(w, offset, 1); + } + WDEBUG(" Putting frag seq %u into frags[%" L "u + %u = %" L "u]", - f->seqID, w->window_start, SEQ_OFFSET(startid, f->seqID), dest); + f->seqID, w->chunk_start, offset, dest); /* Check if fragment already received */ fd = &w->frags[dest]; - if (fd->len == f->len && fd->seqID == f->seqID) { + if (fd->len != 0 && fd->seqID == f->seqID) { /* use retries as counter for dupes */ fd->retries ++; WDEBUG("Received duplicate frag, dropping. (prev %u/new %u, dupes %u)", @@ -206,7 +220,7 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_ int end = 0, drop = 0; /* if packet is dropped */ for (size_t i = 0; found_frags < w->numitems; i++) { woffs = WRAP(w->chunk_start + i); - curseq = WRAPSEQ(w->cur_seq_id + i); + curseq = WRAPSEQ(w->start_seq_id + i); f = &w->frags[woffs]; /* TODO Drop packets if some fragments are missing after reaching max retries @@ -217,7 +231,8 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_ * several full packets are sometimes left in buffer unprocessed * so we must not just taking the oldest full packet and ignore newer ones */ if (f->len == 0) { /* Empty fragment */ - WDEBUG("reassemble: hole at frag id %u [%" L "u]", curseq, woffs); + if (holes < 2) + WDEBUG("reassemble: hole at frag id %u [%" L "u]", curseq, woffs); /* reset reassembly things to start over */ consecutive_frags = 0; holes++; @@ -279,18 +294,6 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_ w->frags[p].len = 0; p = (p <= 0) ? w->length - 1 : p - 1; } - if (holes == 0) { - /* slide chunk_start++ only if there are no missing fragments (holes) - * or incomplete packets that we might have skipped */ - w->chunk_start = WRAP(woffs + 1); - w->cur_seq_id = WRAPSEQ(curseq + 1); - } - - unsigned csdist = DISTF(w->length, w->window_start, w->chunk_start); - if (csdist <= w->length / 2) { - /* chunk_start is most likely ahead of window_start, so slide window to catch up */ - window_slide(w, csdist, 1); - } w->numitems -= consecutive_frags; return found_frags >= consecutive_frags; @@ -331,7 +334,7 @@ window_sending(struct frag_buffer *w, struct timeval *nextresend) /* Frag has been sent before so lastsent is a valid timestamp */ timersub(&now, &f->lastsent, &age); - if (!timercmp(&age, &w->timeout, <)) { + if (!timercmp(&age, &w->timeout, <) && f->retries < w->max_retries) { /* ACK timeout: Frag will be resent */ tosend++; } else if (timercmp(&age, &oldest, >)) { @@ -369,13 +372,13 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) timersub(&now, &f->lastsent, &age); - if (f->retries >= 1 && !timercmp(&age, &w->timeout, <)) { + if (f->retries >= 1 && f->retries < w->max_retries && !timercmp(&age, &w->timeout, <)) { /* Resending fragment due to ACK timeout */ WDEBUG("Retrying frag %u (%ld ms old/timeout %ld ms), retries: %u/max %u/total %u", f->seqID, timeval_to_ms(&age), timeval_to_ms(&w->timeout), f->retries, w->max_retries, w->resends); w->resends ++; goto found; - } else if (f->retries == 0 && f->len > 0) { + } else if (f->retries == 0) { /* Fragment not sent */ goto found; } @@ -397,36 +400,21 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack) return f; } -/* Gets the seqid of next fragment to be ACK'd (RECV) */ -int -window_get_next_ack(struct frag_buffer *w) -{ - fragment *f; - for (size_t i = 0; i < w->windowsize; i++) { - f = &w->frags[WRAP(w->window_start + i)]; - if (f->len > 0 && f->acks <= 0) { - f->acks = 1; - return f->seqID; - } - } - return -1; -} - /* Sets the fragment with seqid to be ACK'd (SEND) */ void window_ack(struct frag_buffer *w, int seqid) { fragment *f; if (seqid < 0 || seqid > MAX_SEQ_ID) return; - for (size_t i = 0; i < w->windowsize; i++) { - f = &w->frags[AFTER(w, i)]; - if (f->seqID == seqid && f->len > 0) { /* ACK first non-empty frag */ - if (f->acks > 0) - WDEBUG("DUPE ACK: %d ACKs for seqId %u", f->acks, seqid); - f->acks ++; - WDEBUG(" ACK frag seq %u, ACKs %u, len %" L "u, s %u e %u", f->seqID, f->acks, f->len, f->start, f->end); - break; - } + unsigned offset = SEQ_OFFSET(w->start_seq_id, seqid); + + ssize_t dest = WRAP(w->chunk_start + offset); + f = &w->frags[dest]; + if (f->seqID == seqid && f->len > 0) { /* increment ACK counter in frag */ + f->acks ++; + WDEBUG("ACK frag seq %u, ACKs %u, len %" L "u, s %u e %u", f->seqID, f->acks, f->len, f->start, f->end); + } else { + WDEBUG("Tried to ACK nonexistent frag, id %u", seqid); } } @@ -435,7 +423,7 @@ void window_slide(struct frag_buffer *w, unsigned slide, int delete) { WDEBUG("moving window forwards by %u; %" L "u-%" L "u (%u) to %" L "u-%" L "u (%u) len=%" L "u", - slide, w->window_start, w->window_end, w->start_seq_id, AFTER(w, slide), + slide, w->window_start, AFTER(w, w->windowsize), w->start_seq_id, AFTER(w, slide), AFTER(w, w->windowsize + slide), AFTERSEQ(w, slide), w->length); /* Requirements for fragment being cleared (SENDING): @@ -446,6 +434,8 @@ window_slide(struct frag_buffer *w, unsigned slide, int delete) * Fragments (or holes) cleared on RECEIVING must: * ((be received) AND (be ACK'd)) OR (... see window_reassemble_data) */ + /* check if chunk_start has to be moved to prevent window overlapping, + * which results in deleting holes or frags */ if (delete) { /* Clear old frags or holes */ unsigned nfrags = 0; @@ -462,19 +452,18 @@ window_slide(struct frag_buffer *w, unsigned slide, int delete) } } - WDEBUG(" chunk_start: %" L "u -> %" L "u", w->chunk_start, w->window_start); + WDEBUG(" chunk_start: %" L "u -> %" L "u", w->chunk_start, AFTER(w, slide)); w->numitems -= nfrags; w->chunk_start = AFTER(w, slide); + w->start_seq_id = AFTERSEQ(w, slide); } /* Update window status */ w->window_start = AFTER(w, slide); - w->window_end = AFTER(w, w->windowsize); - w->start_seq_id = AFTERSEQ(w, slide); } /* Function to be called after all other processing has been done - * when anything happens (moves window etc) (SEND/RECV) */ + * when anything happens (moves window etc) (SEND only) */ void window_tick(struct frag_buffer *w) { @@ -482,7 +471,7 @@ window_tick(struct frag_buffer *w) for (size_t i = 0; i < w->windowsize; i++) { // TODO are ACKs required for reduced arrival guarantee? fragment *f = &w->frags[WRAP(w->window_start + i)]; - if (f->len > 0 && f->acks >= 1) { + if (f->len > 0 && (f->acks >= 1 || (w->max_retries == 0 && f->lastsent.tv_sec != 0))) { /* count consecutive fragments from start of window that are ACK'd */ slide++; } else break; @@ -509,7 +498,6 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, uint8 for (size_t i = 0; i < n; i++) { /* copy in new data and reset frag stats */ f->len = MIN(len - offset, w->maxfraglen); - memcpy(f->data, data + offset, f->len); f->seqID = w->cur_seq_id; f->compressed = compressed; f->start = (i == 0) ? 1 : 0; @@ -526,6 +514,7 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, uint8 w->numitems ++; WDEBUG(" fragment len %" L "u, seqID %u, s %u, end %u, dOffs %" L "u", f->len, f->seqID, f->start, f->end, offset); + memcpy(f->data, data + offset, f->len); offset += f->len; } return n; diff --git a/src/window.h b/src/window.h index 0482b33..7de8c0c 100644 --- a/src/window.h +++ b/src/window.h @@ -46,14 +46,13 @@ struct frag_buffer { size_t length; /* Length of buffer */ size_t numitems; /* number of non-empty fragments stored in buffer */ size_t window_start; /* Start of window (index) */ - size_t window_end; /* End of window (index) */ size_t last_write; /* Last fragment appended (index) */ - size_t chunk_start; /* Start of current chunk of fragments (index) */ + size_t chunk_start; /* index of oldest fragment slot (lowest seqID) in buffer */ struct timeval timeout; /* Fragment ACK timeout before resend or drop */ unsigned windowsize; /* Max number of fragments in flight */ unsigned maxfraglen; /* Max outgoing fragment data size */ unsigned cur_seq_id; /* Next unused sequence ID */ - unsigned start_seq_id; /* Start of window sequence ID */ + unsigned start_seq_id; /* lowest seqID that exists in buffer (at index chunk_start) */ unsigned max_retries; /* max number of resends before dropping (-1 = never drop) */ unsigned resends; /* number of fragments resent or number of dupes received */ unsigned oos; /* Number of out-of-sequence fragments received */ @@ -149,9 +148,6 @@ size_t window_sending(struct frag_buffer *w, struct timeval *); /* Returns next fragment to be sent or NULL if nothing (SEND) */ fragment *window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack); -/* Gets the seqid of next fragment to be ACK'd (RECV) */ -int window_get_next_ack(struct frag_buffer *w); - /* Sets the fragment with seqid to be ACK'd (SEND) */ void window_ack(struct frag_buffer *w, int seqid);