#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <structmember.h>

#include "src/threadpool/threadpool.h"


static const unsigned CTX_POOL_SIZE_DEFAULT = 8;
static const unsigned CTX_MAX_REQUESTS_DEFAULT = 512;


static PyTypeObject AIOOperationType;
static PyTypeObject AIOContextType;

typedef struct {
    PyObject_HEAD
    threadpool_t* pool;
    uint16_t max_requests;
    uint8_t pool_size;
} AIOContext;


typedef struct {
    PyObject_HEAD
    PyObject* py_buffer;
    PyObject* callback;
    int opcode;
    unsigned int fileno;
    off_t offset;
    int result;
    uint8_t error;
    uint8_t in_progress;
    Py_ssize_t buf_size;
    char* buf;
    PyObject* ctx;
} AIOOperation;


enum THAIO_OP_CODE {
    THAIO_READ,
    THAIO_WRITE,
    THAIO_FSYNC,
    THAIO_FDSYNC,
    THAIO_NOOP,
};


static void
AIOContext_dealloc(AIOContext *self) {
    if (self->pool != 0) {
        threadpool_t* pool = self->pool;
        self->pool = 0;

        threadpool_destroy(pool, 0);
    }

    Py_TYPE(self)->tp_free((PyObject *) self);
}

/*
    AIOContext.__new__ classmethod definition
*/
static PyObject *
AIOContext_new(PyTypeObject *type, PyObject *args, PyObject *kwds) {
    AIOContext *self;

    self = (AIOContext *) type->tp_alloc(type, 0);
    return (PyObject *) self;
}

static int
AIOContext_init(AIOContext *self, PyObject *args, PyObject *kwds)
{
    static char *kwlist[] = {"max_requests", "pool_size", NULL};

    self->pool = NULL;
    self->max_requests = 0;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|HH", kwlist,
            &self->max_requests, &self->pool_size
    )) return -1;

    if (self->max_requests <= 0) {
        self->max_requests = CTX_MAX_REQUESTS_DEFAULT;
    }

    if (self->pool_size <= 0) {
        self->pool_size = CTX_POOL_SIZE_DEFAULT;
    }

    if (self->pool_size > MAX_THREADS) {
        PyErr_Format(
            PyExc_ValueError,
            "pool_size too large. Allowed lower then %d",
            MAX_THREADS
        );
        return -1;
    }

    if (self->max_requests >= (MAX_QUEUE - 1)) {
        PyErr_Format(
            PyExc_ValueError,
            "max_requests too large. Allowed lower then %d",
            MAX_QUEUE - 1
        );
        return -1;
    }

    self->pool = threadpool_create(self->pool_size, self->max_requests, 0);

    if (self->pool == NULL) {
        PyErr_Format(
            PyExc_RuntimeError,
            "Pool initialization failed size=%d max_requests=%d",
            self->pool_size, self->max_requests
        );
        return -1;
    }

    return 0;
}

static PyObject* AIOContext_repr(AIOContext *self) {
    if (self->pool == NULL) {
        PyErr_SetString(PyExc_RuntimeError, "Pool not initialized");
        return NULL;
    }
    return PyUnicode_FromFormat(
        "<%s as %p: max_requests=%i, pool_size=%i, ctx=%lli>",
        Py_TYPE(self)->tp_name, self, self->max_requests,
        self->pool_size, self->pool
    );
}


void worker(void *arg) {
    PyGILState_STATE state;

    AIOOperation* op = arg;
    PyObject* ctx = op->ctx;
    op->ctx = NULL;
    op->error = 0;

    if (op->opcode == THAIO_NOOP) {
        state = PyGILState_Ensure();
        op->ctx = NULL;
        Py_DECREF(ctx);
        Py_DECREF(op);
        PyGILState_Release(state);
        return;
    }

    int fileno = op->fileno;
    off_t offset = op->offset;
    int buf_size = op->buf_size;
    char* buf = op->buf;

    int result;

    switch (op->opcode) {
        case THAIO_WRITE:
            result = pwrite(fileno, (const char*) buf, buf_size, offset);
            break;
        case THAIO_FSYNC:
            result = fsync(fileno);
            break;
        case THAIO_FDSYNC:
#ifdef HAVE_FDATASYNC
            result = fdatasync(fileno);
#else
            result = fsync(fileno);
#endif
            break;

        case THAIO_READ:
            result = pread(fileno, buf, buf_size, offset);
            break;
    }

    op->ctx = NULL;
    op->result = result;

    if (result < 0) op->error = errno;

    if (op->opcode == THAIO_READ) {
        op->buf_size = result;
    }
    state = PyGILState_Ensure();
    if (op->callback != NULL) {
        PyObject_CallFunction(op->callback, "i", result);
    }

    if (op->opcode == THAIO_WRITE) {
        Py_DECREF(op->py_buffer);
        op->py_buffer = NULL;
    }

    Py_DECREF(ctx);
    Py_DECREF(op);

    PyGILState_Release(state);
}


