Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
meinheld / meinheld / server / server.c
Size: Mime:
#include "server.h"

#include <arpa/inet.h>
#include <signal.h>

#ifdef linux
#include <sys/prctl.h>
#include <sys/socket.h>
#endif

#include <sys/un.h>
#include <sys/stat.h>

#include "http_request_parser.h"
#include "response.h"
#include "log.h"
#include "client.h"
#include "util.h"
#include "input.h"
#include "timer.h"
#include "heapq.h"

#ifdef WITH_GREENLET
#include "greensupport.h"
#endif

#define ACCEPT_TIMEOUT_SECS 1
#define READ_TIMEOUT_SECS 30

#define READ_BUF_SIZE 1024 * 64

typedef struct {
   TimerObject **q;
   uint32_t size;
   uint32_t max;
} pending_queue_t;

static char *server_name = "127.0.0.1";
static uint16_t server_port = 8000;
/* static int listen_sock;  // listen socket */
static PyObject *listen_socks = NULL;  // listen socket

static volatile sig_atomic_t loop_done;
static volatile sig_atomic_t call_shutdown = 0;
static volatile sig_atomic_t catch_signal = 0;

static picoev_loop* main_loop = NULL; //main loop
static heapq_t *g_timers;
static pending_queue_t *g_pendings = NULL;

// active event cnt
static int activecnt = 0;

static PyObject *wsgi_app = NULL; //wsgi app

static uint8_t watch_loop = 0;
static PyObject *watchdog = NULL; //watchdog
static char is_write_access_log = 0;

static int is_keep_alive = 0; //keep alive support
static int keep_alive_timeout = 5;

uint64_t max_content_length = 1024 * 1024 * 16; //max_content_length
int client_body_buffer_size = 1024 * 500;  //client_body_buffer_size

static char *unix_sock_name = NULL;

static int backlog = 1024 * 4; // backlog size
static int max_fd = 1024 * 4;  // picoev max_fd

// greenlet hub switch value
static PyObject *hub_switch_value;
PyObject* current_client;
PyObject* timeout_error;

/* reuse object */
static PyObject *client_key = NULL; //meinheld.client
static PyObject *wsgi_input_key = NULL; //wsgi.input key
static PyObject *status_code_key = NULL; //STATUS_CODE
static PyObject *bytes_sent_key = NULL; // SEND_BYTES
static PyObject *request_time_key = NULL; // REQUEST_TIME
static PyObject *local_time_key = NULL; // LOCAL_TIME
static PyObject *empty_string = NULL; //""

static PyObject *app_handler_func = NULL;

/* gunicorn */
static time_t watchdog_lasttime;
static int spinner = 0;
static int tempfile_fd = 0;
static int gtimeout = 0;
static int ppid = 0;

#define CLIENT_MAXFREELIST 1024

static client_t *client_free_list[CLIENT_MAXFREELIST];
static int client_numfree = 0;

static void
read_callback(picoev_loop* loop, int fd, int events, void* cb_arg);

#ifndef WITH_GREENLET
static void
write_callback(picoev_loop* loop, int fd, int events, void* cb_arg);
#endif

static void
kill_callback(picoev_loop* loop, int fd, int events, void* cb_arg);

static void
trampoline_callback(picoev_loop* loop, int fd, int events, void* cb_arg);

static PyObject*
internal_schedule_call(int seconds, PyObject *cb, PyObject *args, PyObject *kwargs, PyObject *greenlet);

static int
prepare_call_wsgi(client_t *client);

static void
call_wsgi_handler(client_t *client);

static int
check_status_code(client_t *client);

static pending_queue_t*
init_pendings(void)
{
    pending_queue_t *pendings = NULL;

    pendings = PyMem_Malloc(sizeof(pending_queue_t));
    if (pendings == NULL) {
        return NULL;
    }
    pendings->size = 0;
    pendings->max= 1024;
    pendings->q = (TimerObject**)malloc(sizeof(TimerObject*) * pendings->max);
    if (pendings->q == NULL) {
        PyMem_Free(pendings);
        return NULL;
    }
    return pendings;
}

static int
realloc_pendings(void)
{
    TimerObject **new_heap;
    uint32_t max;
    pending_queue_t *pendings = g_pendings;

    if (pendings->size >= pendings->max) {
        //realloc
        max = pendings->max * 2;
        new_heap = (TimerObject**)realloc(pendings->q, sizeof(TimerObject*) * max);
        if (new_heap == NULL) {
            PyErr_SetString(PyExc_Exception, "size over timer queue");
            return -1;
        }
        pendings->max = max;
        pendings->q = new_heap;
        RDEBUG("realloc max:%d", pendings->max);
    }
    return 1;
}

static void
destroy_pendings(void)
{
    int i = 0, len; 
    TimerObject *timer = NULL;
    TimerObject **t = g_pendings->q;
    if (g_pendings == NULL) {
        return;
    }
    len = g_pendings->size;
    t += i;

    while(len--) {
        timer = *t;
        Py_DECREF(timer);
        t++;
    }

    free(g_pendings->q);
    PyMem_Free(g_pendings);
    g_pendings = NULL;
}


static void
client_t_list_fill(void)
{
    client_t *client;
    while (client_numfree < CLIENT_MAXFREELIST) {
        client = (client_t *)PyMem_Malloc(sizeof(client_t));
        client_free_list[client_numfree++] = client;
    }
}

static void
client_t_list_clear(void)
{
    client_t *op;

    while (client_numfree) {
        op = client_free_list[--client_numfree];
        PyMem_Free(op);
    }
}

static client_t*
alloc_client_t(void)
{
    client_t *client;
    if (client_numfree) {
        client = client_free_list[--client_numfree];
        GDEBUG("use pooled %p", client);
    } else {
        client = (client_t *)PyMem_Malloc(sizeof(client_t));
        GDEBUG("alloc %p", client);
    }
    memset(client, 0, sizeof(client_t));
    return client;
}

static void
dealloc_client(client_t *client)
{
    if (client_numfree < CLIENT_MAXFREELIST) {
        client_free_list[client_numfree++] = client;
        GDEBUG("back to pool %p", client);
    } else {
        PyMem_Free(client);
    }
}


static client_t *
new_client_t(int client_fd, char *remote_addr, uint32_t remote_port)
{
    client_t *client;

    client = alloc_client_t();
    //client = PyMem_Malloc(sizeof(client_t));
    //memset(client, 0, sizeof(client_t));

    client->fd = client_fd;
    client->complete = 1;
    client->request_queue = new_request_queue();
    client->remote_addr = remote_addr;
    client->remote_port = remote_port;
    /* client->body_type = BODY_TYPE_NONE; */
    GDEBUG("client alloc %p", client);
    return client;
}

static void
set_log_value(client_t *client, PyObject *environ, uintptr_t delta_msec)
{
    PyObject *status_code = NULL, *bytes = NULL, *request_time = NULL, *local_time = NULL;
    
    status_code = PyLong_FromLong(client->status_code);
    bytes = PyLong_FromLong(client->write_bytes);
    request_time = PyLong_FromLong(delta_msec);
    local_time = NATIVE_FROMSTRING((char*)http_log_time);
    
    if (status_code) {
        PyDict_SetItem(environ, status_code_key, status_code);
        Py_DECREF(status_code);
    }
    
    if (bytes) {
        PyDict_SetItem(environ, bytes_sent_key, bytes);
        Py_DECREF(bytes);
    }
    
    if (request_time) {
        PyDict_SetItem(environ, request_time_key, request_time);
        Py_DECREF(request_time);
    }

    if (local_time) {
        PyDict_SetItem(environ, local_time_key, local_time);
        Py_DECREF(local_time);
    }

}

static void
clean_client(client_t *client)
{
    PyObject *environ = NULL;
    uintptr_t end, delta_msec = 0;

    request *req = client->current_req;
   
    if (is_write_access_log) {
        DEBUG("write access log");
        cache_time_update();
        if (req) {
            environ = req->environ;
            end = current_msec;
            if (req->start_msec > 0){
                delta_msec = end - req->start_msec;
            }
            set_log_value(client, environ, delta_msec);
            call_access_logger(environ);
        } else {
            if (client->status_code != 408) {
                environ = new_environ(client);
                set_log_value(client, environ, delta_msec);
                call_access_logger(environ);
            }
        }
    }

    Py_CLEAR(client->http_status);
    Py_CLEAR(client->headers);
    Py_CLEAR(client->response_iter);
    Py_CLEAR(client->response);

    if (req == NULL) {
        goto init;
    }

    DEBUG("status_code:%d env:%p", client->status_code, req->environ);
    if (req->environ) { 
        /* PyDict_Clear(client->environ); */
        /* DEBUG("CLEAR environ"); */
        Py_CLEAR(req->environ);
    }
    if (req->body) {
        if (req->body_type == BODY_TYPE_TMPFILE) {
            fclose(req->body);
        } else {
            free_buffer(req->body);
        }
        req->body = NULL;
    }
    free_request(req);

init:
    client->current_req = NULL;
    client->header_done = 0;
    client->response_closed = 0;
    client->chunked_response = 0;
    client->content_length_set = 0;
    client->content_length = 0;
    client->write_bytes = 0;
}

