Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
uWSGI / core / offload.c
Size: Mime:
#include "uwsgi.h"

/*

	uWSGI offloading subsystem

	steps to offload a task

	1) allocate a uwsgi_offload_request structure (could be on stack)
	struct uwsgi_offload_request uor;
	2) prepare it for a specific engine (last argument is the takeover flag)
	uwsgi_offload_setup(my_engine, &uor, 1)
	3) run it (last argument is the waiter)
	int uwsgi_offload_run(wsgi_req, &uor, NULL);

	between 2 and 3 you can set specific values

*/


extern struct uwsgi_server uwsgi;

#define uwsgi_offload_retry if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS) return 0;
#define uwsgi_offload_0r_1w(x, y) if (event_queue_del_fd(ut->queue, x, event_queue_read())) return -1;\
					if (event_queue_fd_read_to_write(ut->queue, y)) return -1;

void uwsgi_offload_setup(struct uwsgi_offload_engine *uoe, struct uwsgi_offload_request *uor, struct wsgi_request *wsgi_req, uint8_t takeover) {

	memset(uor, 0, sizeof(struct uwsgi_offload_request));

	uor->engine = uoe;

	uor->s = wsgi_req->fd;
	uor->fd = -1;
	uor->fd2 = -1;
	// an engine could changes behaviour based on pipe anf takeover values
	uor->pipe[0] = -1;
	uor->pipe[1] = -1;
	uor->takeover = takeover;

}

static int uwsgi_offload_enqueue(struct wsgi_request *wsgi_req, struct uwsgi_offload_request *uor) {
	struct uwsgi_core *uc = &uwsgi.workers[uwsgi.mywid].cores[wsgi_req->async_id];
	uc->offloaded_requests++;
	// round robin
	if (uc->offload_rr >= uwsgi.offload_threads) {
		uc->offload_rr = 0;
	}
	struct uwsgi_thread *ut = uwsgi.offload_thread[uc->offload_rr];
	uc->offload_rr++;
	if (write(ut->pipe[0], uor, sizeof(struct uwsgi_offload_request)) != sizeof(struct uwsgi_offload_request)) {
		if (uor->takeover) {
			wsgi_req->fd_closed = 0;
		}
		return -1;
	}
#ifdef UWSGI_DEBUG
        uwsgi_log("[offload] created session %p\n", uor);
#endif
	return 0;
}

/*

	pipe offload engine:
		fd -> the file descriptor to read from
		len -> amount of data to transfer

*/

static int u_offload_pipe_prepare(struct wsgi_request *wsgi_req, struct uwsgi_offload_request *uor) {

        if (uor->fd < 0 || !uor->len) {
                return -1;
        }
        return 0;
}

/*

        memory offload engine:
                buf -> pointer to the memory to transfer (memory is freed at the end)
                len -> amount of data to transfer

*/

static int u_offload_memory_prepare(struct wsgi_request *wsgi_req, struct uwsgi_offload_request *uor) {

        if (!uor->buf || !uor->len) {
                return -1;
        }
        return 0;
}


/*

	transfer offload engine:
		name -> socket name
		ubuf -> data to send

*/

static int u_offload_transfer_prepare(struct wsgi_request *wsgi_req, struct uwsgi_offload_request *uor) {

	if (!uor->name) {
		return -1;
	}

	uor->fd = uwsgi_connect(uor->name, 0, 1);
	if (uor->fd < 0) {
		uwsgi_error("u_offload_transfer_prepare()/connect()");
		return -1;
	}

	return 0;
}

/*

	sendfile offload engine:
		name -> filename to transfer
		fd -> file descriptor of file to transfer	
		fd2 -> sendfile destination (default wsgi_req->fd)
		len -> size of the file or amount of data to transfer
		pos -> position in the file

*/

static int u_offload_sendfile_prepare(struct wsgi_request *wsgi_req, struct uwsgi_offload_request *uor) {

	if (!uor->name && uor->fd == -1) return -1;

	if (uor->name) {
		uor->fd = open(uor->name, O_RDONLY | O_NONBLOCK);
		if (uor->fd < 0) {
			uwsgi_error_open(uor->name);
			return -1;
		}
	}

	// make a fstat to get the file size
	if (!uor->len) {
		struct stat st;
		if (fstat(uor->fd, &st)) {
			uwsgi_error("u_offload_sendfile_prepare()/fstat()");
			if (uor->name) {
				close(uor->fd);
			}
			return -1;
		}
		uor->len = st.st_size;
	}

	if (uor->fd2 == -1) {
		uor->fd2 = uor->s;
	}
	uor->s = -1;

	return 0;
}

