Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
uWSGI / core / async.c
Size: Mime:
#include <uwsgi.h>

extern struct uwsgi_server uwsgi;

/*

	This is a general-purpose async loop engine (it expects a coroutine-based approach)

	You can see it as an hub holding the following structures:

	1) the runqueue, cores ready to be run are appended to this list

	2) the fd list, this is a list of monitored file descriptors, a core can wait for all the file descriptors it needs

	3) the timeout value, if set, the current core will timeout after the specified number of seconds (unless an event cancels it)


	IMPORTANT: this is not a callback-based engine !!!

*/

// this is called whenever a new connection is ready, but there are no cores to handle it
void uwsgi_async_queue_is_full(time_t now) {
	if (now > uwsgi.async_queue_is_full) {
		uwsgi_log_verbose("[DANGER] async queue is full !!!\n");
		uwsgi.async_queue_is_full = now;
	}
}

void uwsgi_async_init() {

	uwsgi.async_queue = event_queue_init();

	if (uwsgi.async_queue < 0) {
		exit(1);
	}

	uwsgi_add_sockets_to_queue(uwsgi.async_queue, -1);

	uwsgi.rb_async_timeouts = uwsgi_init_rb_timer();

	// optimization, this array maps file descriptor to requests
        uwsgi.async_waiting_fd_table = uwsgi_calloc(sizeof(struct wsgi_request *) * uwsgi.max_fd);
        uwsgi.async_proto_fd_table = uwsgi_calloc(sizeof(struct wsgi_request *) * uwsgi.max_fd);

}

struct wsgi_request *find_wsgi_req_proto_by_fd(int fd) {
	return uwsgi.async_proto_fd_table[fd];
}

struct wsgi_request *find_wsgi_req_by_fd(int fd) {
	return uwsgi.async_waiting_fd_table[fd];
}

static void runqueue_remove(struct uwsgi_async_request *u_request) {

	struct uwsgi_async_request *parent = u_request->prev;
	struct uwsgi_async_request *child = u_request->next;

	if (parent) {
		parent->next = child;
	}
	if (child) {
		child->prev = parent;
	}

	if (u_request == uwsgi.async_runqueue) {
		uwsgi.async_runqueue = child;
	}

	if (u_request == uwsgi.async_runqueue_last) {
		uwsgi.async_runqueue_last = parent;
	}

	free(u_request);
}

static void runqueue_push(struct wsgi_request *wsgi_req) {

	// do not push the same request in the runqueue
	struct uwsgi_async_request *uar = uwsgi.async_runqueue;
	while(uar) {
		if (uar->wsgi_req == wsgi_req) return;
		uar = uar->next;
	}

	uar = uwsgi_malloc(sizeof(struct uwsgi_async_request));
	uar->prev = NULL;
	uar->next = NULL;
	uar->wsgi_req = wsgi_req;

	if (uwsgi.async_runqueue == NULL) {
		uwsgi.async_runqueue = uar;
	}
	else {
		uar->prev = uwsgi.async_runqueue_last;	
	}

	if (uwsgi.async_runqueue_last) {
		uwsgi.async_runqueue_last->next = uar;
	}
	uwsgi.async_runqueue_last = uar;
}

struct wsgi_request *find_first_available_wsgi_req() {

	struct wsgi_request *wsgi_req;

	if (uwsgi.async_queue_unused_ptr < 0) {
		return NULL;
	}

	wsgi_req = uwsgi.async_queue_unused[uwsgi.async_queue_unused_ptr];
	uwsgi.async_queue_unused_ptr--;
	return wsgi_req;
}