static void
close_client(client_t *client)
{
    client_t *new_client = NULL;
    int ret;

    if (!client->response_closed) {
        close_response(client);
    }
    DEBUG("start close client:%p fd:%d status_code %d", client, client->fd, client->status_code);

    if (picoev_is_active(main_loop, client->fd)) {
        if (!picoev_del(main_loop, client->fd)) {
            activecnt--;
            DEBUG("activecnt:%d", activecnt);
        }
        DEBUG("picoev_del client:%p fd:%d", client, client->fd);
    }

    clean_client(client);

    DEBUG("remain http pipeline size :%d", client->request_queue->size);
    if (client->request_queue->size > 0) {
        if (check_status_code(client) > 0) {
            //process pipeline
            if (prepare_call_wsgi(client) > 0) {
                call_wsgi_handler(client);
            }
        }
        return ;
    }

    if (client->http_parser != NULL) {
        /* PyMem_Free(client->http_parser); */
        dealloc_parser(client->http_parser);
    }

    free_request_queue(client->request_queue);
    if (!client->keep_alive) {
        close(client->fd);
        BDEBUG("close client:%p fd:%d", client, client->fd);
    } else {
        BDEBUG("keep alive client:%p fd:%d", client, client->fd);
        new_client = new_client_t(client->fd, client->remote_addr, client->remote_port);
        new_client->keep_alive = 1;
        init_parser(new_client, server_name, server_port);
        ret = picoev_add(main_loop, new_client->fd, PICOEV_READ, keep_alive_timeout, read_callback, (void *)new_client);
        if (ret == 0) {
            activecnt++;
        }
    }
    //clear old client
    dealloc_client(client);
}

static void init_main_loop(void)
{
    if (main_loop == NULL) {
        /* init picoev */
        picoev_init(max_fd);
        /* create loop */
        main_loop = picoev_create_loop(60);
    }
}

static void
kill_callback(picoev_loop* loop, int fd, int events, void* cb_arg)
{

    picoev_del(loop, fd);
    if ((events & PICOEV_TIMEOUT) != 0) {
        DEBUG("force shutdown...");
        loop_done = 0;
    }
}