static void uwsgi_offload_close(struct uwsgi_thread *ut, struct uwsgi_offload_request *uor) {

	// call the free function asap
	if (uor->free) {
		uor->free(uor);
	}

	// close the socket and the file descriptor
	if (uor->takeover && uor->s > -1) {
		close(uor->s);
	}
	if (uor->fd != -1) {
		close(uor->fd);
	}
	if (uor->fd2 != -1) {
		close(uor->fd2);
	}
	// remove the structure from the linked list;
	struct uwsgi_offload_request *prev = uor->prev;
	struct uwsgi_offload_request *next = uor->next;

	if (uor == ut->offload_requests_head) {
		ut->offload_requests_head = next;
	}

	if (uor == ut->offload_requests_tail) {
		ut->offload_requests_tail = prev;
	}

	if (prev) {
		prev->next = next;
	}

	if (next) {
		next->prev = prev;
	}

	if (uor->buf) {
		free(uor->buf);
	}

	if (uor->ubuf) {
		uwsgi_buffer_destroy(uor->ubuf);
	}

	if (uor->ubuf1) {
		uwsgi_buffer_destroy(uor->ubuf1);
	}
	if (uor->ubuf2) {
		uwsgi_buffer_destroy(uor->ubuf2);
	}
	if (uor->ubuf3) {
		uwsgi_buffer_destroy(uor->ubuf3);
	}
	if (uor->ubuf4) {
		uwsgi_buffer_destroy(uor->ubuf4);
	}
	if (uor->ubuf5) {
		uwsgi_buffer_destroy(uor->ubuf5);
	}
	if (uor->ubuf6) {
		uwsgi_buffer_destroy(uor->ubuf6);
	}
	if (uor->ubuf7) {
		uwsgi_buffer_destroy(uor->ubuf7);
	}
	if (uor->ubuf8) {
		uwsgi_buffer_destroy(uor->ubuf8);
	}

	if (uor->pipe[0] != -1) {
		close(uor->pipe[1]);
		close(uor->pipe[0]);
	}

	free(uor);

#ifdef UWSGI_DEBUG
	uwsgi_log("[offload] destroyed session %p\n", uor);
#endif
}

static void uwsgi_offload_append(struct uwsgi_thread *ut, struct uwsgi_offload_request *uor) {

	if (!ut->offload_requests_head) {
		ut->offload_requests_head = uor;
	}

	if (ut->offload_requests_tail) {
		ut->offload_requests_tail->next = uor;
		uor->prev = ut->offload_requests_tail;
	}

	ut->offload_requests_tail = uor;
}

static struct uwsgi_offload_request *uwsgi_offload_get_by_fd(struct uwsgi_thread *ut, int s) {
	struct uwsgi_offload_request *uor = ut->offload_requests_head;
	while (uor) {
		if (uor->s == s || uor->fd == s || uor->fd2 == s) {
			return uor;
		}
		uor = uor->next;
	}

	return NULL;
}

static void uwsgi_offload_loop(struct uwsgi_thread *ut) {

	int i;
	void *events = event_queue_alloc(uwsgi.offload_threads_events);

	for (;;) {
		// TODO make timeout tunable
		int nevents = event_queue_wait_multi(ut->queue, -1, events, uwsgi.offload_threads_events);
		for (i = 0; i < nevents; i++) {
			int interesting_fd = event_queue_interesting_fd(events, i);
			if (interesting_fd == ut->pipe[1]) {
				struct uwsgi_offload_request *uor = uwsgi_malloc(sizeof(struct uwsgi_offload_request));
				ssize_t len = read(ut->pipe[1], uor, sizeof(struct uwsgi_offload_request));
				if (len != sizeof(struct uwsgi_offload_request)) {
					uwsgi_error("read()");
					free(uor);
					continue;
				}
				// cal the event function for the first time
				if (uor->engine->event_func(ut, uor, -1)) {
					uwsgi_offload_close(ut, uor);
					continue;
				}
				uwsgi_offload_append(ut, uor);
				continue;
			}

			// get the task from the interesting fd
			struct uwsgi_offload_request *uor = uwsgi_offload_get_by_fd(ut, interesting_fd);
			if (!uor)
				continue;
			// run the hook
			if (uor->engine->event_func(ut, uor, interesting_fd)) {
				uwsgi_offload_close(ut, uor);
			}
		}
	}
}

