summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlejandro Colomar <alx@kernel.org>2023-06-07 01:31:33 +0200
committerAlejandro Colomar <alx@kernel.org>2023-06-21 01:09:19 +0200
commitc6c37402c92c06077c33558a74c293bc8b0f7c6d (patch)
tree5418f1296ef1842018aa1f814cb1a076d74edd49
parented1e007b917cd425c40ae1821bdc0c37c583597f (diff)
Redesign to gzip the request bodygz
The intention is that this is async. It's not really true, because unitd will only serve the entire body at once, but that's as far as we can get. Cc: Andrei Vasiliu <whyte.vuhuni@gmail.com> Signed-off-by: Alejandro Colomar <alx@kernel.org>
-rw-r--r--src/nxt_unit_app_test.c215
1 files changed, 142 insertions, 73 deletions
diff --git a/src/nxt_unit_app_test.c b/src/nxt_unit_app_test.c
index 114038b..a993328 100644
--- a/src/nxt_unit_app_test.c
+++ b/src/nxt_unit_app_test.c
@@ -40,9 +40,20 @@
#define BODY " Body:\n"
+struct request_data_s {
+ z_stream z;
+ void *in;
+ void *out;
+ size_t szin;
+ size_t szout;
+};
+
+
static int ready_handler(nxt_unit_ctx_t *ctx);
static void *worker(void *main_ctx);
-static void greeting_app_request_handler(nxt_unit_request_info_t *req);
+static void request_handler(nxt_unit_request_info_t *req);
+static void data_handler(nxt_unit_request_info_t *req);
+static void end_handler(nxt_unit_request_info_t *req);
static int thread_count;
@@ -62,8 +73,10 @@ main(int argc, char **argv)
memset(&init, 0, sizeof(nxt_unit_init_t));
- init.callbacks.request_handler = greeting_app_request_handler;
+ init.callbacks.request_handler = request_handler;
init.callbacks.ready_handler = ready_handler;
+ init.callbacks.data_handler = data_handler;
+ init.request_data_size = sizeof(struct request_data_s);
ctx = nxt_unit_init(&init);
if (ctx == NULL) {
@@ -149,13 +162,11 @@ worker(void *main_ctx)
static void
-greeting_app_request_handler(nxt_unit_request_info_t *req)
+request_handler(nxt_unit_request_info_t *req)
{
- int fd, rc, tmp, flush;
- void *buf, *buf2;
- size_t sz, sz2;
- ssize_t rd;
- z_stream z;
+ int rc;
+ z_stream *z;
+ struct request_data_s *data;
#define CONTENT_TYPE "Content-Type"
#define TEXT_PLAIN "text/plain"
@@ -186,86 +197,144 @@ greeting_app_request_handler(nxt_unit_request_info_t *req)
goto fail;
}
- fd = open("/var/www/html/index.nginx-debian.html", O_RDONLY);
- if (fd == -1) {
+ data = req->data;
+ data->szin = 0;
+ data->in = malloc(1);
+ data->szout = 0;
+ data->out = malloc(1);
+
+ z = &data->z;
+ z->zalloc = NULL;
+ z->zfree = NULL;
+ z->opaque = NULL;
+ if (deflateInit(z, Z_DEFAULT_COMPRESSION) != 0) {
+ rc = NXT_UNIT_ERROR;
goto fail;
}
- fcntl(fd, F_SETFL, O_NONBLOCK);
- z.zalloc = NULL;
- z.zfree = NULL;
- z.opaque = NULL;
- if (deflateInit(&z, 5) != 0) {
- goto close_fd;
+ if (req->content_length > 0) {
+ data_handler(req);
}
- do {
- errno = 0;
- sz = 0;
- buf = NULL;
- z.avail_in = 0;
- do { // read as much as possible until EAGAIN or EOF.
- sz += 64;
- buf = reallocf(buf, sz);
- if (buf == NULL) {
- goto deflate_end;
- }
-
- rd = read(fd, buf + sz - 64, 64);
- switch (rd) {
- case 0:
- flush = Z_FINISH;
- break;
- case -1:
- if (errno != EAGAIN || z.avail_in == 0) {
- goto free_buf;
- }
- [[fallthrough]];
- default:
- flush = Z_SYNC_FLUSH;
- break;
- }
+ return;
- z.avail_in += rd;
- } while (rd == 64);
+fail:
+ nxt_unit_request_done(req, rc);
+ return;
+}
- z.next_in = buf;
- sz2 = sz + 32;
- buf2 = malloc(sz2);
- if (buf2 == NULL) {
- goto free_buf;
+static void
+data_handler(nxt_unit_request_info_t *req)
+{
+ int ret;
+ size_t sz;
+ ssize_t rd;
+ z_stream *z;
+ struct request_data_s *data;
+
+ data = req->data;
+ z = &data->z;
+
+ if (data->szin < req->content_length) {
+ data->szin = req->content_length;
+ data->in = reallocf(data->in, data->szin);
+ if (data->in == NULL) {
+ goto fail;
}
+ }
- z.avail_out = sz2;
- z.next_out = buf2;
- tmp = deflate(&z, flush);
- if (tmp == Z_STREAM_ERROR) {
- goto free_buf2;
- }
- if (tmp == Z_BUF_ERROR) {
- goto free_buf2;
- }
- if (z.avail_in != 0) {
- goto free_buf2;
+ /* We want to deflate all available data in one call. */
+ sz = deflateBound(z, req->content_length);
+ if (data->szout < sz) {
+ data->szout = sz;
+ data->out = reallocf(data->out, data->szout);
+ if (data->out == NULL) {
+ goto fail;
}
+ }
- if (nxt_unit_response_write(req, buf2, sz2 - z.avail_out) == -1) {
- goto free_buf2;
- }
- } while (flush != Z_FINISH);
+ /* This should read everything available in one call. */
+ rd = nxt_unit_request_read(req, data->in, data->szin);
+ if (rd == 0) {
+ return;
+ }
+ if (rd == NXT_UNIT_ERROR) {
+ goto fail;
+ }
-free_buf2:
- free(buf2);
-free_buf:
- free(buf);
+ z->next_in = data->in;
+ z->avail_in = rd;
+ z->next_out = data->out;
+ z->avail_out = data->szout;
+ errno = 0;
-deflate_end:
- deflateEnd(&z);
+ ret = deflate(z, Z_SYNC_FLUSH);
+ if (ret == Z_STREAM_ERROR || ret == Z_BUF_ERROR) {
+ goto fail;
+ }
-close_fd:
- close(fd);
+ sz = data->szout - z->avail_out;
+ ret = nxt_unit_response_write(req, data->out, sz);
+ if (ret == -1) {
+ goto fail;
+ }
+
+ end_handler(req); // FIXME: Call this conditionally, when no more data will come in.
+ return;
fail:
- nxt_unit_request_done(req, rc);
+ free(data->out);
+ free(data->in);
+ deflateEnd(z);
+
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+}
+
+
+static void
+end_handler(nxt_unit_request_info_t *req)
+{
+ int ret, status;
+ size_t sz;
+ z_stream *z;
+ struct request_data_s *data;
+
+ status = NXT_UNIT_ERROR;
+
+ data = req->data;
+ z = &data->z;
+
+ sz = deflateBound(z, 0);
+ if (data->szout < sz) {
+ data->szout = sz;
+ data->out = reallocf(data->out, data->szout);
+ if (data->out == NULL) {
+ goto fail;
+ }
+ }
+
+ z->next_in = data->in;
+ z->avail_in = 0;
+ z->next_out = data->out;
+ z->avail_out = data->szout;
+ errno = 0;
+ ret = deflate(z, Z_FINISH);
+ if (ret == Z_STREAM_ERROR || ret == Z_BUF_ERROR) {
+ goto fail;
+ }
+
+ sz = data->szout - z->avail_out;
+ ret = nxt_unit_response_write(req, data->out, sz);
+ if (ret == -1) {
+ goto fail;
+ }
+
+ status = NXT_UNIT_OK;
+fail:
+ free(data->out);
+ free(data->in);
+ deflateEnd(z);
+
+ nxt_unit_request_done(req, status);
}