static inline void
kill_server(int timeout)
{

    int listen_sock = 0;
    PyObject *iter = NULL, *item;
    int set_callback = 0;

    if (main_loop == NULL) {
        return;
    }

    iter = PyObject_GetIter(listen_socks);
    if (PyErr_Occurred()){
        call_error_logger();
        return;
    }

    while((item =  PyIter_Next(iter))){
#ifdef PY3
        if (PyLong_Check(item)) {
            listen_sock = (int)PyLong_AsLong(item);
#else
        if (PyInt_Check(item)) {
            listen_sock = (int)PyInt_AsLong(item);
#endif

            //stop accepting
            if (!picoev_del(main_loop, listen_sock)) {
                activecnt--;
                DEBUG("activecnt:%d", activecnt);
            }

            if (!set_callback) {
                //shutdown timeout
                if (timeout > 0) {
                    //set timeout
                    (void)picoev_add(main_loop, listen_sock, PICOEV_TIMEOUT, timeout, kill_callback, NULL);
                } else {
                    (void)picoev_add(main_loop, listen_sock, PICOEV_TIMEOUT, 1, kill_callback, NULL);
                }
                set_callback = 1;
            }

        } else {
            //TODO WARN???   
        }
        Py_DECREF(item);
    }
    Py_DECREF(iter);

}

static inline void
set_current_request(client_t *client)
{
    request *req;
    req = shift_request(client->request_queue);
    client->current_req = req;
}

static void
set_bad_request_code(client_t *client, int status_code)
{
    request *req;
    req = client->request_queue->tail;
    req->bad_request_code = status_code;
    DEBUG("set bad request code %d", status_code);
}

static int
check_status_code(client_t *client)
{
    request *req;
    req = client->request_queue->head;
    if (req && req->bad_request_code > 200) {
        //error
        //shift
        DEBUG("bad status code %d", req->bad_request_code);
        set_current_request(client);
        client->status_code = req->bad_request_code;
        send_error_page(client);
        close_client(client);
        return -1;
    }
    return 1;
}

static PyObject *
app_handler(PyObject *self, PyObject *args)
{
    int ret, active;
    PyObject *wsgi_args = NULL, *start = NULL, *current = NULL, *parent = NULL, *res = NULL;
    PyObject *env = NULL;
    ClientObject *pyclient;
    client_t *client;
    request *req;
    response_status status;

    if (!PyArg_ParseTuple(args, "O:app_handler", &env)) {
        return NULL;
    }
    pyclient = (ClientObject*)PyDict_GetItem(env, client_key);
    client = pyclient->client;

    req = client->current_req;
    start = create_start_response(client);

    if (!start) {
        return NULL;
    }

    DEBUG("call wsgi app");
    wsgi_args = PyTuple_Pack(2, env, start);
    res = PyObject_CallObject(wsgi_app, wsgi_args);
    Py_DECREF(wsgi_args);
    DEBUG("called wsgi app");

    //check response & PyErr_Occurred
    if (res && res == Py_None) {
        PyErr_SetString(PyExc_Exception, "response must be a iter or sequence object");
        goto error;
    }
    //Check wsgi_app error
    if (PyErr_Occurred()) {
        goto error;
    }

    client->response = res;

    if (client->response_closed) {
        //closed
        close_client(client);
        Py_RETURN_NONE;
    }
    status = response_start(client);

#ifdef WITH_GREENLET
    while(status != STATUS_OK) {
        if (status == STATUS_ERROR) {
            // Internal Server Error
            req->bad_request_code = 500;
            goto error;
        } else {
            active = picoev_is_active(main_loop, client->fd);
            ret = picoev_add(main_loop, client->fd, PICOEV_WRITE, 300, trampoline_callback, (void *)pyclient);
            if ((ret == 0 && !active)) {
                activecnt++;
            }

            // switch to hub
            current = pyclient->greenlet;
            parent = greenlet_getparent(current);
        
            /* Py_INCREF(hub_switch_value); */
            res = greenlet_switch(parent, hub_switch_value, NULL);
            Py_XDECREF(res);

            // try again after event switch
            status = process_body(client);
        }
    }
    status = close_response(client);
    if (status == STATUS_ERROR) {
        //TODO logging error
    }
    // send OK
    close_client(client);
#else
    switch(status) {
        case STATUS_ERROR:
            // Internal Server Error
            req->bad_request_code = 500;
            goto error;
        case STATUS_SUSPEND:
            // continue
            // set callback
            active = picoev_is_active(main_loop, client->fd);
            ret = picoev_add(main_loop, client->fd, PICOEV_WRITE, 300, write_callback, (void *)pyclient);
            if ((ret == 0 && !active)) {
                activecnt++;
            }
        default:
            // send OK
            close_client(client);
    }
#endif
    Py_RETURN_NONE;

error:
    client->status_code = 500;
    status = close_response(client);
    if (status == STATUS_ERROR) {
        //TODO logging error
    }
    /* write_error_log(__FILE__, __LINE__); */
    call_error_logger();
    send_error_page(client);
    close_client(client);
    Py_RETURN_NONE;
}

static PyMethodDef app_handler_def = {"_app_handler",   (PyCFunction)app_handler, METH_VARARGS, 0};

static PyObject*
get_app_handler(void)
{
    if (app_handler_func == NULL) {
        app_handler_func = PyCFunction_NewEx(&app_handler_def, (PyObject *)NULL, NULL);
    }
    //Py_INCREF(app_handler_func);
    return app_handler_func;
}

#ifdef WITH_GREENLET
static void
resume_greenlet(PyObject *greenlet)
{
    PyObject *res = NULL;
    PyObject *err_type, *err_val, *err_tb;

    if (PyErr_Occurred()) {
        PyErr_Fetch(&err_type, &err_val, &err_tb);
        PyErr_Clear();
        //set error
        res = greenlet_throw(greenlet, err_type, err_val, err_tb);
    } else {
        /* Py_INCREF(hub_switch_value); */
        res = greenlet_switch(greenlet, hub_switch_value, NULL);
        if (res == NULL) {
            call_error_logger();
            /* PyErr_Fetch(&err_type, &err_val, &err_tb); */
            /* PyErr_Clear(); */
            //set error
            /* res = greenlet_throw(greenlet, err_type, err_val, err_tb); */
        }
    }
    Py_XDECREF(res);
    if (greenlet_dead(greenlet)) {
        Py_DECREF(greenlet);
    }
}

static void
resume_wsgi_handler(ClientObject *pyclient)
{
    PyObject *res = NULL;
    PyObject *err_type, *err_val, *err_tb;
    client_t *old_client;
    client_t *client = pyclient->client;

    //swap bind client_t

    old_client = start_response->cli;
    start_response->cli = client;

    current_client = (PyObject *)pyclient;
    if (PyErr_Occurred()) {
        PyErr_Fetch(&err_type, &err_val, &err_tb);
        PyErr_Clear();
        //set error
        res = greenlet_throw(pyclient->greenlet, err_type, err_val, err_tb);
    } else {
        res = greenlet_switch(pyclient->greenlet, pyclient->args, pyclient->kwargs);
    }
    start_response->cli = old_client;

    Py_CLEAR(pyclient->args);
    Py_CLEAR(pyclient->kwargs);
    Py_XDECREF(res);
}
#endif

static void
call_wsgi_handler(client_t *client)
{
    PyObject *handler, *greenlet, *args, *res;
    ClientObject *pyclient;
    request *req = NULL;

    handler = get_app_handler();
    req = client->current_req;
    current_client = PyDict_GetItem(req->environ, client_key);
    pyclient = (ClientObject *)current_client;

    args = PyTuple_Pack(1, req->environ);
#ifdef WITH_GREENLET
    //new greenlet
    greenlet = greenlet_new(handler, NULL);
    // set_greenlet
    pyclient->greenlet = greenlet;
    Py_INCREF(pyclient->greenlet);

    res = greenlet_switch(greenlet, args, NULL);
    //res = PyObject_CallObject(wsgi_app, args);
    Py_DECREF(args);
    Py_DECREF(greenlet);
#else
    pyclient->greenlet = NULL;
    res = PyObject_CallObject(handler, args);
    Py_DECREF(args);
#endif
    Py_XDECREF(res);
}


#ifdef WITH_GREENLET
static void
timeout_error_callback(picoev_loop* loop, int fd, int events, void* cb_arg)
{
    ClientObject *pyclient = (ClientObject *)(cb_arg);
    client_t *client = pyclient->client;

    if ((events & PICOEV_TIMEOUT) != 0) {
        DEBUG("timeout_error_callback pyclient:%p client:%p fd:%d", pyclient, pyclient->client, pyclient->client->fd);
        if (!picoev_del(loop, fd)) {
            activecnt--;
            DEBUG("activecnt:%d", activecnt);
        }
        pyclient->suspended = 0;
        /* pyclient->resumed = 1; */
        PyErr_SetString(timeout_error, "timeout");
        set_so_keepalive(client->fd, 0);
        resume_wsgi_handler(pyclient);
    }
}

static void
timeout_callback(picoev_loop* loop, int fd, int events, void* cb_arg)
{
    ClientObject *pyclient = (ClientObject *)(cb_arg);
    client_t *client = pyclient->client;

    if ((events & PICOEV_TIMEOUT) != 0) {
        DEBUG("timeout_callback pyclient:%p client:%p fd:%d", pyclient, pyclient->client, pyclient->client->fd);
        //next intval 30sec
        picoev_set_timeout(loop, client->fd, 30);
        // is_active ??
        if (write(client->fd, "", 0) < 0) {
            if (!picoev_del(loop, fd)) {
                activecnt--;
                DEBUG("activecnt:%d", activecnt);
            }
            //resume
            pyclient->suspended = 0;
            /* pyclient->resumed = 1; */
            PyErr_SetFromErrno(PyExc_IOError);
            DEBUG("closed");
            set_so_keepalive(client->fd, 0);
            resume_wsgi_handler(pyclient);
        }
    }
}


static void
trampoline_callback(picoev_loop* loop, int fd, int events, void* cb_arg)
{
    PyObject *o = NULL;
    ClientObject *pyclient = NULL;
    client_t *client = NULL;


    if (!picoev_del(loop, fd)) {
        activecnt--;
        DEBUG("activecnt:%d", activecnt);
    }
    
    YDEBUG("call trampoline_callback fd:%d event:%d cb_arg:%p", fd, events, cb_arg);
    o = (PyObject*)cb_arg;

    if (CheckClientObject(o)) {
        pyclient = (ClientObject*)cb_arg;
        client = pyclient->client;
        
        if ((events & PICOEV_TIMEOUT) != 0) {

            RDEBUG("** trampoline_callback timeout **");
            //timeout
            client->keep_alive = 0;
            PyErr_SetString(timeout_error, "timeout");
        }
        YDEBUG("resume_wsgi_handler");
        resume_wsgi_handler(pyclient);
    } else if (greenlet_check(o)) {
        YDEBUG("resume_greenlet");
        resume_greenlet(o);
    }
}
#endif

#ifndef WITH_GREENLET
static void
write_callback(picoev_loop* loop, int fd, int events, void* cb_arg)
{
    ClientObject *pyclient = (ClientObject*)cb_arg;
    client_t *client = pyclient->client;
    int ret;
    DEBUG("call write_callback");
    current_client = (PyObject*)pyclient;
    if ((events & PICOEV_TIMEOUT) != 0) {

        DEBUG("** write_callback timeout **");

        //timeout
        client->keep_alive = 0;
        close_client(client);

    } else if ((events & PICOEV_WRITE) != 0) {
        ret = process_body(client);
        DEBUG("process_body ret %d", ret);
        if (ret != 0) {
            //ok or die
            close_client(client);
        }
    }
}
#endif

static int
check_http_expect(client_t *client)
{
    PyObject *c = NULL;
    char *val = NULL;
    int ret;
    request *req = client->current_req;

    if (client->http_parser->http_minor == 1) {
        ///TODO CHECK
        c = PyDict_GetItemString(req->environ, "HTTP_EXPECT");
        if (c) {
            val = PyBytes_AS_STRING(c);
            if (!strncasecmp(val, "100-continue", 12)) {
                ret = write(client->fd, "HTTP/1.1 100 Continue\r\n\r\n", 25);
                if (ret < 0) {
                    //fail
                    PyErr_SetFromErrno(PyExc_IOError);
                    /* write_error_log(__FILE__, __LINE__);  */
                    call_error_logger();
                    client->keep_alive = 0;
                    client->status_code = 500;
                    send_error_page(client);
                    close_client(client);
                    return -1;
                }
            } else {
                //417
                client->keep_alive = 0;
                client->status_code = 417;
                send_error_page(client);
                close_client(client);
                return -1;
            }
        }
        return 1;
    }
    return 0;
}

#ifdef PY3
static int
set_input_file(client_t *client)
{
    PyObject *input;
    int fd;
    request *req = client->current_req;

    FILE *tmp = (FILE *)req->body;
    fflush(tmp);
    rewind(tmp);
    
    fd = fileno(tmp);
    input = PyFile_FromFd(fd, "<tmpfile>", "rb", -1, NULL, NULL, NULL, 1);
    if (input == NULL) {
        fclose(tmp);
        req->body = NULL;
        return -1;
    }
    //env["wsgi.input"] = tmpfile
    //
    PyDict_SetItem((PyObject *)req->environ, wsgi_input_key, input);
    Py_DECREF(input);
    req->body = NULL;
    return 1;
}

#else
static int
set_input_file(client_t *client)
{
    PyObject *input;
    request *req = client->current_req;
    FILE *tmp = (FILE *)req->body;
    fflush(tmp);
    rewind(tmp);
    input = PyFile_FromFile(tmp, "<tmpfile>", "r", fclose);
    if (input == NULL) {
        fclose(tmp);
        req->body = NULL;
        return -1;
    }
    //env["wsgi.input"] = tmpfile
    //
    PyDict_SetItem((PyObject *)req->environ, wsgi_input_key, input);
    Py_DECREF(input);
    req->body = NULL;
    return 0;
}
#endif

static int
set_input_object(client_t *client)
{
    PyObject *input = NULL;
    request *req = client->current_req;

    if (req->body_type == BODY_TYPE_BUFFER) {
        input = InputObject_New((buffer_t*)req->body);
    } else {
        if (req->body) {
            input = InputObject_New((buffer_t*)req->body);
        } else {
            input = InputObject_New(new_buffer(0, 0));
        }
    }
    if (input == NULL) {
        return -1;
    }
    PyDict_SetItem((PyObject *)req->environ, wsgi_input_key, input);
    Py_DECREF(input);
    req->body = NULL;
    return 1;
}

/*
static void 
setting_keepalive(client_t *client)
{
    PyObject *c;
    char *val;
    if (is_keep_alive) {
        //support keep-alive
        c = PyDict_GetItemString(client->environ, "HTTP_CONNECTION");
        if (client->http_parser->http_minor == 1) {
            //HTTP 1.1
            if (c) {
                val = PyBytes_AS_STRING(c);
                if (!strcasecmp(val, "close")) {
                    client->keep_alive = 0;
                } else {
                    client->keep_alive = 1;
                }
            } else {
                client->keep_alive = 1;
            }
        } else {
            //HTTP 1.0
            if (c) {
                val = PyBytes_AS_STRING(c);
                if (!strcasecmp(val, "keep-alive")) {
                    client->keep_alive = 1;
                } else {
                    client->keep_alive = 0;
                }
            } else {
                client->keep_alive = 0;
            }
        }
    }
}
*/

static int
prepare_call_wsgi(client_t *client)
{
    request *req = NULL;

    set_current_request(client);
    
    req = client->current_req;

    //check Expect
    if (check_http_expect(client) < 0) {
        return -1;
    }

    if (req->body_type == BODY_TYPE_TMPFILE) {
        if (set_input_file(client) == -1) {
            return -1;
        }
    } else {
        if (set_input_object(client) == -1) {
            return -1;
        }
    }
    
    if (!is_keep_alive) {
        client->keep_alive = 0;
    }
    /* setting_keepalive(client); */
    return 1;
}

static int
set_read_error(client_t *client, int status_code)
{
    client->keep_alive = 0;
    if (status_code == 0) {
        // bad request
        status_code = 400;
    }
    if (client->request_queue->size > 0) {
        //piplining
        set_bad_request_code(client, status_code);
        //finish = 1
        return 1;
    } else {
        if (!client->complete) {
            // read error while reading request.
            client->status_code = status_code;
            send_error_page(client);
        } // else keepalive timeout. should not send any data.
        close_client(client);
        return -1;
    }
}

static int
read_timeout(int fd, client_t *client)
{
    RDEBUG("** read timeout fd:%d", fd);
    //timeout
    return set_read_error(client, 408);
}

static int
compare_key(PyObject *env, char *key, char *compare)
{
    int ret = -1;
    char *val = NULL;

    PyObject *c = PyDict_GetItemString(env, key);
    if (c) {
#ifdef PY3
        c = PyUnicode_AsLatin1String(c);
        val = PyBytes_AS_STRING(c);
#else
        val = PyBytes_AS_STRING(c);
        Py_INCREF(c);
#endif
        ret = strcasecmp(val, compare);
    }
    Py_XDECREF(c);
    return ret;
}


static int
check_websocket(PyObject *env)
{
    //Support only RFC6455
    if (PyMapping_HasKeyString(env, "HTTP_SEC_WEBSOCKET_KEY") == 1) {
        if (compare_key(env, "HTTP_SEC_WEBSOCKET_VERSION", "13") == 0) {
            return 1;
        }
    }

    return -1;
}


static int
parse_new_protocol(request *req, char *buf, ssize_t readed, int nread)
{
    PyObject *env, *c;
    char *val = NULL;

    env = req->environ;
    c = PyDict_GetItemString(env, "HTTP_UPGRADE");
    if (c) {
#ifdef PY3
        c = PyUnicode_AsLatin1String(c);
        val = PyBytes_AS_STRING(c);
#else
        val = PyBytes_AS_STRING(c);
        Py_INCREF(c);
#endif
        DEBUG("Upgrade protocol %s", val);
        if (!strcasecmp(val, "websocket")) {
            Py_DECREF(c);
            //Support only RFC6455
            return check_websocket(env);
        }
    }
    // protocol not found error
    PyErr_SetString(PyExc_IOError,"unknow protocol");
    return -1;
}

static int
parse_http_request(int fd, client_t *client, char *buf, ssize_t r)
{
    int nread = 0;
    request *req = NULL;

    BDEBUG("fd:%d \n%.*s", fd, (int)r, buf);
    nread = execute_parse(client, buf, r);
    BDEBUG("read request fd %d readed %d nread %d", fd, (int)r, nread);

    req = client->current_req;

    if (client->upgrade) {
        //TODO  New protocol
        if (parse_new_protocol(req, buf, r, nread) == -1) {
            return set_read_error(client, req->bad_request_code);
        }
    } else if (req == NULL) {
        DEBUG("fd %d bad_request code 400", fd);
        return set_read_error(client, 400);
    } else if (nread != r || req->bad_request_code > 0) {
        DEBUG("fd %d bad_request code %d", fd,  req->bad_request_code);
        return set_read_error(client, req->bad_request_code);
    }

    if (parser_finish(client) > 0) {
        return 1;
    }
    return 0;
}

static int
read_request(picoev_loop *loop, int fd, client_t *client, char call_time_update)
{
    char buf[READ_BUF_SIZE];
    ssize_t r;

    if (!client->keep_alive) {
        picoev_set_timeout(loop, fd, READ_TIMEOUT_SECS);
    }

    Py_BEGIN_ALLOW_THREADS
    r = read(client->fd, buf, sizeof(buf));
    Py_END_ALLOW_THREADS
    switch (r) {
        case 0: 
            return set_read_error(client, 503);
        case -1:
            // Error
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // try again later
                return 0;
            } else {
                // Fatal error
                client->keep_alive = 0;
                if (errno == ECONNRESET) {
                    client->header_done = 1;
                    client->response_closed = 1;
                } else {
                    PyErr_SetFromErrno(PyExc_IOError);
                    /* write_error_log(__FILE__, __LINE__);  */
                    call_error_logger();
                }
                return set_read_error(client, 500);
            }
        default:
            if (call_time_update) {
                cache_time_update();
            }
            return parse_http_request(fd, client, buf, r);

    }
}