struct uwsgi_thread *uwsgi_offload_thread_start() {
	return uwsgi_thread_new(uwsgi_offload_loop);
}

/*

	offload memory transfer

        uor->len -> the size of the memory chunk
        uor->buf -> the memory to transfer
	uor->written -> written bytes

        status: none

*/

static int u_offload_memory_do(struct uwsgi_thread *ut, struct uwsgi_offload_request *uor, int fd) {
	if (fd == -1) {
                if (event_queue_add_fd_write(ut->queue, uor->s)) return -1;
                return 0;
        }
	ssize_t rlen = write(uor->s, uor->buf + uor->written, uor->len - uor->written);
	if (rlen > 0) {
		uor->written += rlen;
		if (uor->written >= uor->len) {
			return -1;
		}
		return 0;
	}
        else if (rlen < 0) {
		uwsgi_offload_retry
                uwsgi_error("u_offload_memory_do()");
	}
	return -1;
}


/*

the offload task starts after having acquired the file fd

	uor->len -> the size of the file
	uor->pos -> start writing from pos (default 0)

	status: none

*/

static int u_offload_sendfile_do(struct uwsgi_thread *ut, struct uwsgi_offload_request *uor, int fd) {

	if (fd == -1) {
		if (event_queue_add_fd_write(ut->queue, uor->fd2)) return -1;
		return 0;
	}
#if defined(__linux__) || defined(__sun__) || defined(__GNU_kFreeBSD__)
	ssize_t len = sendfile(uor->fd2, uor->fd, &uor->pos, 128 * 1024);
	if (len > 0) {
        	uor->written += len;
                if (uor->written >= uor->len) {
			return -1;
		}
		return 0;
	}
        else if (len < 0) {
		uwsgi_offload_retry
                uwsgi_error("u_offload_sendfile_do()");
	}
#elif defined(__FreeBSD__) || defined(__DragonFly__)
	off_t sbytes = 0;
	int ret = sendfile(uor->fd, uor->fd2, uor->pos, 0, NULL, &sbytes, 0);
	// transfer finished
	if (ret == -1) {
		uor->pos += sbytes;
		uwsgi_offload_retry
                uwsgi_error("u_offload_sendfile_do()");
	}
#elif defined(__APPLE__) && !defined(NO_SENDFILE)
	off_t len = 0;
        int ret = sendfile(uor->fd, uor->fd2, uor->pos, &len, NULL, 0);
        // transfer finished
        if (ret == -1) {
                uor->pos += len;
                uwsgi_offload_retry
                uwsgi_error("u_offload_sendfile_do()");
        }
#endif
	return -1;

}

/*

	pipe offloading
	status:
		0 -> waiting for data on fd
		1 -> waiting for write to s
*/

static int u_offload_pipe_do(struct uwsgi_thread *ut, struct uwsgi_offload_request *uor, int fd) {
	
	ssize_t rlen;

	// setup
	if (fd == -1) {
		event_queue_add_fd_read(ut->queue, uor->fd);
		return 0;
	}

	switch(uor->status) {
		// read event from fd
		case 0:
			if (!uor->buf) {
				uor->buf = uwsgi_malloc(4096);
			}
			rlen = read(uor->fd, uor->buf, 4096);
			if (rlen > 0) {
				uor->to_write = rlen;
				uor->pos = 0;
				if (event_queue_del_fd(ut->queue, uor->fd, event_queue_read())) return -1;
				if (event_queue_add_fd_write(ut->queue, uor->s)) return -1;
				uor->status = 1;
				return 0;
			}
			if (rlen < 0) {
				uwsgi_offload_retry
				uwsgi_error("u_offload_pipe_do() -> read()");
			}
			return -1;
		// write event on s
		case 1:
			rlen = write(uor->s, uor->buf + uor->pos, uor->to_write);
			if (rlen > 0) {
				uor->to_write -= rlen;
				uor->pos += rlen;
				if (uor->to_write == 0) {
					if (event_queue_del_fd(ut->queue, uor->s, event_queue_write())) return -1;
					if (event_queue_add_fd_read(ut->queue, uor->fd)) return -1;
					uor->status = 0;
				}
				return 0;
			}
			else if (rlen < 0) {
				uwsgi_offload_retry
				uwsgi_error("u_offload_pipe_do() -> write()");
			}
			return -1;
		default:
			break;
	}

	return -1;
}