void async_reset_request(struct wsgi_request *wsgi_req) {
	if (wsgi_req->async_timeout) {
		uwsgi_del_rb_timer(uwsgi.rb_async_timeouts, wsgi_req->async_timeout);
		free(wsgi_req->async_timeout);
		wsgi_req->async_timeout = NULL;
	}
	
	struct uwsgi_async_fd *uaf = wsgi_req->waiting_fds;
	while (uaf) {
        	event_queue_del_fd(uwsgi.async_queue, uaf->fd, uaf->event);
                uwsgi.async_waiting_fd_table[uaf->fd] = NULL;
                struct uwsgi_async_fd *current_uaf = uaf;
                uaf = current_uaf->next;
                free(current_uaf);
	}

	wsgi_req->waiting_fds = NULL;
}

static void async_expire_timeouts(uint64_t now) {

	struct wsgi_request *wsgi_req;
	struct uwsgi_rb_timer *urbt;

	for (;;) {

		urbt = uwsgi_min_rb_timer(uwsgi.rb_async_timeouts, NULL);

		if (urbt == NULL)
			return;

		if (urbt->value <= now) {
			wsgi_req = (struct wsgi_request *) urbt->data;
			// timeout expired
			wsgi_req->async_timed_out = 1;
			// reset the request
			async_reset_request(wsgi_req);
			// push it in the runqueue
			runqueue_push(wsgi_req);
			continue;
		}

		break;
	}

}

int async_add_fd_read(struct wsgi_request *wsgi_req, int fd, int timeout) {

	if (uwsgi.async < 2 || !uwsgi.async_waiting_fd_table){ 
		uwsgi_log_verbose("ASYNC call without async mode !!!\n");
		return -1;
	}

	struct uwsgi_async_fd *last_uad = NULL, *uad = wsgi_req->waiting_fds;

	if (fd < 0)
		return -1;

	// find last slot
	while (uad) {
		last_uad = uad;
		uad = uad->next;
	}

	uad = uwsgi_malloc(sizeof(struct uwsgi_async_fd));
	uad->fd = fd;
	uad->event = event_queue_read();
	uad->prev = last_uad;
	uad->next = NULL;

	if (last_uad) {
		last_uad->next = uad;
	}
	else {
		wsgi_req->waiting_fds = uad;
	}

	if (timeout > 0) {
		async_add_timeout(wsgi_req, timeout);
	}
	uwsgi.async_waiting_fd_table[fd] = wsgi_req;
	wsgi_req->async_force_again = 1;
	return event_queue_add_fd_read(uwsgi.async_queue, fd);
}

static int async_wait_fd_read(int fd, int timeout) {

	struct wsgi_request *wsgi_req = current_wsgi_req();

	wsgi_req->async_ready_fd = 0;

	if (async_add_fd_read(wsgi_req, fd, timeout)) {
		return -1;
	}
	if (uwsgi.schedule_to_main) {
		uwsgi.schedule_to_main(wsgi_req);
	}
	if (wsgi_req->async_timed_out) {
		wsgi_req->async_timed_out = 0;
		return 0;
	}
	return 1;
}

static int async_wait_fd_read2(int fd0, int fd1, int timeout, int *fd) {

        struct wsgi_request *wsgi_req = current_wsgi_req();

        wsgi_req->async_ready_fd = 0;

        if (async_add_fd_read(wsgi_req, fd0, timeout)) {
                return -1;
        }

        if (async_add_fd_read(wsgi_req, fd1, timeout)) {
		// reset already registered fd
		async_reset_request(wsgi_req);
                return -1;
        }

        if (uwsgi.schedule_to_main) {
                uwsgi.schedule_to_main(wsgi_req);
        }

        if (wsgi_req->async_timed_out) {
                wsgi_req->async_timed_out = 0;
                return 0;
        }

	if (wsgi_req->async_ready_fd) {
		*fd = wsgi_req->async_last_ready_fd;
		return 1;
	}

        return -1;
}


void async_add_timeout(struct wsgi_request *wsgi_req, int timeout) {

	if (uwsgi.async < 2 || !uwsgi.rb_async_timeouts) {
		uwsgi_log_verbose("ASYNC call without async mode !!!\n");
		return;
	}

	wsgi_req->async_ready_fd = 0;

	if (timeout > 0 && wsgi_req->async_timeout == NULL) {
		wsgi_req->async_timeout = uwsgi_add_rb_timer(uwsgi.rb_async_timeouts, uwsgi_now() + timeout, wsgi_req);
	}

}