static void
read_callback(picoev_loop* loop, int fd, int events, void* cb_arg)
{
    client_t *client = ( client_t *)(cb_arg);
    int finish = 0;

    if ((events & PICOEV_TIMEOUT) != 0) {
        finish = read_timeout(fd, client);

    } else if ((events & PICOEV_READ) != 0) {
        finish = read_request(loop, fd, client, 0);
    }
    if (finish == 1) {
        if (!picoev_del(main_loop, client->fd)) {
            activecnt--;
            DEBUG("activecnt:%d", activecnt);
        }
        if (check_status_code(client) > 0) {
            //current request ok
            if (prepare_call_wsgi(client) > 0) {
                call_wsgi_handler(client);
            }
        }
        return;
    }
}

static void
accept_callback(picoev_loop* loop, int fd, int events, void* cb_arg)
{
    int client_fd, ret;
    client_t *client;
    struct sockaddr_in client_addr;
    char *remote_addr;
    uint32_t remote_port;
    int finish = 0;
    if ((events & PICOEV_TIMEOUT) != 0) {
        // time out
        // next turn or other process
        return;
    } else if ((events & PICOEV_READ) != 0) {
        int i;
        socklen_t client_len = sizeof(client_addr);
        for (i=0; i<8; ++i) {
#if linux
            client_fd = accept4(fd, (struct sockaddr *)&client_addr, &client_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
            client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
#endif

            if (client_fd != -1) {
                DEBUG("accept fd %d", client_fd);
                //printf("connected: %d\n", client_fd);
                if (setup_sock(client_fd) == -1) {
                    PyErr_SetFromErrno(PyExc_IOError);
                    /* write_error_log(__FILE__, __LINE__); */
                    call_error_logger();
                    // die
                    loop_done = 0;
                    return;
                }
                remote_addr = inet_ntoa (client_addr.sin_addr);
                remote_port = ntohs(client_addr.sin_port);
                client = new_client_t(client_fd, remote_addr, remote_port);
                init_parser(client, server_name, server_port);

                finish = read_request(loop, fd, client, 1);
                if (finish == 1) {
                    if (check_status_code(client) > 0) {
                        //current request ok
                        if (prepare_call_wsgi(client) > 0) {
                            call_wsgi_handler(client);
                        }
                    }
                } else if (finish == 0) {
                    ret = picoev_add(loop, client_fd, PICOEV_READ, keep_alive_timeout, read_callback, (void *)client);
                    if (ret == 0) {
                        activecnt++;
                    }
                }
            } else {
                if (errno != EAGAIN && errno != EWOULDBLOCK) {
                    PyErr_SetFromErrno(PyExc_IOError);
                    /* write_error_log(__FILE__, __LINE__); */
                    call_error_logger();
                    // die
                    kill_server(0);
                }
                break;
            }
        }
    }
}

static void
setup_server_env(void)
{
    /* setup_listen_sock(listen_sock); */
    cache_time_init();
    setup_static_env(server_name, server_port);
    setup_start_response();
    
    ClientObject_list_fill();
    client_t_list_fill();
    parser_list_fill();
    request_list_fill();
    buffer_list_fill();
    InputObject_list_fill();
    
    client_key = NATIVE_FROMSTRING("meinheld.client");
    wsgi_input_key = NATIVE_FROMSTRING("wsgi.input");
    status_code_key = NATIVE_FROMSTRING("STATUS_CODE");
    bytes_sent_key = NATIVE_FROMSTRING("SEND_BYTES");
    request_time_key = NATIVE_FROMSTRING("REQUEST_TIME");
    local_time_key = NATIVE_FROMSTRING("LOCAL_TIME");
    empty_string = NATIVE_FROMSTRING("");
}

static void
clear_server_env(void)
{
    //clean
    clear_start_response();
    clear_static_env();
    client_t_list_clear();
    parser_list_clear();
    
    ClientObject_list_clear();
    request_list_clear();
    buffer_list_clear();
    InputObject_list_clear();

    Py_DECREF(client_key);
    Py_DECREF(wsgi_input_key);
    Py_DECREF(status_code_key);
    Py_DECREF(bytes_sent_key);
    Py_DECREF(request_time_key);
    Py_DECREF(local_time_key);
    Py_DECREF(empty_string);
}


static int 
inet_listen(void)
{
    struct addrinfo hints, *servinfo, *p;
    int flag = 1;
    int res;
    char strport[7];
    int listen_sock = 0;
    PyObject *fd = NULL;
    
    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE; 
    
    snprintf(strport, sizeof (strport), "%d", server_port);
    
    if ((res = getaddrinfo(server_name, strport, &hints, &servinfo)) == -1) {
        PyErr_SetFromErrno(PyExc_IOError);
        return -1;
    }

    // loop through all the results and bind to the first we can
    for(p = servinfo; p != NULL; p = p->ai_next) {
        if ((listen_sock = socket(p->ai_family, p->ai_socktype,
                p->ai_protocol)) == -1) {
            //perror("server: socket");
            continue;
        }

        if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &flag,
                sizeof(int)) == -1) {
            close(listen_sock);
            PyErr_SetFromErrno(PyExc_IOError);
            return -1;
        }

        Py_BEGIN_ALLOW_THREADS
        res = bind(listen_sock, p->ai_addr, p->ai_addrlen);
        Py_END_ALLOW_THREADS
        if (res == -1) {
            close(listen_sock);
            PyErr_SetFromErrno(PyExc_IOError);
            return -1;
        }

        break;
    }

    if (p == NULL)  {
        close(listen_sock);
        PyErr_SetString(PyExc_IOError,"server: failed to bind\n");
        return -1;
    }

    freeaddrinfo(servinfo); // all done with this structure
    
    // BACKLOG 
    Py_BEGIN_ALLOW_THREADS
    res = listen(listen_sock, backlog);
    Py_END_ALLOW_THREADS
    if (res == -1) {
        close(listen_sock);
        PyErr_SetFromErrno(PyExc_IOError);
        return -1;
    }