/*
the offload task starts soon after the call to connect()

	status:
		0 -> waiting for connection on fd
		1 -> sending request to fd (write event)
		2 -> start waiting for read on s and fd
		3 -> write to s
		4 -> write to fd
*/
static int u_offload_transfer_do(struct uwsgi_thread *ut, struct uwsgi_offload_request *uor, int fd) {
	
	ssize_t rlen;

	// setup
	if (fd == -1) {
		event_queue_add_fd_write(ut->queue, uor->fd);
		return 0;
	}

	switch(uor->status) {
		// waiting for connection
		case 0:
			if (fd == uor->fd) {
				uor->status = 1;
				// ok try to send the request right now...
				return u_offload_transfer_do(ut, uor, fd);
			}
			return -1;
		// write event (or just connected)
		case 1:
			if (fd == uor->fd) {
				// maybe we want only a connection...
				if (uor->ubuf->pos == 0) {
					uor->status = 2;
					if (event_queue_add_fd_read(ut->queue, uor->s)) return -1;
					if (event_queue_fd_write_to_read(ut->queue, uor->fd)) return -1;
					return 0;
				}
				rlen = write(uor->fd, uor->ubuf->buf + uor->written, uor->ubuf->pos-uor->written);	
				if (rlen > 0) {
					uor->written += rlen;
					if (uor->written >= (size_t)uor->ubuf->pos) {
						uor->status = 2;
						if (event_queue_add_fd_read(ut->queue, uor->s)) return -1;
						if (event_queue_fd_write_to_read(ut->queue, uor->fd)) return -1;
					}
					return 0;
				}
				else if (rlen < 0) {
					uwsgi_offload_retry
					uwsgi_error("u_offload_transfer_do() -> write()");
				}
			}	
			return -1;
		// read event from s or fd
		case 2:
			if (!uor->buf) {
				uor->buf = uwsgi_malloc(4096);
			}
			if (fd == uor->fd) {
				rlen = read(uor->fd, uor->buf, 4096);
				if (rlen > 0) {
					uor->to_write = rlen;
					uor->pos = 0;
					uwsgi_offload_0r_1w(uor->fd, uor->s)
					uor->status = 3;
					return 0;
				}
				if (rlen < 0) {
					uwsgi_offload_retry
					uwsgi_error("u_offload_transfer_do() -> read()/fd");
				}
			}
			else if (fd == uor->s) {
				rlen = read(uor->s, uor->buf, 4096);
				if (rlen > 0) {
					uor->to_write = rlen;
					uor->pos = 0;
					uwsgi_offload_0r_1w(uor->s, uor->fd)
					uor->status = 4;
					return 0;
				}
				if (rlen < 0) {
					uwsgi_offload_retry
					uwsgi_error("u_offload_transfer_do() -> read()/s");
				}
			}
			return -1;
		// write event on s
		case 3:
			rlen = write(uor->s, uor->buf + uor->pos, uor->to_write);
			if (rlen > 0) {
				uor->to_write -= rlen;
				uor->pos += rlen;
				if (uor->to_write == 0) {
					if (event_queue_fd_write_to_read(ut->queue, uor->s)) return -1;
					if (event_queue_add_fd_read(ut->queue, uor->fd)) return -1;
					uor->status = 2;
				}
				return 0;
			}
			else if (rlen < 0) {
				uwsgi_offload_retry
				uwsgi_error("u_offload_transfer_do() -> write()/s");
			}
			return -1;
		// write event on fd
		case 4:
			rlen = write(uor->fd, uor->buf + uor->pos, uor->to_write);
			if (rlen > 0) {
				uor->to_write -= rlen;
				uor->pos += rlen;
				if (uor->to_write == 0) {
					if (event_queue_fd_write_to_read(ut->queue, uor->fd)) return -1;
					if (event_queue_add_fd_read(ut->queue, uor->s)) return -1;
					uor->status = 2;
				}
				return 0;
			}
			else if (rlen < 0) {
				uwsgi_offload_retry
				uwsgi_error("u_offload_transfer_do() -> write()/fd");
			}
			return -1;
		default:
			break;
	}

	return -1;
}