inline int process_pool_error(int code) {
    switch (code) {
        case threadpool_invalid:
            PyErr_SetString(
                PyExc_RuntimeError,
                "Thread pool pointer is invalid"
            );
            return code;
        case threadpool_lock_failure:
            PyErr_SetString(
                PyExc_RuntimeError,
                "Failed to lock thread pool"
            );
            return code;
        case threadpool_queue_full:
            PyErr_Format(
                PyExc_RuntimeError,
                "Thread pool queue full"
            );
            return code;
        case threadpool_shutdown:
            PyErr_SetString(
                PyExc_RuntimeError,
                "Thread pool is shutdown"
            );
            return code;
        case threadpool_thread_failure:
            PyErr_SetString(
                PyExc_RuntimeError,
                "Thread failure"
            );
            return code;
    }

    if (code < 0) PyErr_SetString(PyExc_RuntimeError, "Unknown error");
    return code;
}



PyDoc_STRVAR(AIOContext_submit_docstring,
    "Accepts multiple Operations. Returns \n\n"
    "    Operation.submit(aio_op1, aio_op2, aio_opN, ...) -> int"
);
static PyObject* AIOContext_submit(
    AIOContext *self, PyObject *args
) {
    if (self == NULL) {
        PyErr_SetString(PyExc_RuntimeError, "self is NULL");
        return NULL;
    }

    if (self->pool == NULL) {
        PyErr_SetString(PyExc_RuntimeError, "self->pool is NULL");
        return NULL;
    }

    if (!PyTuple_Check(args)) {
        PyErr_SetNone(PyExc_ValueError);
        return NULL;
    }

    unsigned int nr = PyTuple_Size(args);

    PyObject* obj;
    AIOOperation* ops[nr];
    unsigned int i;

    for (i=0; i < nr; i++) {
        obj = PyTuple_GetItem(args, i);
        if (PyObject_TypeCheck(obj, &AIOOperationType) == 0) {
            PyErr_Format(
                PyExc_TypeError,
                "Wrong type for argument %d", i
            );

            return NULL;
        }

        ops[i] = (AIOOperation*) obj;
        ops[i]->ctx = (void*) self;
    }

    unsigned int j=0;
    int result = 0;

    for (i=0; i < nr; i++) {
        if (ops[i]->in_progress) continue;
        ops[i]->in_progress = 1;
        Py_INCREF(ops[i]);
        Py_INCREF(ops[i]->ctx);
        result = threadpool_add(self->pool, worker, (void*) ops[i], 0);
        if (process_pool_error(result) < 0) return NULL;
        j++;
    }

    return (PyObject*) PyLong_FromSsize_t(j);
}


PyDoc_STRVAR(AIOContext_cancel_docstring,
    "Cancels multiple Operations. Returns \n\n"
    "    Operation.cancel(aio_op1, aio_op2, aio_opN, ...) -> int\n\n"
    "(Always returns zero, this method exists for compatibility reasons)"
);
static PyObject* AIOContext_cancel(
    AIOContext *self, PyObject *args
) {
    return (PyObject*) PyLong_FromSsize_t(0);
}


/*
    AIOContext properties
*/
static PyMemberDef AIOContext_members[] = {
    {
        "pool_size",
        T_INT,
        offsetof(AIOContext, pool_size),
        READONLY,
        "pool_size"
    },
    {
        "max_requests",
        T_USHORT,
        offsetof(AIOContext, max_requests),
        READONLY,
        "max requests"
    },
    {NULL}  /* Sentinel */
};