#ifdef PY3
    fd =  PyLong_FromLong((long) listen_sock);
#else
    fd =  PyInt_FromLong((long) listen_sock);
#endif 
    listen_socks = PyList_New(0);
    if (PyList_Append(listen_socks, fd) == -1) {
        return -1;
    }
    Py_DECREF(fd);
    return 1;
}

static int
check_unix_sockpath(char *sock_name)
{

    if (!access(sock_name, F_OK)) {
        if (unlink(sock_name) < 0) {
            PyErr_SetFromErrno(PyExc_IOError);
            return -1;
        }
    }
    return 1;
}

static int
unix_listen(char *sock_name, int len)
{
    int listen_sock = 0;
    int flag = 1;
    int res;
    struct sockaddr_un saddr;
    mode_t old_umask;
    PyObject *fd;

    DEBUG("unix domain socket %s", sock_name);

    if (len >= sizeof(saddr.sun_path)) {
        PyErr_SetString(PyExc_OSError, "AF_UNIX path too long");
        return -1;
    }

    memset(&saddr, 0, sizeof(saddr));
    if (check_unix_sockpath(sock_name) == -1) {
        return -1;
    }

    if ((listen_sock = socket(AF_UNIX, SOCK_STREAM,0)) == -1) {
        PyErr_SetFromErrno(PyExc_IOError);
        return -1;
    }

    if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &flag,
            sizeof(int)) == -1) {
        close(listen_sock);
        PyErr_SetFromErrno(PyExc_IOError);
        return -1;
    }

    saddr.sun_family = PF_UNIX;
    strncpy(saddr.sun_path, sock_name, len);

    old_umask = umask(0);

    Py_BEGIN_ALLOW_THREADS
    res = bind(listen_sock, (struct sockaddr *)&saddr, sizeof(saddr));
    Py_END_ALLOW_THREADS
    if (res == -1) {
        close(listen_sock);
        PyErr_SetFromErrno(PyExc_IOError);
        return -1;
    }
    umask(old_umask);

    // BACKLOG 1024
    Py_BEGIN_ALLOW_THREADS
    res = listen(listen_sock, backlog);
    Py_END_ALLOW_THREADS
    if (res == -1) {
        close(listen_sock);
        PyErr_SetFromErrno(PyExc_IOError);
        return -1;
    }
    unix_sock_name = sock_name;
#ifdef PY3
    fd =  PyLong_FromLong((long) listen_sock);
#else
    fd =  PyInt_FromLong((long) listen_sock);
#endif 
    listen_socks = PyList_New(0);
    if (PyList_Append(listen_socks, fd) == -1) {
        return -1;
    }
    Py_DECREF(fd);
    return 1;
}

static void
fast_notify(void)
{
    spinner = (spinner + 1) % 2;
    fchmod(tempfile_fd, spinner);
    if (ppid != getppid()) {
        kill_server(gtimeout);
        tempfile_fd = 0;
    }
}
static PyObject* 
set_listen_socket(PyObject *temp)
{
    if (listen_socks != NULL) {
        PyErr_SetString(PyExc_Exception, "already set listen socket");
        return NULL;
    }
#ifdef PY3
    if (PyLong_Check(temp)) {
#else
    if (PyInt_Check(temp)) {
#endif
       listen_socks = PyList_New(0);
       if (PyList_Append(listen_socks, temp) == -1) {
            return NULL;
       }
        Py_DECREF(temp);
    } else if (PyList_Check(temp)) {
        listen_socks = temp;
        Py_INCREF(listen_socks);
    } else {
        PyErr_SetString(PyExc_TypeError, "must be list or int");
        return NULL;
    }
    Py_RETURN_NONE;
}

static PyObject *
meinheld_listen(PyObject *self, PyObject *args, PyObject *kwds)
{
    PyObject *o = NULL;
    PyObject *sock_fd = NULL;
    char *path;
    int ret, len;

    static char *kwlist[] = {"address", "socket_fd", 0};

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|OO:listen",
                                     kwlist, &o, &sock_fd)) {
        return NULL;
    }

    if (listen_socks != NULL) {
        PyErr_SetString(PyExc_Exception, "already set listen socket");
        return NULL;
    }

    if (o == NULL && sock_fd != NULL) {
        return set_listen_socket(sock_fd);
    } else if (PyTuple_Check(o)) {
        //inet
        if (!PyArg_ParseTuple(o, "si:listen", &server_name, &server_port)) {
            return NULL;
        }
        ret = inet_listen();
    } else if (PyBytes_Check(o)) {
        // unix domain
        if (!PyArg_Parse(o, "s#", &path, &len)) {
            return NULL;
        }
        ret = unix_listen(path, len);
    } else {
        PyErr_SetString(PyExc_TypeError, "args tuple or string(path)");
        return NULL;
    }
    if (ret < 0) {
        //error
        return NULL;
    }
    
    Py_RETURN_NONE;
}

static void
sigint_cb(int signum)
{
    DEBUG("call SIGINT");
    if (!catch_signal) {
        catch_signal = signum;
    }
}

static void
sigpipe_cb(int signum)
{
    DEBUG("call SIGPIPE");
}

static PyObject *
meinheld_access_log(PyObject *self, PyObject *args)
{
    PyObject *o = NULL;
    PyObject *func = NULL;

    if (!PyArg_ParseTuple(args, "O:access_logger", &o)) {
        return NULL;
    }
   
    if (o == Py_None) {
        is_write_access_log = 0;
        set_access_logger(NULL);
        Py_RETURN_NONE;
    }

    func = PyObject_GetAttrString(o, "access");
    if (func == NULL) {
        return NULL;
    }

    if (!PyCallable_Check(func)) {
        PyErr_SetString(PyExc_TypeError, "must be callable");
        return NULL;
    }
    set_access_logger(func);
    is_write_access_log = 1;
    Py_RETURN_NONE;
}

static PyObject *
meinheld_error_log(PyObject *self, PyObject *args)
{
    PyObject *o = NULL;
    PyObject *func = NULL;

    if (!PyArg_ParseTuple(args, "O:error_logger", &o)) {
        return NULL;
    }

    if (o == Py_None) {
        set_err_logger(NULL);
        Py_RETURN_NONE;
    }

    func = PyObject_GetAttrString(o, "error");
    if (func == NULL) {
        return NULL;
    }
    
    if (!PyCallable_Check(func)) {
        PyErr_SetString(PyExc_TypeError, "must be callable");
        return NULL;
    }
    set_err_logger(func);
    Py_RETURN_NONE;
}


static PyObject *
meinheld_stop(PyObject *self, PyObject *args, PyObject *kwds)
{
    int timeout = 0;

    static char *kwlist[] = {"timeout", 0};

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|i:timeout",
                                     kwlist, &timeout)) {
        return NULL;
    }
    kill_server(timeout);
    Py_RETURN_NONE;
}


static inline int
fire_pendings(void)
{
    int ret = 1;
    TimerObject *timer = NULL;
    pending_queue_t *pendings = g_pendings;

    while(pendings->size && loop_done && activecnt > 0) {
        timer =  *(pendings->q + --pendings->size);
        DEBUG("start timer:%p activecnt:%d", timer, activecnt);
        fire_timer(timer);
        Py_DECREF(timer);
        activecnt--;

        DEBUG("fin timer:%p activecnt:%d", timer, activecnt);
        if (PyErr_Occurred()) {
            RDEBUG("pending call raise exception");
            call_error_logger();
            ret = -1;
            break;
        }
    }
    return ret;
}