int async_add_fd_write(struct wsgi_request *wsgi_req, int fd, int timeout) {

	if (uwsgi.async < 2 || !uwsgi.async_waiting_fd_table) {
		uwsgi_log_verbose("ASYNC call without async mode !!!\n");
		return -1;
	}

	struct uwsgi_async_fd *last_uad = NULL, *uad = wsgi_req->waiting_fds;

	if (fd < 0)
		return -1;

	// find last slot
	while (uad) {
		last_uad = uad;
		uad = uad->next;
	}

	uad = uwsgi_malloc(sizeof(struct uwsgi_async_fd));
	uad->fd = fd;
	uad->event = event_queue_write();
	uad->prev = last_uad;
	uad->next = NULL;

	if (last_uad) {
		last_uad->next = uad;
	}
	else {
		wsgi_req->waiting_fds = uad;
	}

	if (timeout > 0) {
		async_add_timeout(wsgi_req, timeout);
	}

	uwsgi.async_waiting_fd_table[fd] = wsgi_req;
	wsgi_req->async_force_again = 1;
	return event_queue_add_fd_write(uwsgi.async_queue, fd);
}

static int async_wait_fd_write(int fd, int timeout) {
	struct wsgi_request *wsgi_req = current_wsgi_req();

	wsgi_req->async_ready_fd = 0;

	if (async_add_fd_write(wsgi_req, fd, timeout)) {
		return -1;
	}
	if (uwsgi.schedule_to_main) {
		uwsgi.schedule_to_main(wsgi_req);
	}
	if (wsgi_req->async_timed_out) {
		wsgi_req->async_timed_out = 0;
		return 0;
	}
	return 1;
}

void async_schedule_to_req(void) {
#ifdef UWSGI_ROUTING
        if (uwsgi_apply_routes(uwsgi.wsgi_req) == UWSGI_ROUTE_BREAK) {
		goto end;
        }
	// a trick to avoid calling routes again
	uwsgi.wsgi_req->is_routing = 1;
#endif
	uwsgi.wsgi_req->async_status = uwsgi.p[uwsgi.wsgi_req->uh->modifier1]->request(uwsgi.wsgi_req);
        if (uwsgi.wsgi_req->async_status <= UWSGI_OK) goto end;

	if (uwsgi.schedule_to_main) {
        	uwsgi.schedule_to_main(uwsgi.wsgi_req);
	}
	return;

end:
	async_reset_request(uwsgi.wsgi_req);
	uwsgi_close_request(uwsgi.wsgi_req);
	uwsgi.wsgi_req->async_status = UWSGI_OK;	
	uwsgi.async_queue_unused_ptr++;
        uwsgi.async_queue_unused[uwsgi.async_queue_unused_ptr] = uwsgi.wsgi_req;
}

void async_schedule_to_req_green(void) {
	struct wsgi_request *wsgi_req = uwsgi.wsgi_req;
#ifdef UWSGI_ROUTING
        if (uwsgi_apply_routes(wsgi_req) == UWSGI_ROUTE_BREAK) {
                goto end;
        }
#endif
        for(;;) {
		wsgi_req->async_status = uwsgi.p[wsgi_req->uh->modifier1]->request(wsgi_req);
                if (wsgi_req->async_status <= UWSGI_OK) {
                        break;
                }
                wsgi_req->switches++;
		if (uwsgi.schedule_fix) {
			uwsgi.schedule_fix(wsgi_req);
		}
                // switch after each yield
		if (uwsgi.schedule_to_main)
			uwsgi.schedule_to_main(wsgi_req);
        }

#ifdef UWSGI_ROUTING
end:
#endif
	// re-set the global state
	uwsgi.wsgi_req = wsgi_req;
        async_reset_request(wsgi_req);
        uwsgi_close_request(wsgi_req);
	// re-set the global state (routing could have changed it)
	uwsgi.wsgi_req = wsgi_req;
        wsgi_req->async_status = UWSGI_OK;
	uwsgi.async_queue_unused_ptr++;
        uwsgi.async_queue_unused[uwsgi.async_queue_unused_ptr] = wsgi_req;
	
}

