mirror of
https://github.com/yarrick/iodine.git
synced 2025-04-10 04:21:01 +00:00
Reorganised frag data buffers for dynamic allocation
This commit is contained in:
parent
65d9d2ba23
commit
e99fb20bbb
5 changed files with 179 additions and 144 deletions
10
src/client.c
10
src/client.c
|
@ -922,7 +922,6 @@ int
|
||||||
parse_data(uint8_t *data, size_t len, fragment *f, int *immediate, int *ping)
|
parse_data(uint8_t *data, size_t len, fragment *f, int *immediate, int *ping)
|
||||||
{
|
{
|
||||||
size_t headerlen = DOWNSTREAM_HDR;
|
size_t headerlen = DOWNSTREAM_HDR;
|
||||||
memset(f, 0, sizeof(fragment));
|
|
||||||
int error;
|
int error;
|
||||||
|
|
||||||
f->seqID = data[0];
|
f->seqID = data[0];
|
||||||
|
@ -954,8 +953,8 @@ parse_data(uint8_t *data, size_t len, fragment *f, int *immediate, int *ping)
|
||||||
}
|
}
|
||||||
f->len = len - headerlen;
|
f->len = len - headerlen;
|
||||||
if (f->len > 0)
|
if (f->len > 0)
|
||||||
memcpy(f->data, data + headerlen, MIN(f->len, sizeof(f->data)));
|
memcpy(f->data, data + headerlen, MIN(f->len, this.inbuf->maxfraglen));
|
||||||
return error; /* return ping flag (if corresponding query was a ping) */
|
return error;
|
||||||
}
|
}
|
||||||
|
|
||||||
static ssize_t
|
static ssize_t
|
||||||
|
@ -1048,9 +1047,9 @@ tunnel_dns()
|
||||||
{
|
{
|
||||||
struct query q;
|
struct query q;
|
||||||
size_t datalen, buflen;
|
size_t datalen, buflen;
|
||||||
uint8_t buf[64*1024], cbuf[64*1024], *data;
|
uint8_t buf[64*1024], cbuf[64*1024], *data, compressed;
|
||||||
fragment f;
|
fragment f;
|
||||||
int read, compressed, ping, immediate, error;
|
int read, ping, immediate, error;
|
||||||
|
|
||||||
memset(&q, 0, sizeof(q));
|
memset(&q, 0, sizeof(q));
|
||||||
memset(buf, 0, sizeof(buf));
|
memset(buf, 0, sizeof(buf));
|
||||||
|
@ -1137,6 +1136,7 @@ tunnel_dns()
|
||||||
this.num_recv++;
|
this.num_recv++;
|
||||||
|
|
||||||
/* Decode the downstream data header and fragment-ify ready for processing */
|
/* Decode the downstream data header and fragment-ify ready for processing */
|
||||||
|
f.data = buf;
|
||||||
error = parse_data(cbuf, read, &f, &immediate, &ping);
|
error = parse_data(cbuf, read, &f, &immediate, &ping);
|
||||||
|
|
||||||
/* Mark query as received */
|
/* Mark query as received */
|
||||||
|
|
19
src/server.c
19
src/server.c
|
@ -439,7 +439,6 @@ send_data_or_ping(int userid, struct query *q, int ping, int immediate, char *tc
|
||||||
immediate: 1=not from qmem (ie. fresh query), 0=query is from qmem
|
immediate: 1=not from qmem (ie. fresh query), 0=query is from qmem
|
||||||
tcperror: whether to tell user that TCP socket is closed (NULL if OK or pointer to error message) */
|
tcperror: whether to tell user that TCP socket is closed (NULL if OK or pointer to error message) */
|
||||||
{
|
{
|
||||||
uint8_t pkt[MAX_FRAGSIZE + DOWNSTREAM_PING_HDR];
|
|
||||||
size_t datalen, headerlen;
|
size_t datalen, headerlen;
|
||||||
fragment *f = NULL;
|
fragment *f = NULL;
|
||||||
struct frag_buffer *out, *in;
|
struct frag_buffer *out, *in;
|
||||||
|
@ -447,6 +446,8 @@ send_data_or_ping(int userid, struct query *q, int ping, int immediate, char *tc
|
||||||
in = users[userid].incoming;
|
in = users[userid].incoming;
|
||||||
out = users[userid].outgoing;
|
out = users[userid].outgoing;
|
||||||
|
|
||||||
|
uint8_t pkt[out->maxfraglen + DOWNSTREAM_PING_HDR];
|
||||||
|
|
||||||
window_tick(out);
|
window_tick(out);
|
||||||
|
|
||||||
if (!tcperror) {
|
if (!tcperror) {
|
||||||
|
@ -499,8 +500,6 @@ send_data_or_ping(int userid, struct query *q, int ping, int immediate, char *tc
|
||||||
headerlen = DOWNSTREAM_PING_HDR;
|
headerlen = DOWNSTREAM_PING_HDR;
|
||||||
}
|
}
|
||||||
if (datalen + headerlen > sizeof(pkt)) {
|
if (datalen + headerlen > sizeof(pkt)) {
|
||||||
/* Should never happen, or at least user should be warned about
|
|
||||||
* fragsize > MAX_FRAGLEN earlier on */
|
|
||||||
warnx("send_data_or_ping: fragment too large to send! (%" L "u)", datalen);
|
warnx("send_data_or_ping: fragment too large to send! (%" L "u)", datalen);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -521,7 +520,7 @@ user_process_incoming_data(int userid, int ack)
|
||||||
{
|
{
|
||||||
uint8_t pkt[65536];
|
uint8_t pkt[65536];
|
||||||
size_t datalen;
|
size_t datalen;
|
||||||
int compressed = 0;
|
uint8_t compressed = 0;
|
||||||
|
|
||||||
window_ack(users[userid].outgoing, ack);
|
window_ack(users[userid].outgoing, ack);
|
||||||
window_tick(users[userid].outgoing);
|
window_tick(users[userid].outgoing);
|
||||||
|
@ -1813,7 +1812,7 @@ void
|
||||||
handle_dns_data(int dns_fd, struct query *q, uint8_t *domain, int domain_len, int userid)
|
handle_dns_data(int dns_fd, struct query *q, uint8_t *domain, int domain_len, int userid)
|
||||||
{
|
{
|
||||||
uint8_t unpacked[20];
|
uint8_t unpacked[20];
|
||||||
static fragment f;
|
fragment f;
|
||||||
size_t len;
|
size_t len;
|
||||||
|
|
||||||
/* Need 6 char header + >=1 char data */
|
/* Need 6 char header + >=1 char data */
|
||||||
|
@ -1838,9 +1837,13 @@ handle_dns_data(int dns_fd, struct query *q, uint8_t *domain, int domain_len, in
|
||||||
f.start = (unpacked[2] >> 1) & 1;
|
f.start = (unpacked[2] >> 1) & 1;
|
||||||
f.end = unpacked[2] & 1;
|
f.end = unpacked[2] & 1;
|
||||||
|
|
||||||
|
uint8_t data[users[userid].incoming->maxfraglen];
|
||||||
|
f.data = data;
|
||||||
|
|
||||||
/* Decode remainder of data with user encoding into fragment */
|
/* Decode remainder of data with user encoding into fragment */
|
||||||
f.len = unpack_data(f.data, MAX_FRAGSIZE, (uint8_t *)domain + UPSTREAM_HDR,
|
f.len = unpack_data(f.data, users[userid].incoming->maxfraglen,
|
||||||
domain_len - UPSTREAM_HDR, users[userid].encoder);
|
(uint8_t *)domain + UPSTREAM_HDR,
|
||||||
|
domain_len - UPSTREAM_HDR, users[userid].encoder);
|
||||||
|
|
||||||
DEBUG(3, "frag seq %3u, datalen %5lu, ACK %3d, compression %1d, s%1d e%1d",
|
DEBUG(3, "frag seq %3u, datalen %5lu, ACK %3d, compression %1d, s%1d e%1d",
|
||||||
f.seqID, f.len, f.ack_other, f.compressed, f.start, f.end);
|
f.seqID, f.len, f.ack_other, f.compressed, f.start, f.end);
|
||||||
|
@ -1848,7 +1851,7 @@ handle_dns_data(int dns_fd, struct query *q, uint8_t *domain, int domain_len, in
|
||||||
/* if already waiting for an ACK to be sent back upstream (on incoming buffer) */
|
/* if already waiting for an ACK to be sent back upstream (on incoming buffer) */
|
||||||
if (users[userid].next_upstream_ack >= 0) {
|
if (users[userid].next_upstream_ack >= 0) {
|
||||||
/* Shouldn't normally happen; will always be reset after sending a packet. */
|
/* Shouldn't normally happen; will always be reset after sending a packet. */
|
||||||
DEBUG(1, "[WARNING] next_upstream_ack == %d for user %d.", users[userid].next_upstream_ack, userid);
|
DEBUG(1, "[WARNING] next_upstream_ack == %d for user %d.",users[userid].next_upstream_ack, userid);
|
||||||
}
|
}
|
||||||
|
|
||||||
window_process_incoming_fragment(users[userid].incoming, &f);
|
window_process_incoming_fragment(users[userid].incoming, &f);
|
||||||
|
|
247
src/window.c
247
src/window.c
|
@ -36,63 +36,49 @@
|
||||||
int window_debug = 0;
|
int window_debug = 0;
|
||||||
|
|
||||||
struct frag_buffer *
|
struct frag_buffer *
|
||||||
window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int dir)
|
window_buffer_init(size_t length, unsigned windowsize, unsigned maxfraglen, int dir)
|
||||||
{
|
{
|
||||||
struct frag_buffer *buf;
|
struct frag_buffer *buf;
|
||||||
buf = calloc(sizeof(struct frag_buffer), 1);
|
buf = calloc(1, sizeof(struct frag_buffer));
|
||||||
if (!buf) {
|
if (!buf) {
|
||||||
errx(1, "Failed to allocate window buffer memory!");
|
errx(1, "Failed to allocate window buffer memory!");
|
||||||
}
|
}
|
||||||
if (dir != WINDOW_RECVING && dir != WINDOW_SENDING) {
|
if (dir != WINDOW_RECVING && dir != WINDOW_SENDING) {
|
||||||
errx(1, "Invalid window direction!");
|
errx(1, "Invalid window direction!");
|
||||||
}
|
}
|
||||||
if (fragsize > MAX_FRAGSIZE) {
|
|
||||||
errx(fragsize, "Fragsize too large! Please recompile with larger MAX_FRAGSIZE!");
|
|
||||||
}
|
|
||||||
|
|
||||||
buf->frags = calloc(length, sizeof(fragment));
|
window_buffer_resize(buf, length, maxfraglen);
|
||||||
if (!buf->frags) {
|
|
||||||
errx(1, "Failed to allocate fragment buffer!");
|
|
||||||
}
|
|
||||||
buf->length = length;
|
|
||||||
buf->windowsize = windowsize;
|
buf->windowsize = windowsize;
|
||||||
buf->maxfraglen = fragsize;
|
|
||||||
buf->window_end = AFTER(buf, windowsize);
|
|
||||||
buf->direction = dir;
|
buf->direction = dir;
|
||||||
buf->timeout.tv_sec = 5;
|
|
||||||
buf->timeout.tv_usec = 0;
|
|
||||||
|
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
window_buffer_reset(struct frag_buffer *w)
|
window_buffer_resize(struct frag_buffer *w, size_t length, unsigned maxfraglen)
|
||||||
{
|
{
|
||||||
w->chunk_start = 0;
|
if (w->length == length && w->maxfraglen == maxfraglen) {
|
||||||
w->cur_seq_id = 0;
|
return;
|
||||||
w->last_write = 0;
|
}
|
||||||
w->numitems = 0;
|
|
||||||
w->oos = 0;
|
|
||||||
w->resends = 0;
|
|
||||||
w->start_seq_id = 0;
|
|
||||||
w->window_start = 0;
|
|
||||||
w->window_end = AFTER(w, w->windowsize);
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
|
||||||
window_buffer_resize(struct frag_buffer *w, size_t length)
|
|
||||||
{
|
|
||||||
if (w->length == length) return;
|
|
||||||
if (w->numitems > 0) {
|
if (w->numitems > 0) {
|
||||||
WDEBUG("Resizing window buffer with things still in it! This will cause problems!");
|
WDEBUG("Resizing window buffer with things still in it = data loss!");
|
||||||
}
|
}
|
||||||
if (w->frags) free(w->frags);
|
|
||||||
w->frags = calloc(length, sizeof(fragment));
|
w->frags = malloc(length * sizeof(fragment));
|
||||||
if (!w->frags) {
|
if (!w->frags) {
|
||||||
errx(1, "Failed to resize window buffer!");
|
errx(1, "Failed to allocate fragment buffer!");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
w->data = malloc(length * maxfraglen);
|
||||||
|
if (!w->data) {
|
||||||
|
errx(1, "Failed to allocate fragment data buffer! "
|
||||||
|
"Maybe fragsize too large (%u)?", maxfraglen);
|
||||||
|
}
|
||||||
|
|
||||||
w->length = length;
|
w->length = length;
|
||||||
window_buffer_reset(w);
|
w->maxfraglen = maxfraglen;
|
||||||
|
window_buffer_clear(w);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
@ -100,6 +86,7 @@ window_buffer_destroy(struct frag_buffer *w)
|
||||||
{
|
{
|
||||||
if (!w) return;
|
if (!w) return;
|
||||||
if (w->frags) free(w->frags);
|
if (w->frags) free(w->frags);
|
||||||
|
if (w->data) free(w->data);
|
||||||
free(w);
|
free(w);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,9 +94,26 @@ void
|
||||||
window_buffer_clear(struct frag_buffer *w)
|
window_buffer_clear(struct frag_buffer *w)
|
||||||
{
|
{
|
||||||
if (!w) return;
|
if (!w) return;
|
||||||
|
|
||||||
memset(w->frags, 0, w->length * sizeof(fragment));
|
memset(w->frags, 0, w->length * sizeof(fragment));
|
||||||
window_buffer_reset(w);
|
memset(w->data, 0, w->length * w->maxfraglen);
|
||||||
|
|
||||||
|
/* Fix fragment data pointers */
|
||||||
|
for (size_t i = 0; i < w->length; i++) {
|
||||||
|
w->frags[i].data = FRAG_DATA(w, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
w->numitems = 0;
|
||||||
|
w->window_start = 0;
|
||||||
|
w->window_end = AFTER(w, w->windowsize);
|
||||||
|
w->last_write = 0;
|
||||||
|
w->chunk_start = 0;
|
||||||
|
w->cur_seq_id = 0;
|
||||||
|
w->start_seq_id = 0;
|
||||||
|
w->max_retries = 5;
|
||||||
|
w->resends = 0;
|
||||||
|
w->oos = 0;
|
||||||
|
w->timeout.tv_sec = 5;
|
||||||
|
w->timeout.tv_usec = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Returns number of available fragment slots (NOT BYTES) */
|
/* Returns number of available fragment slots (NOT BYTES) */
|
||||||
|
@ -119,18 +123,6 @@ window_buffer_available(struct frag_buffer *w)
|
||||||
return w->length - w->numitems;
|
return w->length - w->numitems;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Places a fragment in the window after the last one */
|
|
||||||
int
|
|
||||||
window_append_fragment(struct frag_buffer *w, fragment *src)
|
|
||||||
{
|
|
||||||
if (window_buffer_available(w) < 1) return 0;
|
|
||||||
memcpy(&w->frags[w->last_write], src, sizeof(fragment));
|
|
||||||
w->last_write = WRAP(w->last_write + 1);
|
|
||||||
w->numitems ++;
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
ssize_t
|
ssize_t
|
||||||
window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
|
window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
|
||||||
/* Handles fragment received from the sending side (RECV)
|
/* Handles fragment received from the sending side (RECV)
|
||||||
|
@ -146,16 +138,9 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
|
||||||
|
|
||||||
if (!INWINDOW_SEQ(startid, endid, f->seqID)) {
|
if (!INWINDOW_SEQ(startid, endid, f->seqID)) {
|
||||||
w->oos++;
|
w->oos++;
|
||||||
if (offset > MIN(w->length - w->numitems, MAX_SEQ_ID / 2)) {
|
/* Only drop the fragment if it is ancient or from the future */
|
||||||
/* Only drop the fragment if it is ancient */
|
WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)", f->seqID, startid, endid);
|
||||||
WDEBUG("Dropping frag with seqID %u: not in window (%u-%u)", f->seqID, startid, endid);
|
return -1;
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
/* Save "new" fragments to avoid causing other end to advance
|
|
||||||
* when this fragment is ACK'd despite being dropped */
|
|
||||||
WDEBUG("WARNING: Got future fragment (%u), offset %u from start %u (wsize %u).",
|
|
||||||
f->seqID, offset, startid, w->windowsize);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/* Place fragment into correct location in buffer */
|
/* Place fragment into correct location in buffer */
|
||||||
ssize_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID));
|
ssize_t dest = WRAP(w->window_start + SEQ_OFFSET(startid, f->seqID));
|
||||||
|
@ -164,7 +149,7 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
|
||||||
|
|
||||||
/* Check if fragment already received */
|
/* Check if fragment already received */
|
||||||
fd = &w->frags[dest];
|
fd = &w->frags[dest];
|
||||||
if (fd->len != 0) {
|
if (fd->len == f->len && fd->seqID == f->seqID) {
|
||||||
WDEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID);
|
WDEBUG("Received duplicate frag, dropping. (prev %u/new %u)", fd->seqID, f->seqID);
|
||||||
if (f->seqID == fd->seqID) {
|
if (f->seqID == fd->seqID) {
|
||||||
/* use retries as counter for dupes */
|
/* use retries as counter for dupes */
|
||||||
|
@ -172,8 +157,12 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fd->len = MIN(f->len, w->maxfraglen);
|
||||||
|
fd->compressed = f->compressed;
|
||||||
|
fd->start = f->start;
|
||||||
|
fd->end = f->end;
|
||||||
|
|
||||||
memcpy(fd, f, sizeof(fragment));
|
memcpy(fd->data, f->data, fd->len);
|
||||||
w->numitems ++;
|
w->numitems ++;
|
||||||
|
|
||||||
fd->retries = 0;
|
fd->retries = 0;
|
||||||
|
@ -188,15 +177,17 @@ window_process_incoming_fragment(struct frag_buffer *w, fragment *f)
|
||||||
/* Reassembles first complete sequence of fragments into data. (RECV)
|
/* Reassembles first complete sequence of fragments into data. (RECV)
|
||||||
* Returns length of data reassembled, or 0 if no data reassembled */
|
* Returns length of data reassembled, or 0 if no data reassembled */
|
||||||
size_t
|
size_t
|
||||||
window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int *compression)
|
window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint8_t *compression)
|
||||||
{
|
{
|
||||||
size_t woffs, fraglen, datalen = 0;
|
size_t woffs, fraglen, start, datalen = 0, found_frags = 0;
|
||||||
uint8_t *dest; //, *fdata_start;
|
uint8_t *dest; //, *fdata_start;
|
||||||
dest = data;
|
dest = data;
|
||||||
if (w->direction != WINDOW_RECVING)
|
if (w->direction != WINDOW_RECVING)
|
||||||
return 0;
|
return 0;
|
||||||
if (w->frags[w->chunk_start].start == 0 && w->numitems > 0) {
|
|
||||||
WDEBUG("chunk_start (%" L "u) pointing to non-start fragment (seq %u, len %" L "u)!",
|
/* start fragment may be missing, so only stop if w is empty */
|
||||||
|
if (w->frags[w->chunk_start].start == 0 && w->numitems == 0) {
|
||||||
|
WDEBUG("chunk_start (%" L "u) != start and w empty (seq %u, len %" L "u)!",
|
||||||
w->chunk_start, w->frags[w->chunk_start].seqID, w->frags[w->chunk_start].len);
|
w->chunk_start, w->frags[w->chunk_start].seqID, w->frags[w->chunk_start].len);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -204,42 +195,73 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int
|
||||||
|
|
||||||
fragment *f;
|
fragment *f;
|
||||||
size_t i;
|
size_t i;
|
||||||
unsigned curseq;
|
unsigned curseq, consecutive_frags = 0;
|
||||||
int end = 0;
|
int end = 0, drop = 0; /* if packet is dropped, clean out old frags */
|
||||||
curseq = w->frags[w->chunk_start].seqID;
|
curseq = w->frags[w->chunk_start].seqID;
|
||||||
|
curseq = w->start_seq_id;
|
||||||
|
|
||||||
for (i = 0; i < w->numitems; ++i) {
|
for (i = 0; i < w->numitems; ++i) {
|
||||||
woffs = WRAP(w->chunk_start + i);
|
woffs = WRAP(w->chunk_start + i);
|
||||||
f = &w->frags[woffs];
|
f = &w->frags[woffs];
|
||||||
fraglen = f->len;
|
fraglen = f->len;
|
||||||
if (fraglen == 0 || f->seqID != curseq) {
|
|
||||||
WDEBUG("Missing next frag %u [%" L "u], got seq %u (%" L "u bytes) instead! Not reassembling!",
|
/* Drop packets if some fragments are missing after reaching max retries
|
||||||
|
* or packet timeout
|
||||||
|
* Note: this lowers the guaranteed arrival constraint */
|
||||||
|
/* Process:
|
||||||
|
* if buffer contains >0 frags
|
||||||
|
* for frag in buffer from start {
|
||||||
|
* if frag empty: skip; else
|
||||||
|
* if frag.start {
|
||||||
|
* attempt reassembly as normal;
|
||||||
|
* continue from end of full packet;
|
||||||
|
* } else skip;
|
||||||
|
* endif
|
||||||
|
* }
|
||||||
|
*/
|
||||||
|
if (fraglen == 0) { /* Empty fragment */
|
||||||
|
WDEBUG("reassemble: hole at frag %u [%" L "u]",
|
||||||
curseq, woffs, f->seqID, fraglen);
|
curseq, woffs, f->seqID, fraglen);
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
WDEBUG(" Fragment seq %u, data length %" L "u, data offset %" L "u, total len %" L "u, maxlen %" L "u",
|
/* reset reassembly things to start over */
|
||||||
f->seqID, fraglen, dest - data, datalen, maxlen);
|
consecutive_frags = 0;
|
||||||
memcpy(dest, f->data, MIN(fraglen, maxlen));
|
|
||||||
dest += fraglen;
|
} else if (f->start || consecutive_frags >= 1) {
|
||||||
datalen += fraglen;
|
found_frags++;
|
||||||
if (compression) {
|
consecutive_frags++;
|
||||||
*compression &= f->compressed & 1;
|
WDEBUG("reassemble: frag seq %u, data length %" L "u, data offset %" \
|
||||||
if (f->compressed != *compression) {
|
L "u, total len %" L "u, maxlen %" L "u, found %" L "u, consecutive %" L "u",
|
||||||
WDEBUG("Inconsistent compression flags in chunk. Will reassemble anyway!");
|
f->seqID, fraglen, dest - data, datalen, maxlen);
|
||||||
|
if (drop == 0) {
|
||||||
|
/* Copy next fragment to buffer if not going to drop */
|
||||||
|
memcpy(dest, f->data, MIN(fraglen, maxlen));
|
||||||
|
}
|
||||||
|
dest += fraglen;
|
||||||
|
datalen += fraglen;
|
||||||
|
if (compression) {
|
||||||
|
*compression &= f->compressed & 1;
|
||||||
|
if (f->compressed != *compression) {
|
||||||
|
WDEBUG("Inconsistent compression flags in chunk. Will reassemble anyway!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (fraglen > maxlen) {
|
||||||
|
WDEBUG("Data buffer too small: drop packet! Reassembled %" L "u bytes.", datalen);
|
||||||
|
drop = 1;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (fraglen > maxlen) {
|
|
||||||
WDEBUG("Data buffer too small! Reassembled %" L "u bytes.", datalen);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Move window along to avoid weird issues */
|
/* Move window along to avoid weird issues */
|
||||||
window_tick(w);
|
window_tick(w);
|
||||||
|
|
||||||
if (f->end == 1) {
|
if (f->end == 1) {
|
||||||
WDEBUG("Found end of chunk! (seqID %u, chunk len %" L "u, datalen %" L "u)", f->seqID, i, datalen);
|
WDEBUG("Found end of chunk! (seqID %u, chunk len %" L "u, datalen %" L "u)", f->seqID, i, datalen);
|
||||||
end = 1;
|
end = 1;
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (found_frags >= w->numitems) {
|
||||||
|
/* no point continuing if no full packets yet and no other action */
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Move position counters and expected next seqID */
|
/* Move position counters and expected next seqID */
|
||||||
|
@ -247,7 +269,7 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int
|
||||||
curseq = (curseq + 1) % MAX_SEQ_ID;
|
curseq = (curseq + 1) % MAX_SEQ_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (end == 0) {
|
if (end == 0 && drop == 0) {
|
||||||
/* no end of chunk found because the window buffer has no more frags
|
/* no end of chunk found because the window buffer has no more frags
|
||||||
* meaning they haven't been received yet. */
|
* meaning they haven't been received yet. */
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -261,7 +283,7 @@ window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int
|
||||||
);
|
);
|
||||||
w->chunk_start = WRAP(woffs + 1);
|
w->chunk_start = WRAP(woffs + 1);
|
||||||
w->numitems -= i + 1;
|
w->numitems -= i + 1;
|
||||||
return datalen;
|
return drop == 0 ? datalen : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t
|
size_t
|
||||||
|
@ -339,8 +361,8 @@ window_get_next_sending_fragment(struct frag_buffer *w, int *other_ack)
|
||||||
|
|
||||||
if (f->retries >= 1 && !timercmp(&age, &w->timeout, <)) {
|
if (f->retries >= 1 && !timercmp(&age, &w->timeout, <)) {
|
||||||
/* Resending fragment due to ACK timeout */
|
/* Resending fragment due to ACK timeout */
|
||||||
WDEBUG("Retrying frag %u (%ld ms old/timeout %ld ms), retries: %u/total %u",
|
WDEBUG("Retrying frag %u (%ld ms old/timeout %ld ms), retries: %u/max %u/total %u",
|
||||||
f->seqID, timeval_to_ms(&age), timeval_to_ms(&w->timeout), f->retries, w->resends);
|
f->seqID, timeval_to_ms(&age), timeval_to_ms(&w->timeout), f->retries, w->max_retries, w->resends);
|
||||||
w->resends ++;
|
w->resends ++;
|
||||||
goto found;
|
goto found;
|
||||||
} else if (f->retries == 0 && f->len > 0) {
|
} else if (f->retries == 0 && f->len > 0) {
|
||||||
|
@ -404,6 +426,7 @@ void
|
||||||
window_tick(struct frag_buffer *w)
|
window_tick(struct frag_buffer *w)
|
||||||
{
|
{
|
||||||
for (size_t i = 0; i < w->windowsize; i++) {
|
for (size_t i = 0; i < w->windowsize; i++) {
|
||||||
|
// TODO are ACKs required for reduced arrival guarantee?
|
||||||
if (w->frags[w->window_start].acks >= 1) {
|
if (w->frags[w->window_start].acks >= 1) {
|
||||||
#ifdef DEBUG_BUILD
|
#ifdef DEBUG_BUILD
|
||||||
unsigned old_start_id = w->start_seq_id;
|
unsigned old_start_id = w->start_seq_id;
|
||||||
|
@ -415,7 +438,7 @@ window_tick(struct frag_buffer *w)
|
||||||
if (w->direction == WINDOW_SENDING) {
|
if (w->direction == WINDOW_SENDING) {
|
||||||
WDEBUG("Clearing old fragments in SENDING window.");
|
WDEBUG("Clearing old fragments in SENDING window.");
|
||||||
w->numitems --; /* Clear old fragments */
|
w->numitems --; /* Clear old fragments */
|
||||||
memset(&w->frags[w->window_start], 0, sizeof(fragment));
|
w->frags[w->window_start].len = 0;
|
||||||
}
|
}
|
||||||
w->window_start = AFTER(w, 1);
|
w->window_start = AFTER(w, 1);
|
||||||
|
|
||||||
|
@ -427,7 +450,7 @@ window_tick(struct frag_buffer *w)
|
||||||
/* Splits data into fragments and adds to the end of the window buffer for sending
|
/* Splits data into fragments and adds to the end of the window buffer for sending
|
||||||
* All fragment meta-data is created here (SEND) */
|
* All fragment meta-data is created here (SEND) */
|
||||||
int
|
int
|
||||||
window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, int compressed)
|
window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, uint8_t compressed)
|
||||||
{
|
{
|
||||||
// Split data into thingies of <= fragsize
|
// Split data into thingies of <= fragsize
|
||||||
size_t n = ((len - 1) / w->maxfraglen) + 1;
|
size_t n = ((len - 1) / w->maxfraglen) + 1;
|
||||||
|
@ -437,21 +460,27 @@ window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, int c
|
||||||
}
|
}
|
||||||
compressed &= 1;
|
compressed &= 1;
|
||||||
size_t offset = 0;
|
size_t offset = 0;
|
||||||
static fragment f;
|
fragment *f = &w->frags[w->last_write];
|
||||||
WDEBUG("add data len %" L "u, %" L "u frags, max fragsize %u", len, n, w->maxfraglen);
|
WDEBUG("add data len %" L "u, %" L "u frags, max fragsize %u", len, n, w->maxfraglen);
|
||||||
for (size_t i = 0; i < n; i++) {
|
for (size_t i = 0; i < n; i++) {
|
||||||
memset(&f, 0, sizeof(f));
|
f->len = MIN(len - offset, w->maxfraglen);
|
||||||
f.len = MIN(len - offset, w->maxfraglen);
|
memcpy(f->data, data + offset, f->len);
|
||||||
memcpy(f.data, data + offset, f.len);
|
f->seqID = w->cur_seq_id;
|
||||||
f.seqID = w->cur_seq_id;
|
f->start = (i == 0) ? 1 : 0;
|
||||||
f.start = (i == 0) ? 1 : 0;
|
f->end = (i == n - 1) ? 1 : 0;
|
||||||
f.end = (i == n - 1) ? 1 : 0;
|
f->compressed = compressed;
|
||||||
f.compressed = compressed;
|
f->ack_other = -1;
|
||||||
f.ack_other = -1;
|
|
||||||
window_append_fragment(w, &f);
|
/* append fragment */
|
||||||
|
if (window_buffer_available(w) < 1) return 0;
|
||||||
|
memcpy(&w->frags[w->last_write], f, sizeof(fragment));
|
||||||
|
|
||||||
|
w->last_write = WRAP(w->last_write + 1);
|
||||||
|
w->numitems ++;
|
||||||
|
|
||||||
w->cur_seq_id = (w->cur_seq_id + 1) % MAX_SEQ_ID;
|
w->cur_seq_id = (w->cur_seq_id + 1) % MAX_SEQ_ID;
|
||||||
WDEBUG(" fragment len %" L "u, seqID %u, s %u, end %u, dOffs %" L "u", f.len, f.seqID, f.start, f.end, offset);
|
WDEBUG(" fragment len %" L "u, seqID %u, s %u, end %u, dOffs %" L "u", f->len, f->seqID, f->start, f->end, offset);
|
||||||
offset += f.len;
|
offset += f->len;
|
||||||
}
|
}
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
45
src/window.h
45
src/window.h
|
@ -20,41 +20,43 @@
|
||||||
/* Hard-coded sequence ID and fragment size limits
|
/* Hard-coded sequence ID and fragment size limits
|
||||||
* These should match the limitations of the protocol. */
|
* These should match the limitations of the protocol. */
|
||||||
#define MAX_SEQ_ID 256
|
#define MAX_SEQ_ID 256
|
||||||
#define MAX_FRAGSIZE 4096
|
#define MAX_FRAGSIZE 2048
|
||||||
|
|
||||||
/* Window function definitions. */
|
/* Window function definitions. */
|
||||||
#define WINDOW_SENDING 1
|
#define WINDOW_SENDING 1
|
||||||
#define WINDOW_RECVING 0
|
#define WINDOW_RECVING 0
|
||||||
|
|
||||||
typedef struct fragment {
|
typedef struct fragment {
|
||||||
|
uint8_t *data; /* pointer to fragment data */
|
||||||
|
struct timeval lastsent; /* timestamp of most recent send attempt */
|
||||||
size_t len; /* Length of fragment data (0 if fragment unused) */
|
size_t len; /* Length of fragment data (0 if fragment unused) */
|
||||||
unsigned seqID; /* fragment sequence ID */
|
unsigned seqID; /* fragment sequence ID */
|
||||||
|
unsigned retries; /* number of times has been sent or dupes recv'd */
|
||||||
|
int acks; /* number of times packet has been ack'd */
|
||||||
int ack_other; /* other way ACK seqID (>=0) or unset (<0) */
|
int ack_other; /* other way ACK seqID (>=0) or unset (<0) */
|
||||||
int compressed; /* compression flag */
|
uint8_t compressed; /* compression flag */
|
||||||
uint8_t start; /* start of chunk flag */
|
uint8_t start; /* start of chunk flag */
|
||||||
uint8_t end; /* end of chunk flag */
|
uint8_t end; /* end of chunk flag */
|
||||||
uint8_t data[MAX_FRAGSIZE]; /* fragment data */
|
|
||||||
unsigned retries; /* number of times has been sent or dupes recv'd */
|
|
||||||
struct timeval lastsent; /* timestamp of most recent send attempt */
|
|
||||||
int acks; /* number of times packet has been ack'd */
|
|
||||||
} fragment;
|
} fragment;
|
||||||
|
|
||||||
struct frag_buffer {
|
struct frag_buffer {
|
||||||
fragment *frags; /* pointer to array of data fragments */
|
fragment *frags; /* pointer to array of fragment metadata */
|
||||||
unsigned windowsize; /* Max number of fragments in flight */
|
uint8_t *data; /* pointer to actual fragment data */
|
||||||
unsigned maxfraglen; /* Max outgoing fragment data size */
|
|
||||||
size_t length; /* Length of buffer */
|
size_t length; /* Length of buffer */
|
||||||
size_t numitems; /* number of non-empty fragments stored in buffer */
|
size_t numitems; /* number of non-empty fragments stored in buffer */
|
||||||
size_t window_start; /* Start of window (index) */
|
size_t window_start; /* Start of window (index) */
|
||||||
size_t window_end; /* End of window (index) */
|
size_t window_end; /* End of window (index) */
|
||||||
size_t last_write; /* Last fragment appended (index) */
|
size_t last_write; /* Last fragment appended (index) */
|
||||||
size_t chunk_start; /* Start of current chunk of fragments (index) */
|
size_t chunk_start; /* Start of current chunk of fragments (index) */
|
||||||
|
struct timeval timeout; /* Fragment ACK timeout before resend or drop */
|
||||||
|
unsigned windowsize; /* Max number of fragments in flight */
|
||||||
|
unsigned maxfraglen; /* Max outgoing fragment data size */
|
||||||
unsigned cur_seq_id; /* Next unused sequence ID */
|
unsigned cur_seq_id; /* Next unused sequence ID */
|
||||||
unsigned start_seq_id; /* Start of window sequence ID */
|
unsigned start_seq_id; /* Start of window sequence ID */
|
||||||
|
unsigned max_retries; /* max number of resends before dropping (-1 = never drop) */
|
||||||
unsigned resends; /* number of fragments resent or number of dupes received */
|
unsigned resends; /* number of fragments resent or number of dupes received */
|
||||||
unsigned oos; /* Number of out-of-sequence fragments received */
|
unsigned oos; /* Number of out-of-sequence fragments received */
|
||||||
int direction; /* WINDOW_SENDING or WINDOW_RECVING */
|
int direction; /* WINDOW_SENDING or WINDOW_RECVING */
|
||||||
struct timeval timeout; /* Fragment ACK timeout before resend */
|
|
||||||
};
|
};
|
||||||
|
|
||||||
extern int window_debug;
|
extern int window_debug;
|
||||||
|
@ -70,6 +72,9 @@ extern int window_debug;
|
||||||
#define WDEBUG(...)
|
#define WDEBUG(...)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/* Gets pointer to fragment data given fragment index */
|
||||||
|
#define FRAG_DATA(w, fragIndex) ((w->data + (w->maxfraglen * fragIndex)))
|
||||||
|
|
||||||
/* Gets index of fragment o fragments after window start */
|
/* Gets index of fragment o fragments after window start */
|
||||||
#define AFTER(w, o) ((w->window_start + o) % w->length)
|
#define AFTER(w, o) ((w->window_start + o) % w->length)
|
||||||
|
|
||||||
|
@ -109,29 +114,27 @@ extern int window_debug;
|
||||||
}\
|
}\
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Window buffer creation and housekeeping */
|
/* Window buffer creation */
|
||||||
struct frag_buffer *window_buffer_init(size_t length, unsigned windowsize, unsigned fragsize, int dir);
|
struct frag_buffer *window_buffer_init(size_t length, unsigned windowsize, unsigned maxfraglen, int dir);
|
||||||
void window_buffer_resize(struct frag_buffer *w, size_t length);
|
|
||||||
|
/* Resize buffer, clear and reset stats and data */
|
||||||
|
void window_buffer_resize(struct frag_buffer *w, size_t length, unsigned maxfraglen);
|
||||||
|
|
||||||
|
/* Destroys window buffer instance */
|
||||||
void window_buffer_destroy(struct frag_buffer *w);
|
void window_buffer_destroy(struct frag_buffer *w);
|
||||||
|
|
||||||
/* Clears fragments and resets window stats */
|
/* Clears fragments and resets window stats */
|
||||||
void window_buffer_clear(struct frag_buffer *w);
|
void window_buffer_clear(struct frag_buffer *w);
|
||||||
|
|
||||||
/* Resets window stats without clearing fragments */
|
|
||||||
void window_buffer_reset(struct frag_buffer *w);
|
|
||||||
|
|
||||||
/* Returns number of available fragment slots (NOT BYTES) */
|
/* Returns number of available fragment slots (NOT BYTES) */
|
||||||
size_t window_buffer_available(struct frag_buffer *w);
|
size_t window_buffer_available(struct frag_buffer *w);
|
||||||
|
|
||||||
/* Places a fragment in the window after the last one */
|
|
||||||
int window_append_fragment(struct frag_buffer *w, fragment *src);
|
|
||||||
|
|
||||||
/* Handles fragment received from the sending side (RECV) */
|
/* Handles fragment received from the sending side (RECV) */
|
||||||
ssize_t window_process_incoming_fragment(struct frag_buffer *w, fragment *f);
|
ssize_t window_process_incoming_fragment(struct frag_buffer *w, fragment *f);
|
||||||
|
|
||||||
/* Reassembles first complete sequence of fragments into data. (RECV)
|
/* Reassembles first complete sequence of fragments into data. (RECV)
|
||||||
* Returns length of data reassembled, or 0 if no data reassembled */
|
* Returns length of data reassembled, or 0 if no data reassembled */
|
||||||
size_t window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, int *compression);
|
size_t window_reassemble_data(struct frag_buffer *w, uint8_t *data, size_t maxlen, uint8_t *compression);
|
||||||
|
|
||||||
/* Returns number of fragments to be sent */
|
/* Returns number of fragments to be sent */
|
||||||
size_t window_sending(struct frag_buffer *w, struct timeval *);
|
size_t window_sending(struct frag_buffer *w, struct timeval *);
|
||||||
|
@ -151,6 +154,6 @@ void window_tick(struct frag_buffer *w);
|
||||||
|
|
||||||
/* Splits data into fragments and adds to the end of the window buffer for sending
|
/* Splits data into fragments and adds to the end of the window buffer for sending
|
||||||
* All fragment meta-data is created here (SEND) */
|
* All fragment meta-data is created here (SEND) */
|
||||||
int window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, int compressed);
|
int window_add_outgoing_data(struct frag_buffer *w, uint8_t *data, size_t len, uint8_t compressed);
|
||||||
|
|
||||||
#endif /* __WINDOW_H__ */
|
#endif /* __WINDOW_H__ */
|
||||||
|
|
|
@ -67,7 +67,7 @@ START_TEST(test_window_everything)
|
||||||
uint8_t newdata[1000];
|
uint8_t newdata[1000];
|
||||||
memset(newdata, 0, 1000);
|
memset(newdata, 0, 1000);
|
||||||
unsigned i;
|
unsigned i;
|
||||||
int c;
|
uint8_t c;
|
||||||
for (i = 0; i < 50; i++) {
|
for (i = 0; i < 50; i++) {
|
||||||
memset(data, 0, 100);
|
memset(data, 0, 100);
|
||||||
size_t len = window_reassemble_data(in, data, 100, &c);
|
size_t len = window_reassemble_data(in, data, 100, &c);
|
||||||
|
|
Loading…
Add table
Reference in a new issue