static inline int
fire_timers(void)
{
    TimerObject *timer;
    int ret = 1;
    heapq_t *q = g_timers;
    time_t now = current_msec / 1000;

    while(q->size > 0 && loop_done && activecnt > 0) {

        timer = q->heap[0];
        DEBUG("seconds:%d", timer->seconds);
        DEBUG("now:%d", now);
        if (timer->seconds <= now) {
            //call
            timer = heappop(q);
            fire_timer(timer);

            Py_DECREF(timer);
            activecnt--;
            DEBUG("fin timer:%p activecnt:%d", timer, activecnt);

            if (PyErr_Occurred()) {
                RDEBUG("scheduled call raise exception");
                call_error_logger();
                ret = -1;
                break;
            }
            /* timer = q->heap[0]; */
        } else {
            break;
        }
    }
    return ret;

}

static int
listen_all_sockets(void)
{
    PyObject *iter = NULL, *item = NULL;
    int listen_sock = 0;
    int ret = 0;

    iter = PyObject_GetIter(listen_socks);
    
    if (PyErr_Occurred()){
        call_error_logger();
        return -1;
    }
    
    DEBUG("socks iter %p", iter);
    DEBUG("socks size %d", PyList_Size(listen_socks));

    while((item =  PyIter_Next(iter))){
#ifdef PY3
        if (PyLong_Check(item)) {
            listen_sock = (int)PyLong_AsLong(item);
#else
        if (PyInt_Check(item)) {
            listen_sock = (int)PyInt_AsLong(item);
#endif
            setup_listen_sock(listen_sock);
            ret = picoev_add(main_loop, listen_sock, PICOEV_READ, ACCEPT_TIMEOUT_SECS, accept_callback, NULL);
            if (ret == 0) {
                activecnt++;
            }
        }
        Py_DECREF(item);
    }
    Py_DECREF(iter);
    return 1;
}

static int
close_all_sockets(void) 
{
    PyObject *iter = NULL, *item = NULL;
    int listen_sock = 0;
    
    iter = PyObject_GetIter(listen_socks);
    
    if (PyErr_Occurred()){
        call_error_logger();
        return -1;
    }

    while((item =  PyIter_Next(iter))){
#ifdef PY3
        if (PyLong_Check(item)) {
            listen_sock = (int)PyLong_AsLong(item);
#else
        if (PyInt_Check(item)) {
            listen_sock = (int)PyInt_AsLong(item);
#endif
            close(listen_sock);
            if (unix_sock_name) {
               unlink(unix_sock_name);
               unix_sock_name = NULL;
            }
        }
        Py_DECREF(item);
    }
    Py_DECREF(iter);
    return 1;
}

static PyObject *
meinheld_run_loop(PyObject *self, PyObject *args, PyObject *kwds)
{
    PyObject *watchdog_result;
    int silent = 0;
    int interrupted = 0;

    static char *kwlist[] = {"app", "silent", 0};
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|i:run",
                                     kwlist, &wsgi_app, &silent)) {
        return NULL;
    }

    if (listen_socks == NULL) {
        PyErr_Format(PyExc_TypeError, "not found listen socket");
        return NULL;

    }

    Py_INCREF(wsgi_app);
    setup_server_env();

    init_main_loop();
    loop_done = 1;

    PyOS_setsig(SIGPIPE, sigpipe_cb);
    PyOS_setsig(SIGINT, sigint_cb);
    PyOS_setsig(SIGTERM, sigint_cb);


    if (listen_all_sockets() < 0) {
        //FATAL Error
        return NULL;
    }

    /* loop */
    while (likely(loop_done == 1 && activecnt > 0)) {
        /* DEBUG("before activecnt:%d", activecnt); */
        fire_pendings();
        fire_timers();
        picoev_loop_once(main_loop, 10);
        if (unlikely(catch_signal != 0)) {
            if (catch_signal == SIGINT) {
                interrupted = 1;
            }
            catch_signal = 0;
            kill_server(0);
        }
        if (watch_loop && watchdog_lasttime != main_loop->now) {
            watchdog_lasttime = main_loop->now;
            if (tempfile_fd) {
                fast_notify();
            } else if (watchdog) {
                watchdog_result = PyObject_CallFunction(watchdog, NULL);
                if (PyErr_Occurred()) {
                    PyErr_Print();
                    PyErr_Clear();
                }
                Py_XDECREF(watchdog_result);
            }
        }
        /* DEBUG("after activecnt:%d", activecnt); */
        /* DEBUG("pendings->size:%d", g_pendings->size); */
    }

    Py_DECREF(wsgi_app);
    Py_CLEAR(watchdog);
    
    current_client = NULL;
    picoev_destroy_loop(main_loop);
    picoev_deinit();
    main_loop = NULL;

    clear_server_env();

    if (close_all_sockets() < 0) {
        Py_CLEAR(listen_socks);
        return NULL;
    }
    Py_CLEAR(listen_socks);


    if (!silent && interrupted) {
        //override
        PyErr_Clear();
        PyErr_SetNone(PyExc_KeyboardInterrupt);
        return NULL;
    }
    Py_RETURN_NONE;
}


PyObject *
meinheld_set_keepalive(PyObject *self, PyObject *args)
{
    int on;
    if (!PyArg_ParseTuple(args, "i", &on))
        return NULL;
    if (on < 0) {
        PyErr_SetString(PyExc_ValueError, "keep alive value out of range ");
        return NULL;
    }
    is_keep_alive = on;
    if (is_keep_alive) {
        keep_alive_timeout = on;
    } else {
        keep_alive_timeout = 2;
    }
    Py_RETURN_NONE;
}

PyObject *
meinheld_get_keepalive(PyObject *self, PyObject *args)
{
    return Py_BuildValue("i", is_keep_alive);
}

PyObject *
meinheld_set_backlog(PyObject *self, PyObject *args)
{
    int temp;
    if (!PyArg_ParseTuple(args, "i", &temp))
        return NULL;
    if (temp <= 0) {
        PyErr_SetString(PyExc_ValueError, "backlog value out of range ");
        return NULL;
    }
    backlog = temp;
    Py_RETURN_NONE;
}

PyObject *
meinheld_get_backlog(PyObject *self, PyObject *args)
{
    return Py_BuildValue("i", backlog);
}

PyObject *
meinheld_set_picoev_max_fd(PyObject *self, PyObject *args)
{
    int temp;
    if (!PyArg_ParseTuple(args, "i", &temp))
        return NULL;
    if (temp <= 0) {
        PyErr_SetString(PyExc_ValueError, "max_fd value out of range ");
        return NULL;
    }
    max_fd = temp;
    Py_RETURN_NONE;
}

PyObject *
meinheld_get_picoev_max_fd(PyObject *self, PyObject *args)
{
    return Py_BuildValue("i", max_fd);
}

PyObject *
meinheld_set_max_content_length(PyObject *self, PyObject *args)
{
    int temp;
    if (!PyArg_ParseTuple(args, "i", &temp))
        return NULL;
    if (temp <= 0) {
        PyErr_SetString(PyExc_ValueError, "max_content_length value out of range ");
        return NULL;
    }
    max_content_length = temp;
    Py_RETURN_NONE;
}

PyObject *
meinheld_get_max_content_length(PyObject *self, PyObject *args)
{
    return Py_BuildValue("i", max_content_length);
}

PyObject *
meinheld_set_client_body_buffer_size(PyObject *self, PyObject *args)
{
    int temp;
    if (!PyArg_ParseTuple(args, "i", &temp))
        return NULL;
    if (temp <= 0) {
        PyErr_SetString(PyExc_ValueError, "client_body_buffer_size value out of range ");
        return NULL;
    }
    client_body_buffer_size = temp;
    Py_RETURN_NONE;
}

PyObject *
meinheld_get_client_body_buffer_size(PyObject *self, PyObject *args)
{
    return Py_BuildValue("i", client_body_buffer_size);
}

PyObject *
meinheld_set_listen_socket(PyObject *self, PyObject *args)
{
    PyObject *temp;
    if (!PyArg_ParseTuple(args, "O:listen_socket", &temp)) {
        return NULL;
    }
    return set_listen_socket(temp);
}

PyObject *
meinheld_set_fastwatchdog(PyObject *self, PyObject *args)
{
    int _fd;
    int _ppid;
    int timeout = 0;
    if (!PyArg_ParseTuple(args, "iii", &_fd, &_ppid, &timeout))
        return NULL;

    watchdog_lasttime = 0;
    tempfile_fd = _fd;
    ppid = _ppid;
    gtimeout = timeout;
    watch_loop = 1;
    Py_RETURN_NONE;
}

PyObject *
meinheld_set_watchdog(PyObject *self, PyObject *args)
{
    PyObject *temp;
    if (!PyArg_ParseTuple(args, "O:watchdog", &temp))
        return NULL;

    if (!PyCallable_Check(temp)) {
        PyErr_SetString(PyExc_TypeError, "must be callable");
        return NULL;
    }
    Py_INCREF(temp);
    Py_XDECREF(watchdog);
    watchdog_lasttime = 0;
    watchdog = temp;
    watch_loop = 1;
    Py_RETURN_NONE;
}