static PyMethodDef AIOContext_methods[] = {
    {
        "submit",
        (PyCFunction) AIOContext_submit, METH_VARARGS,
        AIOContext_submit_docstring
    },
    {
        "cancel",
        (PyCFunction) AIOContext_cancel, METH_VARARGS,
        AIOContext_cancel_docstring
    },
    {NULL}  /* Sentinel */
};

static PyTypeObject
AIOContextType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    .tp_name = "Context",
    .tp_doc = "thread aio context",
    .tp_basicsize = sizeof(AIOContext),
    .tp_itemsize = 0,
    .tp_flags = Py_TPFLAGS_DEFAULT,
    .tp_new = AIOContext_new,
    .tp_init = (initproc) AIOContext_init,
    .tp_dealloc = (destructor) AIOContext_dealloc,
    .tp_members = AIOContext_members,
    .tp_methods = AIOContext_methods,
    .tp_repr = (reprfunc) AIOContext_repr
};


static void
AIOOperation_dealloc(AIOOperation *self) {
    Py_CLEAR(self->callback);

    if ((self->opcode == THAIO_READ) && self->buf != NULL) {
        PyMem_Free(self->buf);
        self->buf = NULL;
    }

    Py_CLEAR(self->py_buffer);
    Py_TYPE(self)->tp_free((PyObject *) self);
}


static PyObject* AIOOperation_repr(AIOOperation *self) {
    char* mode;

    switch (self->opcode) {
        case THAIO_READ:
            mode = "read";
            break;

        case THAIO_WRITE:
            mode = "write";
            break;

        case THAIO_FSYNC:
            mode = "fsync";
            break;

        case THAIO_FDSYNC:
            mode = "fdsync";
            break;
        default:
            mode = "noop";
            break;
    }

    return PyUnicode_FromFormat(
        "<%s at %p: mode=\"%s\", fd=%i, offset=%i, result=%i, buffer=%p>",
        Py_TYPE(self)->tp_name, self, mode,
        self->fileno, self->offset, self->result, self->buf
    );
}


/*
    AIOOperation.read classmethod definition
*/
PyDoc_STRVAR(AIOOperation_read_docstring,
    "Creates a new instance of Operation on read mode.\n\n"
    "    Operation.read(\n"
    "        nbytes: int,\n"
    "        aio_context: Context,\n"
    "        fd: int, \n"
    "        offset: int,\n"
    "        priority=0\n"
    "    )"
);

static PyObject* AIOOperation_read(
    PyTypeObject *type, PyObject *args, PyObject *kwds
) {
    AIOOperation *self = (AIOOperation *) type->tp_alloc(type, 0);

    static char *kwlist[] = {"nbytes", "fd", "offset", "priority", NULL};

    if (self == NULL) {
        PyErr_SetString(PyExc_MemoryError, "can not allocate memory");
        return NULL;
    }

    self->buf = NULL;
    self->py_buffer = NULL;
    self->in_progress = 0;

    uint64_t nbytes = 0;
    uint16_t priority;

    int argIsOk = PyArg_ParseTupleAndKeywords(
        args, kwds, "KI|LH", kwlist,
        &nbytes,
        &(self->fileno),
        &(self->offset),
        &priority
    );

    if (!argIsOk) return NULL;

    self->buf = PyMem_Calloc(nbytes, sizeof(char));
    self->buf_size = nbytes;

    self->py_buffer = PyMemoryView_FromMemory(
        self->buf,
        self->buf_size,
        PyBUF_READ
    );

    self->opcode = THAIO_READ;

	return (PyObject*) self;
}

/*
    AIOOperation.write classmethod definition
*/
PyDoc_STRVAR(AIOOperation_write_docstring,
    "Creates a new instance of Operation on write mode.\n\n"
    "    Operation.write(\n"
    "        payload_bytes: bytes,\n"
    "        fd: int, \n"
    "        offset: int,\n"
    "        priority=0\n"
    "    )"
);