int uwsgi_offload_run(struct wsgi_request *wsgi_req, struct uwsgi_offload_request *uor, int *wait) {

	if (uor->engine->prepare_func(wsgi_req, uor)) {
		return -1;
	}

	if (wait) {
                if (pipe(uor->pipe)) {
                        uwsgi_error("uwsgi_offload_setup()/pipe()");
                        return -1;
                }
                *wait = uor->pipe[0];
                uwsgi_socket_nb(uor->pipe[0]);
                uwsgi_socket_nb(uor->pipe[1]);
        }

        if (uor->takeover) {
                wsgi_req->fd_closed = 1;
        }

	if (uwsgi_offload_enqueue(wsgi_req, uor)) {
		close(uor->pipe[0]);
		close(uor->pipe[1]);
		if (uor->takeover) {
			wsgi_req->fd_closed = 0;
		}
		return -1;
        }

        return 0;
	
};

struct uwsgi_offload_engine *uwsgi_offload_engine_by_name(char *name) {
	struct uwsgi_offload_engine *uoe = uwsgi.offload_engines;
	while(uoe) {
		if (!strcmp(name, uoe->name)) {
			return uoe;
		}
	}
	return NULL;
}

struct uwsgi_offload_engine *uwsgi_offload_register_engine(char *name, int (*prepare_func)(struct wsgi_request *, struct uwsgi_offload_request *), int (*event_func) (struct uwsgi_thread *, struct uwsgi_offload_request *, int)) {

	struct uwsgi_offload_engine *old_engine=NULL,*engine=uwsgi.offload_engines;
	while(engine) {
		if (!strcmp(engine->name, name)) {
			return engine;
		}
		old_engine = engine;
		engine = engine->next;
	}

	engine = uwsgi_calloc(sizeof(struct uwsgi_offload_engine));
	engine->name = name;
	engine->prepare_func = prepare_func;
	engine->event_func = event_func;

	if (old_engine) {
		old_engine->next = engine;
	}
	else {
		uwsgi.offload_engines = engine;
	}

	return engine;
}

void uwsgi_offload_engines_register_all() {
	uwsgi.offload_engine_sendfile = uwsgi_offload_register_engine("sendfile", u_offload_sendfile_prepare, u_offload_sendfile_do);
	uwsgi.offload_engine_transfer = uwsgi_offload_register_engine("transfer", u_offload_transfer_prepare, u_offload_transfer_do);
	uwsgi.offload_engine_memory = uwsgi_offload_register_engine("memory", u_offload_memory_prepare, u_offload_memory_do);
	uwsgi.offload_engine_pipe = uwsgi_offload_register_engine("pipe", u_offload_pipe_prepare, u_offload_pipe_do);
}

int uwsgi_offload_request_sendfile_do(struct wsgi_request *wsgi_req, int fd, size_t len) {
	struct uwsgi_offload_request uor;
	uwsgi_offload_setup(uwsgi.offload_engine_sendfile, &uor, wsgi_req, 1);
	uor.fd = fd;
	uor.len = len;
	return uwsgi_offload_run(wsgi_req, &uor, NULL);
}

int uwsgi_offload_request_net_do(struct wsgi_request *wsgi_req, char *socketname, struct uwsgi_buffer *ubuf) {
	struct uwsgi_offload_request uor;
	uwsgi_offload_setup(uwsgi.offload_engine_transfer, &uor, wsgi_req, 1);
	uor.name = socketname;
	uor.ubuf = ubuf;
	return uwsgi_offload_run(wsgi_req, &uor, NULL);
}

int uwsgi_offload_request_memory_do(struct wsgi_request *wsgi_req, char *buf, size_t len) {
        struct uwsgi_offload_request uor;
        uwsgi_offload_setup(uwsgi.offload_engine_memory, &uor, wsgi_req, 1);
        uor.buf = buf;
        uor.len = len;
        return uwsgi_offload_run(wsgi_req, &uor, NULL);
}

int uwsgi_offload_request_pipe_do(struct wsgi_request *wsgi_req, int fd, size_t len) {
        struct uwsgi_offload_request uor;
        uwsgi_offload_setup(uwsgi.offload_engine_pipe, &uor, wsgi_req, 1);
        uor.fd = fd;
        uor.len = len;
        return uwsgi_offload_run(wsgi_req, &uor, NULL);
}