/*
PyObject *
meinheld_set_process_name(PyObject *self, PyObject *args)
{
#ifdef linux

    int i = 0,argc,len;
    char **argv;
    char *name;

    if (!PyArg_ParseTuple(args, "s:process name", &name)) {
        return NULL;
    }
    Py_GetArgcArgv(&argc, &argv);

    for(i = 0;i < argc; i++) {
        len = strlen(argv[i]);
        memset(argv[i], 0, len);
    }

    strcpy(*argv, name);
    prctl(15, name, 0, 0, 0);
#endif

    Py_RETURN_NONE;
}*/

PyObject *
meinheld_suspend_client(PyObject *self, PyObject *args)
{

#ifdef WITH_GREENLET
    PyObject *temp = NULL, *parent = NULL, *res = NULL;
    ClientObject *pyclient;
    client_t *client;
    int timeout = 0, ret = 0, active = 0;

    if (!PyArg_ParseTuple(args, "O|i:_suspend_client", &temp, &timeout)) {
        return NULL;
    }
    if (timeout < 0) {
        PyErr_SetString(PyExc_ValueError, "timeout value out of range ");
        return NULL;
    }

    // check client object
    if (!CheckClientObject(temp)) {
        PyErr_SetString(PyExc_TypeError, "must be a client object");
        return NULL;
    }

    pyclient = (ClientObject *)temp;
    client = pyclient->client;

    if (!pyclient->greenlet) {
        PyErr_SetString(PyExc_ValueError, "greenlet is not set");
        return NULL;
    }
    
    /*
    if (pyclient->resumed == 1) {
        //call later
        PyErr_SetString(PyExc_IOError, "not called resume");
        return NULL;
    }
    */

    if (client && !(pyclient->suspended)) {
        pyclient->suspended = 1;
        parent = greenlet_getparent(pyclient->greenlet);

        set_so_keepalive(client->fd, 1);
        BDEBUG("meinheld_suspend_client pyclient:%p client:%p fd:%d", pyclient, client, client->fd);
        BDEBUG("meinheld_suspend_client active ? %d", picoev_is_active(main_loop, client->fd));
        active = picoev_is_active(main_loop, client->fd);
        if (timeout > 0) {
            ret = picoev_add(main_loop, client->fd, PICOEV_TIMEOUT, timeout, timeout_error_callback, (void *)pyclient);
        } else {
            ret = picoev_add(main_loop, client->fd, PICOEV_TIMEOUT, 3, timeout_callback, (void *)pyclient);
        }
        if ((ret == 0 && !active)) {
            activecnt++;
        }
        /* Py_INCREF(hub_switch_value); */
        res = greenlet_switch(parent, hub_switch_value, NULL);
        return res;
    } else {
        PyErr_SetString(PyExc_IOError, "already suspended");
        return NULL;
    }
    Py_RETURN_NONE;
#else
    NO_GREENLET_ERROR;
#endif
}

PyObject *
meinheld_resume_client(PyObject *self, PyObject *args)
{
#ifdef WITH_GREENLET
    PyObject *temp, *switch_args, *switch_kwargs;
    ClientObject *pyclient;
    client_t *client;
    int ret, active;

    if (!PyArg_ParseTuple(args, "O|OO:_resume_client", &temp, &switch_args, &switch_kwargs)) {
        return NULL;
    }

    // check client object
    if (!CheckClientObject(temp)) {
        PyErr_SetString(PyExc_TypeError, "must be a client object");
        return NULL;
    }

    pyclient = (ClientObject *)temp;
    client = pyclient->client;

    if (!pyclient->greenlet) {
        PyErr_SetString(PyExc_ValueError, "greenlet is not set");
        return NULL;
    }

    if (!pyclient->suspended) {
        // not suspend
        PyErr_SetString(PyExc_IOError, "not suspended or dead ");
        return NULL;
    }

    if (pyclient->client && pyclient->suspended) {
        set_so_keepalive(pyclient->client->fd, 0);
        pyclient->args = switch_args;
        Py_XINCREF(pyclient->args);

        pyclient->kwargs = switch_kwargs;
        Py_XINCREF(pyclient->kwargs);

        pyclient->suspended = 0;
        /* pyclient->resumed = 1; */
        DEBUG("meinheld_resume_client pyclient:%p client:%p fd:%d", pyclient, pyclient->client, pyclient->client->fd);
        DEBUG("meinheld_resume_client active ? %d", picoev_is_active(main_loop, pyclient->client->fd));
        //clear event
        active = picoev_is_active(main_loop, client->fd);
        ret = picoev_add(main_loop, client->fd, PICOEV_WRITE, 0, trampoline_callback, (void *)pyclient);
        if ((ret == 0 && !active)) {
            activecnt++;
        }
    } else {
        PyErr_SetString(PyExc_IOError, "already resumed");
        return NULL;
    }
    Py_RETURN_NONE;
#else
    NO_GREENLET_ERROR;
#endif
}

PyObject *
meinheld_cancel_wait(PyObject *self, PyObject *args)
{
#ifdef WITH_GREENLET
    int fd;
    if (!PyArg_ParseTuple(args, "i:cancel_event", &fd)) {
        return NULL;
    }

    if (fd < 0) {
        PyErr_SetString(PyExc_ValueError, "fileno value out of range ");
        return NULL;
    }
    if (picoev_is_active(main_loop, fd)) {
        if (!picoev_del(main_loop, fd)) {
            activecnt--;
            DEBUG("activecnt:%d", activecnt);
        }
    }
    Py_RETURN_NONE;
#else
    NO_GREENLET_ERROR;
#endif
}


static PyObject*
meinheld_trampoline(PyObject *self, PyObject *args, PyObject *kwargs)
{
#ifdef WITH_GREENLET
    PyObject *current = NULL, *parent = NULL, *res = NULL;
    ClientObject *pyclient;
    int fd, event, timeout = 0, ret, active;
    PyObject *read = Py_None, *write = Py_None;

    static char *keywords[] = {"fileno", "read", "write", "timeout", NULL};

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "i|OOi:trampoline", keywords, &fd, &read, &write, &timeout)) {
        return NULL;
    }

    if (fd < 0) {
        PyErr_SetString(PyExc_ValueError, "fileno value out of range ");
        return NULL;
    }

    if (timeout < 0) {
        PyErr_SetString(PyExc_ValueError, "timeout value out of range ");
        return NULL;
    }

    if (PyObject_IsTrue(read) && PyObject_IsTrue(write)) {
        event = PICOEV_READWRITE;
    } else if (PyObject_IsTrue(read)) {
        event = PICOEV_READ;
    } else if (PyObject_IsTrue(write)) {
        event = PICOEV_WRITE;
    } else {
        event = PICOEV_TIMEOUT;
        if (timeout <= 0) {
            PyErr_SetString(PyExc_ValueError, "timeout value out of range ");
            return NULL;
        }
    }
    
    /*
    if (current_client == NULL) {
        //TODO Cheange Exception class and Messages
        PyErr_SetString(PyExc_ValueError, "server not running ");
        //PyErr_SetString(PyExc_ValueError, " WSGI Handler");
        return NULL;
    }*/
   
    current = greenlet_getcurrent();
    pyclient = (ClientObject *) current_client;
    Py_DECREF(current);
    if (pyclient != NULL && pyclient->greenlet == current) {
        active = picoev_is_active(main_loop, fd);
        ret = picoev_add(main_loop, fd, event, timeout, trampoline_callback, (void *)pyclient);
        if ((ret == 0 && !active)) {
            activecnt++;
        }
        DEBUG("call from wsgi app");

        // switch to hub
        current = pyclient->greenlet;
        parent = greenlet_getparent(current);
        YDEBUG("trampoline fd:%d event:%d current:%p parent:%p cb_arg:%p", fd, event, current, parent, pyclient);
        
        /* Py_INCREF(hub_switch_value); */
        res = greenlet_switch(parent, hub_switch_value, NULL);
        return res;
    } else {
        DEBUG("call from greenlet");
        parent = greenlet_getparent(current);
        if (parent == NULL) {
            PyErr_SetString(PyExc_IOError, "call from same greenlet");
            return NULL;
        }

        active = picoev_is_active(main_loop, fd);
        ret = picoev_add(main_loop, fd, event, timeout, trampoline_callback, current);
        if ((ret == 0 && !active)) {
            activecnt++;
        }
        YDEBUG("trampoline fd:%d event:%d current:%p parent:%p cb_arg:%p", fd, event, current, parent, current);
        /* Py_INCREF(hub_switch_value); */
        res = greenlet_switch(parent, hub_switch_value, NULL);
        return res;
    }
#else
    NO_GREENLET_ERROR;
#endif

}


static PyObject*
meinheld_spawn(PyObject *self, PyObject *args, PyObject *kwargs)
{
#ifdef WITH_GREENLET
    PyObject *greenlet = NULL, *res = NULL;
    PyObject *func = NULL, *func_args = NULL, *func_kwargs = NULL;

    static char *keywords[] = {"func", "args", "kwargs", NULL};

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OO:spawn", keywords, &func, &func_args, &func_kwargs)) {
        return NULL;
    }

    //new greenlet
    greenlet = greenlet_new(func, NULL);
    if (greenlet == NULL) {
        return NULL;
    }
    res = internal_schedule_call(0, func, func_args, func_kwargs, greenlet);
    Py_XDECREF(res);
    DEBUG("greenlet refcnt:%d", (int)Py_REFCNT(greenlet));
    Py_RETURN_NONE;