static PyObject* AIOOperation_write(
    PyTypeObject *type, PyObject *args, PyObject *kwds
) {
    AIOOperation *self = (AIOOperation *) type->tp_alloc(type, 0);

    static char *kwlist[] = {"payload_bytes", "fd", "offset", "priority", NULL};

    if (self == NULL) {
        PyErr_SetString(PyExc_MemoryError, "can not allocate memory");
        return NULL;
    }

    // unused
    uint16_t priority;

    self->buf = NULL;
    self->py_buffer = NULL;
    self->in_progress = 0;

    int argIsOk = PyArg_ParseTupleAndKeywords(
        args, kwds, "OI|LH", kwlist,
        &(self->py_buffer),
        &(self->fileno),
        &(self->offset),
        &priority
    );

    if (!argIsOk) return NULL;

    if (!PyBytes_Check(self->py_buffer)) {
        Py_XDECREF(self);
        PyErr_SetString(
            PyExc_ValueError,
            "payload_bytes argument must be bytes"
        );
        return NULL;
    }

    self->opcode = THAIO_WRITE;

    if (PyBytes_AsStringAndSize(
            self->py_buffer,
            &self->buf,
            &self->buf_size
    )) {
        Py_XDECREF(self);
        PyErr_SetString(
            PyExc_RuntimeError,
            "Can not convert bytes to c string"
        );
        return NULL;
    }

    Py_INCREF(self->py_buffer);

	return (PyObject*) self;
}


/*
    AIOOperation.fsync classmethod definition
*/
PyDoc_STRVAR(AIOOperation_fsync_docstring,
    "Creates a new instance of Operation on fsync mode.\n\n"
    "    Operation.fsync(\n"
    "        aio_context: AIOContext,\n"
    "        fd: int, \n"
    "        priority=0\n"
    "    )"
);

static PyObject* AIOOperation_fsync(
    PyTypeObject *type, PyObject *args, PyObject *kwds
) {
    AIOOperation *self = (AIOOperation *) type->tp_alloc(type, 0);

    static char *kwlist[] = {"fd", "priority", NULL};

    if (self == NULL) {
        PyErr_SetString(PyExc_MemoryError, "can not allocate memory");
        return NULL;
    }

    uint16_t priority;

    self->buf = NULL;
    self->py_buffer = NULL;
    self->in_progress = 0;

    int argIsOk = PyArg_ParseTupleAndKeywords(
        args, kwds, "I|H", kwlist,
        &(self->fileno),
        &priority
    );

    if (!argIsOk) return NULL;

    self->opcode = THAIO_FSYNC;

	return (PyObject*) self;
}


/*
    AIOOperation.fdsync classmethod definition
*/
PyDoc_STRVAR(AIOOperation_fdsync_docstring,
    "Creates a new instance of Operation on fdsync mode.\n\n"
    "    Operation.fdsync(\n"
    "        aio_context: AIOContext,\n"
    "        fd: int, \n"
    "        priority=0\n"
    "    )"
);

static PyObject* AIOOperation_fdsync(
    PyTypeObject *type, PyObject *args, PyObject *kwds
) {
    AIOOperation *self = (AIOOperation *) type->tp_alloc(type, 0);

    static char *kwlist[] = {"fd", "priority", NULL};

    if (self == NULL) {
        PyErr_SetString(PyExc_MemoryError, "can not allocate memory");
        return NULL;
    }

    self->buf = NULL;
    self->py_buffer = NULL;
    self->in_progress = 0;
    uint16_t priority;

    int argIsOk = PyArg_ParseTupleAndKeywords(
        args, kwds, "I|H", kwlist,
        &(self->fileno),
        &priority
    );

    if (!argIsOk) return NULL;

    self->opcode = THAIO_FDSYNC;

	return (PyObject*) self;
}

/*
    AIOOperation.get_value method definition
*/
PyDoc_STRVAR(AIOOperation_get_value_docstring,
    "Method returns a bytes value of Operation's result or None.\n\n"
    "    Operation.get_value() -> Optional[bytes]"
);

static PyObject* AIOOperation_get_value(
    AIOOperation *self, PyObject *args, PyObject *kwds
) {
    if (self->error != 0) {
        PyErr_SetString(
            PyExc_SystemError,
            strerror(self->error)
        );

        return NULL;
    }

    switch (self->opcode) {
        case THAIO_READ:
            return PyBytes_FromStringAndSize(
                self->buf, self->buf_size
            );

        case THAIO_WRITE:
            return PyLong_FromSsize_t(self->result);
    }

    return Py_None;
}


/*
    AIOOperation.get_value method definition
*/
PyDoc_STRVAR(AIOOperation_set_callback_docstring,
    "Set callback which will be called after Operation will be finished.\n\n"
    "    Operation.get_value() -> Optional[bytes]"
);