static int uwsgi_async_wait_milliseconds_hook(int timeout) {
	struct wsgi_request *wsgi_req = current_wsgi_req();
	timeout = timeout / 1000;
	if (!timeout) timeout = 1;
	async_add_timeout(wsgi_req, timeout);
	wsgi_req->async_force_again = 1;
	if (uwsgi.schedule_to_main) {
                uwsgi.schedule_to_main(wsgi_req);
        }
        if (wsgi_req->async_timed_out) {
                wsgi_req->async_timed_out = 0;
                return 0;
        }

	return -1;
}

void async_loop() {

	if (uwsgi.async < 2) {
		uwsgi_log("the async loop engine requires async mode (--async <n>)\n");
		exit(1);
	}

	int interesting_fd, i;
	struct uwsgi_rb_timer *min_timeout;
	int timeout;
	int is_a_new_connection;
	int proto_parser_status;

	uint64_t now;

	struct uwsgi_async_request *current_request = NULL;

	void *events = event_queue_alloc(64);
	struct uwsgi_socket *uwsgi_sock;

	uwsgi_async_init();

	uwsgi.async_runqueue = NULL;

	uwsgi.wait_write_hook = async_wait_fd_write;
        uwsgi.wait_read_hook = async_wait_fd_read;
        uwsgi.wait_read2_hook = async_wait_fd_read2;
	uwsgi.wait_milliseconds_hook = uwsgi_async_wait_milliseconds_hook;

	if (uwsgi.signal_socket > -1) {
		event_queue_add_fd_read(uwsgi.async_queue, uwsgi.signal_socket);
		event_queue_add_fd_read(uwsgi.async_queue, uwsgi.my_signal_socket);
	}

	// set a default request manager
	if (!uwsgi.schedule_to_req)
		uwsgi.schedule_to_req = async_schedule_to_req;

	if (!uwsgi.schedule_to_main) {
		uwsgi_log("*** DANGER *** async mode without coroutine/greenthread engine loaded !!!\n");
	}

	while (uwsgi.workers[uwsgi.mywid].manage_next_request) {

		now = (uint64_t) uwsgi_now();
		if (uwsgi.async_runqueue) {
			timeout = 0;
		}
		else {
			min_timeout = uwsgi_min_rb_timer(uwsgi.rb_async_timeouts, NULL);
			if (min_timeout) {
				timeout = min_timeout->value - now;
				if (timeout <= 0) {
					async_expire_timeouts(now);
					timeout = 0;
				}
			}
			else {
				timeout = -1;
			}
		}

		uwsgi.async_nevents = event_queue_wait_multi(uwsgi.async_queue, timeout, events, 64);

		now = (uint64_t) uwsgi_now();
		// timeout ???
		if (uwsgi.async_nevents == 0) {
			async_expire_timeouts(now);
		}


		for (i = 0; i < uwsgi.async_nevents; i++) {
			// manage events
			interesting_fd = event_queue_interesting_fd(events, i);

			// signals are executed in the main stack... in the future we could have dedicated stacks for them
			if (uwsgi.signal_socket > -1 && (interesting_fd == uwsgi.signal_socket || interesting_fd == uwsgi.my_signal_socket)) {
				uwsgi_receive_signal(interesting_fd, "worker", uwsgi.mywid);
				continue;
			}

			is_a_new_connection = 0;

			// new request coming in ?
			uwsgi_sock = uwsgi.sockets;
			while (uwsgi_sock) {

				if (interesting_fd == uwsgi_sock->fd) {

					is_a_new_connection = 1;

					uwsgi.wsgi_req = find_first_available_wsgi_req();
					if (uwsgi.wsgi_req == NULL) {
						uwsgi_async_queue_is_full((time_t)now);
						break;
					}

					// on error re-insert the request in the queue
					wsgi_req_setup(uwsgi.wsgi_req, uwsgi.wsgi_req->async_id, uwsgi_sock);
					if (wsgi_req_simple_accept(uwsgi.wsgi_req, interesting_fd)) {
						uwsgi.async_queue_unused_ptr++;
						uwsgi.async_queue_unused[uwsgi.async_queue_unused_ptr] = uwsgi.wsgi_req;
						break;
					}

					if (wsgi_req_async_recv(uwsgi.wsgi_req)) {
						uwsgi.async_queue_unused_ptr++;
						uwsgi.async_queue_unused[uwsgi.async_queue_unused_ptr] = uwsgi.wsgi_req;
						break;
					}

					// by default the core is in UWSGI_AGAIN mode
					uwsgi.wsgi_req->async_status = UWSGI_AGAIN;
					// some protocol (like zeromq) do not need additional parsing, just push it in the runqueue
					if (uwsgi.wsgi_req->do_not_add_to_async_queue) {
						runqueue_push(uwsgi.wsgi_req);
					}

					break;
				}

				uwsgi_sock = uwsgi_sock->next;
			}

			if (!is_a_new_connection) {
				// proto event
				uwsgi.wsgi_req = find_wsgi_req_proto_by_fd(interesting_fd);
				if (uwsgi.wsgi_req) {
					proto_parser_status = uwsgi.wsgi_req->socket->proto(uwsgi.wsgi_req);
					// reset timeout
					async_reset_request(uwsgi.wsgi_req);
					// parsing complete
					if (!proto_parser_status) {
						// remove fd from event poll and fd proto table 
						uwsgi.async_proto_fd_table[interesting_fd] = NULL;
						event_queue_del_fd(uwsgi.async_queue, interesting_fd, event_queue_read());
						// put request in the runqueue (set it as UWSGI_OK to signal the first run)
						uwsgi.wsgi_req->async_status = UWSGI_OK;
						runqueue_push(uwsgi.wsgi_req);
						continue;
					}
					else if (proto_parser_status < 0) {
						uwsgi.async_proto_fd_table[interesting_fd] = NULL;
						close(interesting_fd);
						uwsgi.async_queue_unused_ptr++;
						uwsgi.async_queue_unused[uwsgi.async_queue_unused_ptr] = uwsgi.wsgi_req;
						continue;
					}
					// re-add timer
					async_add_timeout(uwsgi.wsgi_req, uwsgi.socket_timeout);
					continue;
				}

				// app-registered event
				uwsgi.wsgi_req = find_wsgi_req_by_fd(interesting_fd);
				// unknown fd, remove it (for safety)
				if (uwsgi.wsgi_req == NULL) {
					close(interesting_fd);
					continue;
				}

				// remove all the fd monitors and timeout
				async_reset_request(uwsgi.wsgi_req);
				uwsgi.wsgi_req->async_ready_fd = 1;
				uwsgi.wsgi_req->async_last_ready_fd = interesting_fd;

				// put the request in the runqueue again
				runqueue_push(uwsgi.wsgi_req);
			}
		}


		// event queue managed, give cpu to runqueue
		current_request = uwsgi.async_runqueue;

		while(current_request) {

			// current_request could be nulled on error/end of request
			struct uwsgi_async_request *next_request = current_request->next;

			uwsgi.wsgi_req = current_request->wsgi_req;
			uwsgi.schedule_to_req();
			uwsgi.wsgi_req->switches++;

			// request ended ?
			if (uwsgi.wsgi_req->async_status <= UWSGI_OK ||
				uwsgi.wsgi_req->waiting_fds || uwsgi.wsgi_req->async_timeout) {
				// remove from the runqueue
				runqueue_remove(current_request);
			}
			current_request = next_request;
		}

	}

}