1
0
Fork 0
mirror of https://github.com/yarrick/iodine.git synced 2025-04-04 13:53:34 +03:00

Fixed major connection stability issues

This commit is contained in:
frekky 2017-07-01 21:43:52 +02:00
parent e99fb20bbb
commit d58dd3185e
5 changed files with 130 additions and 82 deletions

View file

@ -1027,7 +1027,8 @@ tunnel_tun()
if (this.conn == CONN_DNS_NULL) {
/* Check if outgoing buffer can hold data */
if ((0 == this.windowsize_up && 0 != this.outbuf->numitems) || window_buffer_available(this.outbuf) < (read / MAX_FRAGSIZE) + 1) {
if ((this.windowsize_up == 0 && this.outbuf->numitems != 0) ||
window_buffer_available(this.outbuf) < (read / MAX_FRAGSIZE) + 1) {
DEBUG(1, " Outgoing buffer full (%" L "u/%" L "u), not adding data!",
this.outbuf->numitems, this.outbuf->length);
return -1;
@ -1049,7 +1050,7 @@ tunnel_dns()
size_t datalen, buflen;
uint8_t buf[64*1024], cbuf[64*1024], *data, compressed;
fragment f;
int read, ping, immediate, error;
int read, ping, immediate, error, pkt = 1;
memset(&q, 0, sizeof(q));
memset(buf, 0, sizeof(buf));
@ -1187,34 +1188,40 @@ tunnel_dns()
this.num_frags_recv++;
datalen = window_reassemble_data(this.inbuf, cbuf, sizeof(cbuf), &compressed);
if (datalen > 0) {
if (compressed) {
buflen = sizeof(buf);
if ((ping = uncompress(buf, &buflen, cbuf, datalen)) != Z_OK) {
DEBUG(1, "Uncompress failed (%d) for data len %" L "u: reassembled data corrupted or incomplete!", ping, datalen);
datalen = 0;
} else {
datalen = buflen;
}
data = buf;
} else {
data = cbuf;
}
if (datalen) {
if (this.use_remote_forward) {
if (write(STDOUT_FILENO, data, datalen) != datalen) {
warn("write_stdout != datalen");
/* Continue reassembling packets until not possible to do so.
* This prevents a buildup of fully available packets (with one or more fragments each)
* in the incoming window buffer. */
while (pkt == 1) {
datalen = sizeof(cbuf);
pkt = window_reassemble_data(this.inbuf, cbuf, &datalen, &compressed);
if (datalen > 0) {
if (compressed) {
buflen = sizeof(buf);
if ((ping = uncompress(buf, &buflen, cbuf, datalen)) != Z_OK) {
DEBUG(1, "Uncompress failed (%d) for data len %" L "u: reassembled data corrupted or incomplete!", ping, datalen);
datalen = 0;
} else {
datalen = buflen;
}
data = buf;
} else {
write_tun(this.tun_fd, data, datalen);
data = cbuf;
}
if (datalen) {
if (this.use_remote_forward) {
if (write(STDOUT_FILENO, data, datalen) != datalen) {
warn("write_stdout != datalen");
}
} else {
write_tun(this.tun_fd, data, datalen);
}
}
}
}
/* Move window along after doing all data processing */
window_tick(this.inbuf);
/* Move window along after doing all data processing */
window_tick(this.inbuf);
}
return read;
}
@ -1254,8 +1261,7 @@ client_tunnel()
use_min_send = 0;
if (this.debug >= 5)
window_debug = this.debug - 3;
window_debug = this.debug;
while (this.running) {
if (!use_min_send)

View file

@ -521,19 +521,23 @@ user_process_incoming_data(int userid, int ack)
uint8_t pkt[65536];
size_t datalen;
uint8_t compressed = 0;
int can_reassemble = 1;
window_ack(users[userid].outgoing, ack);
window_tick(users[userid].outgoing);
datalen = window_reassemble_data(users[userid].incoming, pkt, sizeof(pkt), &compressed);
window_tick(users[userid].incoming);
while (can_reassemble == 1) {
datalen = sizeof(pkt);
can_reassemble = window_reassemble_data(users[userid].incoming, pkt, &datalen, &compressed);
window_tick(users[userid].incoming);
/* Update time info */
users[userid].last_pkt = time(NULL);
/* Update time info */
users[userid].last_pkt = time(NULL);
if (datalen > 0) {
/* Data reassembled successfully + cleared out of buffer */
handle_full_packet(userid, pkt, datalen, compressed);
if (datalen > 0) {
/* Data reassembled successfully + cleared out of buffer */
handle_full_packet(userid, pkt, datalen, compressed);
}
}
}
@ -778,8 +782,7 @@ server_tunnel()
struct query *answer_now = NULL;
time_t last_action = time(NULL);
if (server.debug >= 5)
window_debug = server.debug - 4;
window_debug = server.debug;
while (server.running) {
int maxfd;
@ -915,7 +918,8 @@ handle_full_packet(int userid, uint8_t *data, size_t len, int compressed)
}
} else {
DEBUG(2, "Discarded upstream data from user %d, uncompress() result: %d", userid, ret);
DEBUG(2, "Discarded pkt from user %d, uncompress()==%d, len=%" L "u, rawlen=%" L "u",
userid, ret, len, rawlen);
}
}

View file

@ -138,9 +138,16 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
if (!INWINDOW_SEQ(startid, endid, f->seqID)) {
w->oos++;
/* Only drop the fragment if it is ancient or from the future */
WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)", f->seqID, startid, endid);
return -1;
if (offset > MIN(w->length - w->numitems, MAX_SEQ_ID / 2)) {
/* Only drop the fragment if it is ancient */
WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)", f->seqID, startid, endid);
return -1;
} else {
/* Save "new" fragments to avoid causing other end to advance
* when this fragment is ACK'd despite being dropped */
WDEBUG("WARNING: Got future fragment (%u), offset %u from start %u (wsize %u).",
f->seqID, offset, startid, w->windowsize);
}
}
/* Place fragment into correct location in buffer */
ssize_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID));
@ -150,13 +157,13 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
/* Check if fragment already received */
fd = &w->frags[dest];
if (fd->len == f->len && fd->seqID == f->seqID) {
WDEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID);
if (f->seqID == fd->seqID) {
/* use retries as counter for dupes */
fd->retries ++;
return -1;
}
/* use retries as counter for dupes */
fd->retries ++;
WDEBUG("Received duplicate frag, dropping. (prev %u/new %u, dupes %u)",
fd->seqID, f->seqID, fd->retries);
return -1;
}
fd->seqID = f->seqID;
fd->len = MIN(f->len, w->maxfraglen);
fd->compressed = f->compressed;
fd->start = f->start;
@ -175,12 +182,19 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
}
/* Reassembles first complete sequence of fragments into data. (RECV)
* Returns length of data reassembled, or 0 if no data reassembled */
size_t
window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint8_t *compression)
* len should be passed with max space in *data, replaced with amount filled
* Returns 1 if should be called again for another packet, 0 otherwise */
int
window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *len, uint8_t *compression)
{
size_t woffs, fraglen, start, datalen = 0, found_frags = 0;
if (!len) {
errx(1, "window_reassemble_data: len pointer is NULL!");
}
size_t woffs, fraglen, start;
size_t maxlen = *len;
uint8_t *dest; //, *fdata_start;
*len = 0;
dest = data;
if (w->direction != WINDOW_RECVING)
return 0;
@ -195,9 +209,8 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint
fragment *f;
size_t i;
unsigned curseq, consecutive_frags = 0;
int end = 0, drop = 0; /* if packet is dropped, clean out old frags */
curseq = w->frags[w->chunk_start].seqID;
unsigned curseq, consecutive_frags = 0, holes = 0, found_frags = 0;
int end = 0, drop = 0; /* if packet is dropped */
curseq = w->start_seq_id;
for (i = 0; i < w->numitems; ++i) {
@ -205,9 +218,12 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint
f = &w->frags[woffs];
fraglen = f->len;
/* Drop packets if some fragments are missing after reaching max retries
/* TODO Drop packets if some fragments are missing after reaching max retries
* or packet timeout
* Note: this lowers the guaranteed arrival constraint */
* Note: this lowers the guaranteed arrival constraint
* Note: Continue reassembling full packets until none left in buffer;
* several full packets are sometimes left in buffer unprocessed
* so we must not just taking the oldest full packet and ignore newer ones */
/* Process:
* if buffer contains >0 frags
* for frag in buffer from start {
@ -225,19 +241,17 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint
/* reset reassembly things to start over */
consecutive_frags = 0;
holes++;
} else if (f->start || consecutive_frags >= 1) {
found_frags++;
consecutive_frags++;
WDEBUG("reassemble: frag seq %u, data length %" L "u, data offset %" \
L "u, total len %" L "u, maxlen %" L "u, found %" L "u, consecutive %" L "u",
f->seqID, fraglen, dest - data, datalen, maxlen);
if (drop == 0) {
/* Copy next fragment to buffer if not going to drop */
memcpy(dest, f->data, MIN(fraglen, maxlen));
}
dest += fraglen;
datalen += fraglen;
*len += fraglen;
if (compression) {
*compression &= f->compressed & 1;
if (f->compressed != *compression) {
@ -245,15 +259,19 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint
}
}
if (fraglen > maxlen) {
WDEBUG("Data buffer too small: drop packet! Reassembled %" L "u bytes.", datalen);
WDEBUG("Data buffer too small: drop packet! Reassembled %" L "u bytes.", *len);
drop = 1;
}
WDEBUG("reassemble: id %u, len %" L "u, offs %" \
L "u, total %" L "u, maxlen %" L "u, found %" L "u, consecutive %" L "u",
f->seqID, fraglen, dest - data, *len, maxlen, found_frags, consecutive_frags);
/* Move window along to avoid weird issues */
window_tick(w);
if (f->end == 1) {
WDEBUG("Found end of chunk! (seqID %u, chunk len %" L "u, datalen %" L "u)", f->seqID, i, datalen);
WDEBUG("Found end of chunk! (seqID %u, chunk len %" L "u, datalen %" L "u)",
f->seqID, consecutive_frags, *len);
end = 1;
break;
}
@ -275,15 +293,21 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint
return 0;
}
WDEBUG("Reassembled %" L "u bytes from %" L "u frags; %scompressed!", datalen, i + 1, *compression ? "" : "un");
/* Clear all used fragments */
size_t p;
ITER_FORWARD(w->chunk_start, WRAP(w->chunk_start + i + 1), w->length, p,
memset(&w->frags[p], 0, sizeof(fragment));
);
w->chunk_start = WRAP(woffs + 1);
w->numitems -= i + 1;
return drop == 0 ? datalen : 0;
WDEBUG("Reassembled %" L "ub from %" L "u frags; comp=%u; holes=%u",
*len, consecutive_frags, *compression, holes);
/* Clear all used fragments, going backwards from last processed */
size_t p = woffs;
for (int n = 0; n < consecutive_frags; n++) {
w->frags[p].len = 0;
p = (p <= 0) ? w->length - 1 : p - 1;
}
if (holes == 0) {
/* move start of window forwards only if there are no pending fragments (holes)
* or incomplete packets that we might have skipped */
w->chunk_start = WRAP(woffs + 1);
}
w->numitems -= consecutive_frags;
return found_frags >= consecutive_frags;
}
size_t
@ -427,7 +451,16 @@ window_tick(struct frag_buffer *w)
{
for (size_t i = 0; i < w->windowsize; i++) {
// TODO are ACKs required for reduced arrival guarantee?
if (w->frags[w->window_start].acks >= 1) {
/* Requirements for fragment being cleared (SENDING):
* (must have been sent) AND
* (((must be received) AND (must be acknowledged)) OR
* (not acknowledged if ACK not required))
*
* Fragments (or holes) cleared on RECEIVING must:
* ((be received) AND (be ACK'd)) OR (... see window_reassemble_data)
*/
fragment *f = &w->frags[w->window_start];
if (f->len > 0 && f->acks >= 1) {
#ifdef DEBUG_BUILD
unsigned old_start_id = w->start_seq_id;
#endif
@ -438,7 +471,7 @@ window_tick(struct frag_buffer *w)
if (w->direction == WINDOW_SENDING) {
WDEBUG("Clearing old fragments in SENDING window.");
w->numitems --; /* Clear old fragments */
w->frags[w->window_start].len = 0;
f->len = 0;
}
w->window_start = AFTER(w, 1);
@ -461,25 +494,28 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, uint8
compressed &= 1;
size_t offset = 0;
fragment *f = &w->frags[w->last_write];
WDEBUG("add data len %" L "u, %" L "u frags, max fragsize %u", len, n, w->maxfraglen);
WDEBUG("add_outgoing_data: chunk len %" L "u -> %" L "u frags, max fragsize %u",
len, n, w->maxfraglen);
for (size_t i = 0; i < n; i++) {
/* copy in new data and reset frag stats */
f->len = MIN(len - offset, w->maxfraglen);
memcpy(f->data, data + offset, f->len);
f->seqID = w->cur_seq_id;
f->compressed = compressed;
f->start = (i == 0) ? 1 : 0;
f->end = (i == n - 1) ? 1 : 0;
f->compressed = compressed;
f->ack_other = -1;
/* append fragment */
if (window_buffer_available(w) < 1) return 0;
memcpy(&w->frags[w->last_write], f, sizeof(fragment));
f->retries = 0;
f->acks = 0;
f->ack_other = -1;
f->lastsent.tv_sec = 0;
f->lastsent.tv_usec = 0;
w->last_write = WRAP(w->last_write + 1);
w->numitems ++;
w->cur_seq_id = (w->cur_seq_id + 1) % MAX_SEQ_ID;
WDEBUG(" fragment len %" L "u, seqID %u, s %u, end %u, dOffs %" L "u", f->len, f->seqID, f->start, f->end, offset);
WDEBUG(" fragment len %" L "u, seqID %u, s %u, end %u, dOffs %" L "u",
f->len, f->seqID, f->start, f->end, offset);
offset += f->len;
}
return n;

View file

@ -134,7 +134,7 @@ ssize_t window_process_incoming_fragment(struct frag_buffer *w, fragment *f);
/* Reassembles first complete sequence of fragments into data. (RECV)
* Returns length of data reassembled, or 0 if no data reassembled */
size_t window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint8_t *compression);
int window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t *maxlen, uint8_t *compression);
/* Returns number of fragments to be sent */
size_t window_sending(struct frag_buffer *w, struct timeval *);

View file

@ -70,7 +70,9 @@ START_TEST(test_window_everything)
uint8_t c;
for (i = 0; i < 50; i++) {
memset(data, 0, 100);
size_t len = window_reassemble_data(in, data, 100, &c);
size_t len = 100;
// TODO test reassemble multiple packets
window_reassemble_data(in, data, &len, &c);
fail_if(c != 0, "Compression flag weird");
// printf("Reassembled %lu bytes, num frags %lu: '", len, in->numitems);
// for (unsigned i = 0; i < len; i++) {