From d58dd3185eebb1155b95c22bd36412fcde357f03 Mon Sep 17 00:00:00 2001 From: frekky Date: Sat, 1 Jul 2017 21:43:52 +0200 Subject: [PATCH] Fixed major connection stability issues --- src/client.c | 60 +++++++++++++----------- src/server.c | 24 ++++++---- src/window.c | 122 ++++++++++++++++++++++++++++++++----------------- src/window.h | 2 +- tests/window.c | 4 +- 5 files changed, 130 insertions(+), 82 deletions(-) diff --git a/src/client.c b/src/client.c index 492ed6c..bfe0473 100644 --- a/src/client.c +++ b/src/client.c @@ -1027,7 +1027,8 @@ tunnel_tun() if (this.conn == CONN_DNS_NULL) { /* Check if outgoing buffer can hold data */ - if ((0 == this.windowsize_up && 0 != this.outbuf->numitems) || window_buffer_available(this.outbuf) < (read / MAX_FRAGSIZE) + 1) { + if ((this.windowsize_up == 0 && this.outbuf->numitems != 0) || + window_buffer_available(this.outbuf) < (read / MAX_FRAGSIZE) + 1) { DEBUG(1, " Outgoing buffer full (%" L "u/%" L "u), not adding data!", this.outbuf->numitems, this.outbuf->length); return -1; @@ -1049,7 +1050,7 @@ tunnel_dns() size_t datalen, buflen; uint8_t buf[64*1024], cbuf[64*1024], *data, compressed; fragment f; - int read, ping, immediate, error; + int read, ping, immediate, error, pkt = 1; memset(&q, 0, sizeof(q)); memset(buf, 0, sizeof(buf)); @@ -1187,34 +1188,40 @@ tunnel_dns() this.num_frags_recv++; - datalen = window_reassemble_data(this.inbuf, cbuf, sizeof(cbuf), &compressed); - if (datalen > 0) { - if (compressed) { - buflen = sizeof(buf); - if ((ping = uncompress(buf, &buflen, cbuf, datalen)) != Z_OK) { - DEBUG(1, "Uncompress failed (%d) for data len %" L "u: reassembled data corrupted or incomplete!", ping, datalen); - datalen = 0; - } else { - datalen = buflen; - } - data = buf; - } else { - data = cbuf; - } - - if (datalen) { - if (this.use_remote_forward) { - if (write(STDOUT_FILENO, data, datalen) != datalen) { - warn("write_stdout != datalen"); + /* Continue reassembling packets until not possible to do so. + * This prevents a buildup of fully available packets (with one or more fragments each) + * in the incoming window buffer. */ + while (pkt == 1) { + datalen = sizeof(cbuf); + pkt = window_reassemble_data(this.inbuf, cbuf, &datalen, &compressed); + if (datalen > 0) { + if (compressed) { + buflen = sizeof(buf); + if ((ping = uncompress(buf, &buflen, cbuf, datalen)) != Z_OK) { + DEBUG(1, "Uncompress failed (%d) for data len %" L "u: reassembled data corrupted or incomplete!", ping, datalen); + datalen = 0; + } else { + datalen = buflen; } + data = buf; } else { - write_tun(this.tun_fd, data, datalen); + data = cbuf; + } + + if (datalen) { + if (this.use_remote_forward) { + if (write(STDOUT_FILENO, data, datalen) != datalen) { + warn("write_stdout != datalen"); + } + } else { + write_tun(this.tun_fd, data, datalen); + } } } - } - /* Move window along after doing all data processing */ - window_tick(this.inbuf); + /* Move window along after doing all data processing */ + window_tick(this.inbuf); + } return read; } @@ -1254,8 +1261,7 @@ client_tunnel() use_min_send = 0; - if (this.debug >= 5) - window_debug = this.debug - 3; + window_debug = this.debug; while (this.running) { if (!use_min_send) diff --git a/src/server.c b/src/server.c index 0267b4a..49775cc 100644 --- a/src/server.c +++ b/src/server.c @@ -521,19 +521,23 @@ user_process_incoming_data(int userid, int ack) uint8_t pkt[65536]; size_t datalen; uint8_t compressed = 0; + int can_reassemble = 1; window_ack(users[userid].outgoing, ack); window_tick(users[userid].outgoing); - datalen = window_reassemble_data(users[userid].incoming, pkt, sizeof(pkt), &compressed); - window_tick(users[userid].incoming); + while (can_reassemble == 1) { + datalen = sizeof(pkt); + can_reassemble = window_reassemble_data(users[userid].incoming, pkt, &datalen, &compressed); + window_tick(users[userid].incoming); - /* Update time info */ - users[userid].last_pkt = time(NULL); + /* Update time info */ + users[userid].last_pkt = time(NULL); - if (datalen > 0) { - /* Data reassembled successfully + cleared out of buffer */ - handle_full_packet(userid, pkt, datalen, compressed); + if (datalen > 0) { + /* Data reassembled successfully + cleared out of buffer */ + handle_full_packet(userid, pkt, datalen, compressed); + } } } @@ -778,8 +782,7 @@ server_tunnel() struct query *answer_now = NULL; time_t last_action = time(NULL); - if (server.debug >= 5) - window_debug = server.debug - 4; + window_debug = server.debug; while (server.running) { int maxfd; @@ -915,7 +918,8 @@ handle_full_packet(int userid, uint8_t *data, size_t len, int compressed) } } else { - DEBUG(2, "Discarded upstream data from user %d, uncompress() result: %d", userid, ret); + DEBUG(2, "Discarded pkt from user %d, uncompress()==%d, len=%" L "u, rawlen=%" L "u", + userid, ret, len, rawlen); } } diff --git a/src/window.c b/src/window.c index 3500729..9408e21 100644 --- a/src/window.c +++ b/src/window.c @@ -138,9 +138,16 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f) if (!INWINDOW_SEQ(startid, endid, f->seqID)) { w->oos++; - /* Only drop the fragment if it is ancient or from the future */ - WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)", f->seqID, startid, endid); - return -1; + if (offset > MIN(w->length - w->numitems, MAX_SEQ_ID / 2)) { + /* Only drop the fragment if it is ancient */ + WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)", f->seqID, startid, endid); + return -1; + } else { + /* Save "new" fragments to avoid causing other end to advance + * when this fragment is ACK'd despite being dropped */ + WDEBUG("WARNING: Got future fragment (%u), offset %u from start %u (wsize %u).", + f->seqID, offset, startid, w->windowsize); + } } /* Place fragment into correct location in buffer */ ssize_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID)); @@ -150,13 +157,13 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f) /* Check if fragment already received */ fd = &w->frags[dest]; if (fd->len == f->len && fd->seqID == f->seqID) { - WDEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID); - if (f->seqID == fd->seqID) { - /* use retries as counter for dupes */ - fd->retries ++; - return -1; - } + /* use retries as counter for dupes */ + fd->retries ++; + WDEBUG("Received duplicate frag, dropping. (prev %u/new %u, dupes %u)", + fd->seqID, f->seqID, fd->retries); + return -1; } + fd->seqID = f->seqID; fd->len = MIN(f->len, w->maxfraglen); fd->compressed = f->compressed; fd->start = f->start; @@ -175,12 +182,19 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f) } /* Reassembles first complete sequence of fragments into data. (RECV) - * Returns length of data reassembled, or 0 if no data reassembled */ -size_t -window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint8_t *compression) + * len should be passed with max space in *data, replaced with amount filled + * Returns 1 if should be called again for another packet, 0 otherwise */ +int +window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_t *compression) { - size_t woffs, fraglen, start, datalen = 0, found_frags = 0; + if (!len) { + errx(1, "window_reassemble_data: len pointer is NULL!"); + } + + size_t woffs, fraglen, start; + size_t maxlen = *len; uint8_t *dest; //, *fdata_start; + *len = 0; dest = data; if (w->direction != WINDOW_RECVING) return 0; @@ -195,9 +209,8 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint fragment *f; size_t i; - unsigned curseq, consecutive_frags = 0; - int end = 0, drop = 0; /* if packet is dropped, clean out old frags */ - curseq = w->frags[w->chunk_start].seqID; + unsigned curseq, consecutive_frags = 0, holes = 0, found_frags = 0; + int end = 0, drop = 0; /* if packet is dropped */ curseq = w->start_seq_id; for (i = 0; i < w->numitems; ++i) { @@ -205,9 +218,12 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint f = &w->frags[woffs]; fraglen = f->len; - /* Drop packets if some fragments are missing after reaching max retries + /* TODO Drop packets if some fragments are missing after reaching max retries * or packet timeout - * Note: this lowers the guaranteed arrival constraint */ + * Note: this lowers the guaranteed arrival constraint + * Note: Continue reassembling full packets until none left in buffer; + * several full packets are sometimes left in buffer unprocessed + * so we must not just taking the oldest full packet and ignore newer ones */ /* Process: * if buffer contains >0 frags * for frag in buffer from start { @@ -225,19 +241,17 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint /* reset reassembly things to start over */ consecutive_frags = 0; + holes++; } else if (f->start || consecutive_frags >= 1) { found_frags++; consecutive_frags++; - WDEBUG("reassemble: frag seq %u, data length %" L "u, data offset %" \ - L "u, total len %" L "u, maxlen %" L "u, found %" L "u, consecutive %" L "u", - f->seqID, fraglen, dest - data, datalen, maxlen); if (drop == 0) { /* Copy next fragment to buffer if not going to drop */ memcpy(dest, f->data, MIN(fraglen, maxlen)); } dest += fraglen; - datalen += fraglen; + *len += fraglen; if (compression) { *compression &= f->compressed & 1; if (f->compressed != *compression) { @@ -245,15 +259,19 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint } } if (fraglen > maxlen) { - WDEBUG("Data buffer too small: drop packet! Reassembled %" L "u bytes.", datalen); + WDEBUG("Data buffer too small: drop packet! Reassembled %" L "u bytes.", *len); drop = 1; } + WDEBUG("reassemble: id %u, len %" L "u, offs %" \ + L "u, total %" L "u, maxlen %" L "u, found %" L "u, consecutive %" L "u", + f->seqID, fraglen, dest - data, *len, maxlen, found_frags, consecutive_frags); /* Move window along to avoid weird issues */ window_tick(w); if (f->end == 1) { - WDEBUG("Found end of chunk! (seqID %u, chunk len %" L "u, datalen %" L "u)", f->seqID, i, datalen); + WDEBUG("Found end of chunk! (seqID %u, chunk len %" L "u, datalen %" L "u)", + f->seqID, consecutive_frags, *len); end = 1; break; } @@ -275,15 +293,21 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint return 0; } - WDEBUG("Reassembled %" L "u bytes from %" L "u frags; %scompressed!", datalen, i + 1, *compression ? "" : "un"); - /* Clear all used fragments */ - size_t p; - ITER_FORWARD(w->chunk_start, WRAP(w->chunk_start + i + 1), w->length, p, - memset(&w->frags[p], 0, sizeof(fragment)); - ); - w->chunk_start = WRAP(woffs + 1); - w->numitems -= i + 1; - return drop == 0 ? datalen : 0; + WDEBUG("Reassembled %" L "ub from %" L "u frags; comp=%u; holes=%u", + *len, consecutive_frags, *compression, holes); + /* Clear all used fragments, going backwards from last processed */ + size_t p = woffs; + for (int n = 0; n < consecutive_frags; n++) { + w->frags[p].len = 0; + p = (p <= 0) ? w->length - 1 : p - 1; + } + if (holes == 0) { + /* move start of window forwards only if there are no pending fragments (holes) + * or incomplete packets that we might have skipped */ + w->chunk_start = WRAP(woffs + 1); + } + w->numitems -= consecutive_frags; + return found_frags >= consecutive_frags; } size_t @@ -427,7 +451,16 @@ window_tick(struct frag_buffer *w) { for (size_t i = 0; i < w->windowsize; i++) { // TODO are ACKs required for reduced arrival guarantee? - if (w->frags[w->window_start].acks >= 1) { + /* Requirements for fragment being cleared (SENDING): + * (must have been sent) AND + * (((must be received) AND (must be acknowledged)) OR + * (not acknowledged if ACK not required)) + * + * Fragments (or holes) cleared on RECEIVING must: + * ((be received) AND (be ACK'd)) OR (... see window_reassemble_data) + */ + fragment *f = &w->frags[w->window_start]; + if (f->len > 0 && f->acks >= 1) { #ifdef DEBUG_BUILD unsigned old_start_id = w->start_seq_id; #endif @@ -438,7 +471,7 @@ window_tick(struct frag_buffer *w) if (w->direction == WINDOW_SENDING) { WDEBUG("Clearing old fragments in SENDING window."); w->numitems --; /* Clear old fragments */ - w->frags[w->window_start].len = 0; + f->len = 0; } w->window_start = AFTER(w, 1); @@ -461,25 +494,28 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, uint8 compressed &= 1; size_t offset = 0; fragment *f = &w->frags[w->last_write]; - WDEBUG("add data len %" L "u, %" L "u frags, max fragsize %u", len, n, w->maxfraglen); + WDEBUG("add_outgoing_data: chunk len %" L "u -> %" L "u frags, max fragsize %u", + len, n, w->maxfraglen); for (size_t i = 0; i < n; i++) { + /* copy in new data and reset frag stats */ f->len = MIN(len - offset, w->maxfraglen); memcpy(f->data, data + offset, f->len); f->seqID = w->cur_seq_id; + f->compressed = compressed; f->start = (i == 0) ? 1 : 0; f->end = (i == n - 1) ? 1 : 0; - f->compressed = compressed; - f->ack_other = -1; - /* append fragment */ - if (window_buffer_available(w) < 1) return 0; - memcpy(&w->frags[w->last_write], f, sizeof(fragment)); + f->retries = 0; + f->acks = 0; + f->ack_other = -1; + f->lastsent.tv_sec = 0; + f->lastsent.tv_usec = 0; w->last_write = WRAP(w->last_write + 1); w->numitems ++; - w->cur_seq_id = (w->cur_seq_id + 1) % MAX_SEQ_ID; - WDEBUG(" fragment len %" L "u, seqID %u, s %u, end %u, dOffs %" L "u", f->len, f->seqID, f->start, f->end, offset); + WDEBUG(" fragment len %" L "u, seqID %u, s %u, end %u, dOffs %" L "u", + f->len, f->seqID, f->start, f->end, offset); offset += f->len; } return n; diff --git a/src/window.h b/src/window.h index 997c651..38e4f27 100644 --- a/src/window.h +++ b/src/window.h @@ -134,7 +134,7 @@ ssize_t window_process_incoming_fragment(struct frag_buffer *w, fragment *f); /* Reassembles first complete sequence of fragments into data. (RECV) * Returns length of data reassembled, or 0 if no data reassembled */ -size_t window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint8_t *compression); +int window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *maxlen, uint8_t *compression); /* Returns number of fragments to be sent */ size_t window_sending(struct frag_buffer *w, struct timeval *); diff --git a/tests/window.c b/tests/window.c index 44be034..90ec7a3 100644 --- a/tests/window.c +++ b/tests/window.c @@ -70,7 +70,9 @@ START_TEST(test_window_everything) uint8_t c; for (i = 0; i < 50; i++) { memset(data, 0, 100); - size_t len = window_reassemble_data(in, data, 100, &c); + size_t len = 100; + // TODO test reassemble multiple packets + window_reassemble_data(in, data, &len, &c); fail_if(c != 0, "Compression flag weird"); // printf("Reassembled %lu bytes, num frags %lu: '", len, in->numitems); // for (unsigned i = 0; i < len; i++) {