#else
    NO_GREENLET_ERROR;
#endif
}

static PyObject*
meinheld_sleep(PyObject *self, PyObject *args, PyObject *kwargs)
{
#ifdef WITH_GREENLET
    PyObject *current = NULL, *parent = NULL, *res = NULL;
    int sec = 0;
    static char *keywords[] = {"seconds", NULL};

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "i:sleep", keywords, &sec)) {
        return NULL;
    }
    
    current = greenlet_getcurrent();
    parent = greenlet_getparent(current);
    Py_DECREF(current);
    if (parent == NULL) {
        PyErr_SetString(PyExc_IOError, "call from same greenlet");
        return NULL;
    }
    DEBUG("sleep sec:%d", sec);
    res = internal_schedule_call(sec, NULL, NULL, NULL, current);
    Py_XDECREF(res);
    res = greenlet_switch(parent, hub_switch_value, NULL);
    Py_XDECREF(res);

    Py_RETURN_NONE;

#else
    NO_GREENLET_ERROR;
#endif
}

PyObject *
meinheld_get_ident(PyObject *self, PyObject *args)
{
#ifdef WITH_GREENLET
    return greenlet_getcurrent();
#else
    NO_GREENLET_ERROR;
#endif
}

static PyObject*
internal_schedule_call(int seconds, PyObject *cb, PyObject *args, PyObject *kwargs, PyObject *greenlet)
{
    TimerObject* timer;
    heapq_t *timers = g_timers;
    pending_queue_t *pendings = g_pendings;

    timer = TimerObject_new(seconds, cb, args, kwargs, greenlet);
    if (timer == NULL) {
        return NULL;
    }
    DEBUG("seconds:%d", seconds);
    if (!seconds) {
        if (realloc_pendings() == -1) {
            Py_DECREF(timer);
            return NULL;
        }
        Py_INCREF(timer);

        //timer->pending = ++pendings->size;
        pendings->q[pendings->size] = timer;
        pendings->size++;
        DEBUG("add timer:%p pendings->size:%d", timer, pendings->size);
    } else {
        if (heappush(timers, timer) == -1) {
            Py_DECREF(timer);
            return NULL;
        }
    }
        
    activecnt++;
    return (PyObject*)timer;
}

static PyObject*
meinheld_schedule_call(PyObject *self, PyObject *args, PyObject *kwargs)
{
    long seconds = 0, ret;
    Py_ssize_t size;
    PyObject *sec = NULL, *cb = NULL, *cbargs = NULL, *timer;

    size = PyTuple_GET_SIZE(args);
    DEBUG("args size %d", (int)size);

    if (size < 2) {
        PyErr_SetString(PyExc_TypeError, "schedule_call takes exactly 2 argument");
        return NULL;
    }
    sec = PyTuple_GET_ITEM(args, 0);
    cb = PyTuple_GET_ITEM(args, 1);

#ifdef PY3
    if (!PyLong_Check(sec)) {
#else
    if (!PyInt_Check(sec)) {
#endif
        PyErr_SetString(PyExc_TypeError, "must be integer");
        return NULL;
    }
    if (!PyCallable_Check(cb)) {
        PyErr_SetString(PyExc_TypeError, "must be callable");
        return NULL;
    }

    ret = PyLong_AsLong(sec);
    if (PyErr_Occurred()) {
        return NULL;
    }
    if (ret < 0) {
        PyErr_SetString(PyExc_TypeError, "seconds value out of range");
        return NULL;
    }
    seconds = ret;

    if (size > 2) {
        cbargs = PyTuple_GetSlice(args, 2, size);
    }

    timer = internal_schedule_call(seconds, cb, cbargs, kwargs, NULL);
    Py_XDECREF(cbargs);
    return timer;
}

static PyMethodDef ServerMethods[] = {
    {"listen", (PyCFunction)meinheld_listen, METH_VARARGS|METH_KEYWORDS, "set host and port num"},
    {"set_access_logger", meinheld_access_log, METH_VARARGS, "set access logger function."},
    {"set_error_logger", meinheld_error_log, METH_VARARGS, "set error logger function."},

    {"set_keepalive", meinheld_set_keepalive, METH_VARARGS, "set keep-alive support. value set timeout sec. default 0. (disable keep-alive)"},
    {"get_keepalive", meinheld_get_keepalive, METH_VARARGS, "return keep-alive support."},

    {"set_max_content_length", meinheld_set_max_content_length, METH_VARARGS, "set max_content_length"},
    {"get_max_content_length", meinheld_get_max_content_length, METH_VARARGS, "return max_content_length"},

    {"set_client_body_buffer_size", meinheld_set_client_body_buffer_size, METH_VARARGS, "set client_body_buffer_size"},
    {"get_client_body_buffer_size", meinheld_get_client_body_buffer_size, METH_VARARGS, "return client_body_buffer_size"},

    {"set_backlog", meinheld_set_backlog, METH_VARARGS, "set backlog size"},
    {"get_backlog", meinheld_get_backlog, METH_VARARGS, "return backlog size"},

    {"set_picoev_max_fd", meinheld_set_picoev_max_fd, METH_VARARGS, "set picoev max fd size"},
    {"get_picoev_max_fd", meinheld_get_picoev_max_fd, METH_VARARGS, "return picoev max fd size"},

    /* {"set_process_name", meinheld_set_process_name, METH_VARARGS, "set process name"}, */
    {"stop", (PyCFunction)meinheld_stop, METH_VARARGS|METH_KEYWORDS, "stop main loop"},
    {"shutdown", (PyCFunction)meinheld_stop, METH_VARARGS|METH_KEYWORDS, "stop main loop "},

    {"schedule_call", (PyCFunction)meinheld_schedule_call, METH_VARARGS|METH_KEYWORDS, ""},
    {"spawn", (PyCFunction)meinheld_spawn, METH_VARARGS|METH_KEYWORDS, ""},
    {"sleep", (PyCFunction)meinheld_sleep, METH_VARARGS|METH_KEYWORDS, ""},

    // support gunicorn
    {"set_listen_socket", meinheld_set_listen_socket, METH_VARARGS, "set listen_sock"},
    {"set_watchdog", meinheld_set_watchdog, METH_VARARGS, "set watchdog"},
    {"set_fastwatchdog", meinheld_set_fastwatchdog, METH_VARARGS, "set watchdog"},
    {"run", (PyCFunction)meinheld_run_loop, METH_VARARGS|METH_KEYWORDS, "set wsgi app, run the main loop"},
    // greenlet and continuation
    {"_suspend_client", meinheld_suspend_client, METH_VARARGS, "resume client"},
    {"_resume_client", meinheld_resume_client, METH_VARARGS, "resume client"},
    // io
    {"cancel_wait", meinheld_cancel_wait, METH_VARARGS, "cancel wait"},
    {"trampoline", (PyCFunction)meinheld_trampoline, METH_VARARGS | METH_KEYWORDS, "trampoline"},
    {"get_ident", meinheld_get_ident, METH_VARARGS, "return thread ident id"},

    {NULL, NULL, 0, NULL}        /* Sentinel */
};

#ifdef PY3
#define INITERROR return NULL

static struct PyModuleDef server_module_def = {
    PyModuleDef_HEAD_INIT,
    MODULE_NAME,
    NULL,
    -1,
    ServerMethods,
};

PyObject *
PyInit_server(void)
#else
#define INITERROR return

PyMODINIT_FUNC
initserver(void)
#endif
{
    PyObject *m;
#ifdef PY3
    m = PyModule_Create(&server_module_def);
#else
    m = Py_InitModule3(MODULE_NAME, ServerMethods, "");
#endif
    if (m == NULL) {
        INITERROR;
    }

    if (PyType_Ready(&ResponseObjectType) < 0) {
        INITERROR;
    }

    if (PyType_Ready(&FileWrapperType) < 0) {
        INITERROR;
    }

    if (PyType_Ready(&ClientObjectType) < 0) {
        INITERROR;
    }

    if (PyType_Ready(&InputObjectType) < 0) {
        INITERROR;
    }

    if (PyType_Ready(&TimerObjectType) < 0) {
        INITERROR;
    }

    timeout_error = PyErr_NewException("meinheld.server.timeout",
                      PyExc_IOError, NULL);
    if (timeout_error == NULL) {
        INITERROR;
    }
    Py_INCREF(timeout_error);
    PyModule_AddObject(m, "timeout", timeout_error);

    //DEBUG("client size %u", sizeof(client_t));
    //DEBUG("request size %u", sizeof(request));
    //DEBUG("header bucket %u", sizeof(write_bucket));
    g_timers = init_queue();
    if (g_timers == NULL) {
        INITERROR;
    }
    g_pendings = init_pendings();
    if (g_pendings == NULL) {
        INITERROR;
    }

#ifdef WITH_GREENLET
    hub_switch_value = PyTuple_New(0);
#endif

#ifdef PY3
    return m;
#endif
}