static PyObject* AIOOperation_set_callback(
    AIOOperation *self, PyObject *args, PyObject *kwds
) {
    static char *kwlist[] = {"callback", NULL};

    PyObject* callback;

    int argIsOk = PyArg_ParseTupleAndKeywords(
        args, kwds, "O", kwlist,
        &callback
    );

    if (!argIsOk) return NULL;

    if (!PyCallable_Check(callback)) {
        PyErr_Format(
            PyExc_ValueError,
            "object %r is not callable",
            callback
        );
        return NULL;
    }

    Py_INCREF(callback);
    self->callback = callback;

    Py_RETURN_TRUE;
}

/*
    AIOOperation properties
*/
static PyMemberDef AIOOperation_members[] = {
    {
        "fileno", T_UINT,
        offsetof(AIOOperation, fileno),
        READONLY, "file descriptor"
    },
    {
        "offset", T_ULONGLONG,
        offsetof(AIOOperation, offset),
        READONLY, "offset"
    },
    {
        "payload", T_OBJECT,
        offsetof(AIOOperation, py_buffer),
        READONLY, "payload"
    },
    {
        "nbytes", T_ULONGLONG,
        offsetof(AIOOperation, buf_size),
        READONLY, "nbytes"
    },
    {
        "result", T_INT,
        offsetof(AIOOperation, result),
        READONLY, "result"
    },
    {
        "error", T_INT,
        offsetof(AIOOperation, error),
        READONLY, "error"
    },
    {NULL}  /* Sentinel */
};

/*
    AIOOperation methods
*/
static PyMethodDef AIOOperation_methods[] = {
    {
        "read",
        (PyCFunction) AIOOperation_read,
        METH_CLASS | METH_VARARGS | METH_KEYWORDS,
        AIOOperation_read_docstring
    },
    {
        "write",
        (PyCFunction) AIOOperation_write,
        METH_CLASS | METH_VARARGS | METH_KEYWORDS,
        AIOOperation_write_docstring
    },
    {
        "fsync",
        (PyCFunction) AIOOperation_fsync,
        METH_CLASS | METH_VARARGS | METH_KEYWORDS,
        AIOOperation_fsync_docstring
    },
    {
        "fdsync",
        (PyCFunction) AIOOperation_fdsync,
        METH_CLASS | METH_VARARGS | METH_KEYWORDS,
        AIOOperation_fdsync_docstring
    },
    {
        "get_value",
        (PyCFunction) AIOOperation_get_value, METH_NOARGS,
        AIOOperation_get_value_docstring
    },
    {
        "set_callback",
        (PyCFunction) AIOOperation_set_callback, METH_VARARGS | METH_KEYWORDS,
        AIOOperation_set_callback_docstring
    },
    {NULL}  /* Sentinel */
};

/*
    AIOOperation class
*/
static PyTypeObject
AIOOperationType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    .tp_name = "aio.AIOOperation",
    .tp_doc = "thread aio operation representation",
    .tp_basicsize = sizeof(AIOOperation),
    .tp_itemsize = 0,
    .tp_flags = Py_TPFLAGS_DEFAULT,
    .tp_dealloc = (destructor) AIOOperation_dealloc,
    .tp_members = AIOOperation_members,
    .tp_methods = AIOOperation_methods,
    .tp_repr = (reprfunc) AIOOperation_repr
};


static PyModuleDef thread_aio_module = {
    PyModuleDef_HEAD_INIT,
    .m_name = "thread_aio",
    .m_doc = "Thread based AIO.",
    .m_size = -1,
};


PyMODINIT_FUNC PyInit_thread_aio(void) {
    Py_Initialize();

    PyObject *m;

    m = PyModule_Create(&thread_aio_module);

    if (m == NULL) return NULL;

    if (PyType_Ready(&AIOContextType) < 0) return NULL;

    Py_INCREF(&AIOContextType);

    if (PyModule_AddObject(m, "Context", (PyObject *) &AIOContextType) < 0) {
        Py_XDECREF(&AIOContextType);
        Py_XDECREF(m);
        return NULL;
    }

    if (PyType_Ready(&AIOOperationType) < 0) return NULL;

    Py_INCREF(&AIOOperationType);

    if (PyModule_AddObject(m, "Operation", (PyObject *) &AIOOperationType) < 0) {
        Py_XDECREF(&AIOOperationType);
        Py_XDECREF(m);
        return NULL;
    }

    return m;
}
