forked from memcached/memcached
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproxy_luafgen.c
1623 lines (1436 loc) · 56.4 KB
/
proxy_luafgen.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "proxy.h"
static mcp_funcgen_t *mcp_funcgen_route(lua_State *L, mcp_funcgen_t *fgen, mcp_parser_t *pr);
static int mcp_funcgen_router_cleanup(lua_State *L, mcp_funcgen_t *fgen);
static void mcp_funcgen_cleanup(lua_State *L, mcp_funcgen_t *fgen);
static void proxy_return_rqu_cb(io_pending_t *pending);
// If we're GC'ed but not closed, it means it was created but never
// attached to a function, so ensure everything is closed properly.
int mcplib_funcgen_gc(lua_State *L) {
mcp_funcgen_t *fgen = luaL_checkudata(L, -1, "mcp.funcgen");
if (fgen->closed) {
return 0;
}
assert(fgen->self_ref == 0);
mcp_funcgen_cleanup(L, fgen);
return 0;
}
// For describing functions which generate functions which can execute
// requests.
// These "generator functions" handle pre-allocating and creating a memory
// heirarchy, allowing dynamic runtimes at high speed.
// TODO: switch from an array to a STAILQ so we can avoid the memory
// management and error handling.
// Realistically it's impossible for these to error so we're safe for now.
static void _mcplib_funcgen_cache(mcp_funcgen_t *fgen, mcp_rcontext_t *rctx) {
if (fgen->free + 1 >= fgen->free_max) {
fgen->free_max *= 2;
fgen->list = realloc(fgen->list, fgen->free_max * sizeof(mcp_rcontext_t *));
}
fgen->list[fgen->free] = rctx;
fgen->free++;
// we're closed and every outstanding request slot has been
// returned.
if (fgen->closed && fgen->free == fgen->total) {
mcp_funcgen_cleanup(fgen->thread->L, fgen);
}
}
// call with stack: mcp.funcgen -2, function -1
static int _mcplib_funcgen_gencall(lua_State *L) {
mcp_funcgen_t *fgen = luaL_checkudata(L, -2, "mcp.funcgen");
int fgen_idx = lua_absindex(L, -2);
// create the ctx object.
size_t rctx_len = sizeof(mcp_rcontext_t) + sizeof(struct mcp_rqueue_s) * fgen->max_queues;
mcp_rcontext_t *rc = lua_newuserdatauv(L, rctx_len, 0);
memset(rc, 0, rctx_len);
luaL_getmetatable(L, "mcp.rcontext");
lua_setmetatable(L, -2);
// allow the rctx to reference the function generator.
rc->fgen = fgen;
// initialize the queue slots based on the fgen parent
for (int x = 0; x < fgen->max_queues; x++) {
struct mcp_rqueue_s *frqu = &fgen->queue_list[x];
struct mcp_rqueue_s *rqu = &rc->qslots[x];
rqu->obj_type = frqu->obj_type;
if (frqu->obj_type == RQUEUE_TYPE_POOL) {
rqu->obj_ref = 0;
rqu->obj = frqu->obj;
mcp_resp_t *r = mcp_prep_bare_resobj(L, fgen->thread);
rqu->res_ref = luaL_ref(L, LUA_REGISTRYINDEX);
rqu->res_obj = r;
} else if (frqu->obj_type == RQUEUE_TYPE_FGEN) {
// owner funcgen already holds the subfgen reference, so here we're just
// grabbing a subrctx to pin into the slot.
mcp_funcgen_t *fg = frqu->obj;
mcp_rcontext_t *subrctx = mcp_funcgen_get_rctx(L, fg->self_ref, fg);
if (subrctx == NULL) {
proxy_lua_error(L, "failed to generate request slot during queue_assign()");
}
// if this rctx ever had a request object assigned to it, we can get
// rid of it. we're pinning the subrctx in here and don't want
// to waste memory.
if (subrctx->request_ref) {
luaL_unref(L, LUA_REGISTRYINDEX, subrctx->request_ref);
subrctx->request_ref = 0;
subrctx->request = NULL;
}
// link the new rctx into this chain; we'll hold onto it until the
// parent de-allocates.
subrctx->parent = rc;
subrctx->parent_handle = x;
rqu->obj = subrctx;
}
}
// copy the rcontext reference
lua_pushvalue(L, -1);
// issue a rotation so one rcontext is now below genfunc, and one rcontext
// is on the top.
// right shift: gf, rc1, rc2 -> rc2, gf, rc1
lua_rotate(L, -3, 1);
// current stack should be func, mcp.rcontext.
int call_argnum = 1;
// stack will be func, rctx, arg if there is an arg.
if (fgen->argument_ref) {
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->argument_ref);
call_argnum++;
}
// can throw an error upstream.
lua_call(L, call_argnum, 1);
// we should have a top level function as a result.
if (!lua_isfunction(L, -1)) {
proxy_lua_error(L, "function generator didn't return a function");
return 0;
}
// can't fail past this point.
// pop the returned function.
rc->function_ref = luaL_ref(L, LUA_REGISTRYINDEX);
// link the rcontext into the function generator.
fgen->total++;
lua_getiuservalue(L, fgen_idx, 1); // get the reference table.
// rc, t -> t, rc
lua_rotate(L, -2, 1);
lua_rawseti(L, -2, fgen->total); // pop rcontext
lua_pop(L, 1); // pop ref table.
_mcplib_funcgen_cache(fgen, rc);
// associate a coroutine thread with this context.
rc->Lc = lua_newthread(L);
assert(rc->Lc);
rc->coroutine_ref = luaL_ref(L, LUA_REGISTRYINDEX);
// increment the slot counter
const char *name = NULL;
if (fgen->name_ref) {
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->name_ref);
name = lua_tostring(L, -1);
// popping early: string stays referenced in name_ref and no other lua
// code executing on this VM, so it's safe to reference immediately.
lua_pop(L, 1);
} else {
name = "anonymous";
}
LIBEVENT_THREAD *t = PROXY_GET_THR(L);
mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGENSLOT_IDX, name, 1);
// return the fgen.
// FIXME: just return 0? need to adjust caller to not mis-ref the
// generator function.
return 1;
}
static void _mcp_funcgen_return_rctx(mcp_rcontext_t *rctx) {
mcp_funcgen_t *fgen = rctx->fgen;
assert(rctx->pending_reqs == 0);
int res = lua_resetthread(rctx->Lc);
if (res != LUA_OK) {
// TODO: I was under the impression it was possible to reuse a
// coroutine from an error state, but it seems like this only works if
// the routine landed in LUA_YIELD or LUA_OK
// Leaving a note here to triple check this or if my memory was wrong.
// Instead for now we throw away the coroutine if it was involved in
// an error. Realistically this shouldn't be normal so it might not
// matter anyway.
lua_State *L = fgen->thread->L;
luaL_unref(L, LUA_REGISTRYINDEX, rctx->coroutine_ref);
rctx->Lc = lua_newthread(L);
assert(rctx->Lc);
rctx->coroutine_ref = luaL_ref(L, LUA_REGISTRYINDEX);
} else {
lua_settop(rctx->Lc, 0);
}
rctx->wait_mode = QWAIT_IDLE;
rctx->resp = NULL;
rctx->first_queue = false; // HACK
if (rctx->request) {
mcp_request_cleanup(fgen->thread, rctx->request);
}
// reset each rqu.
for (int x = 0; x < fgen->max_queues; x++) {
struct mcp_rqueue_s *rqu = &rctx->qslots[x];
if (rqu->res_ref) {
if (rqu->res_obj) {
// using a persistent object.
mcp_response_cleanup(fgen->thread, rqu->res_obj);
} else {
// temporary error object
luaL_unref(rctx->Lc, LUA_REGISTRYINDEX, rqu->res_ref);
rqu->res_ref = 0;
}
}
if (rqu->req_ref) {
luaL_unref(rctx->Lc, LUA_REGISTRYINDEX, rqu->req_ref);
rqu->req_ref = 0;
}
assert(rqu->state != RQUEUE_ACTIVE);
rqu->state = RQUEUE_IDLE;
rqu->flags = 0;
rqu->rq = NULL;
if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
_mcp_funcgen_return_rctx(rqu->obj);
}
}
}
// TODO: check rctx->awaiting before returning?
// TODO: separate the "cleanup" portion from the "Return to cache" portion, so
// we can call that directly for subrctx's
void mcp_funcgen_return_rctx(mcp_rcontext_t *rctx) {
mcp_funcgen_t *fgen = rctx->fgen;
if (rctx->pending_reqs != 0) {
// not ready to return to cache yet.
return;
}
if (rctx->parent) {
// Important: we need to hold the parent request reference until this
// subrctx is fully depleted of outstanding requests itself.
rctx->parent->pending_reqs--;
assert(rctx->parent->pending_reqs > -1);
if (rctx->parent->pending_reqs == 0) {
mcp_funcgen_return_rctx(rctx->parent);
}
return;
}
_mcp_funcgen_return_rctx(rctx);
_mcplib_funcgen_cache(fgen, rctx);
}
mcp_rcontext_t *mcp_funcgen_get_rctx(lua_State *L, int fgen_ref, mcp_funcgen_t *fgen) {
mcp_rcontext_t *rctx = NULL;
// nothing left in slot cache, generate a new function.
if (fgen->free == 0) {
// TODO (perf): pre-create this c closure somewhere hidden.
lua_pushcclosure(L, _mcplib_funcgen_gencall, 0);
// pull in the funcgen object
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen_ref);
// then generator function
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->generator_ref);
// then generate a new function slot.
int res = lua_pcall(L, 2, 1, 0);
if (res != LUA_OK) {
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(L, -1));
lua_settop(L, 0);
return NULL;
}
lua_pop(L, 1); // drop the extra funcgen
} else {
P_DEBUG("%s: serving from cache\n", __func__);
}
rctx = fgen->list[fgen->free-1];
fgen->free--;
// on non-error, return the response object upward.
return rctx;
}
mcp_rcontext_t *mcp_funcgen_start(lua_State *L, mcp_funcgen_t *fgen, mcp_parser_t *pr) {
if (fgen->router.type != FGEN_ROUTER_NONE) {
fgen = mcp_funcgen_route(L, fgen, pr);
if (fgen == NULL) {
return NULL;
}
}
// fgen->self_ref must be valid because we cannot start a function that
// hasn't been referenced anywhere.
mcp_rcontext_t *rctx = mcp_funcgen_get_rctx(L, fgen->self_ref, fgen);
if (rctx == NULL) {
return NULL;
}
// only top level rctx's can have a request object assigned to them.
// so we create them late here, in the start function.
// Note that we can _technically_ fail with an OOM here, but we've not set
// up lua in a way that OOM's are possible.
if (rctx->request_ref == 0) {
mcp_request_t *rq = lua_newuserdatauv(L, sizeof(mcp_request_t) + MCP_REQUEST_MAXLEN + KEY_MAX_LENGTH, 0);
memset(rq, 0, sizeof(mcp_request_t));
luaL_getmetatable(L, "mcp.request");
lua_setmetatable(L, -2);
rctx->request_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pop the request
rctx->request = rq;
}
// TODO: could probably move a few more lines from proto_proxy into here,
// but that's splitting hairs.
return rctx;
}
// calling either with self_ref set, or with fgen in stack -1 (ie; from GC
// function without ever being attached to anything)
static void mcp_funcgen_cleanup(lua_State *L, mcp_funcgen_t *fgen) {
lua_checkstack(L, 5); // paranoia. this can recurse from a router.
// pull the fgen into the stack.
if (fgen->self_ref) {
// pull self onto the stack and hold until the end of the func.
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->self_ref);
// remove the C reference to the fgen
luaL_unref(L, LUA_REGISTRYINDEX, fgen->self_ref);
fgen->self_ref = 0;
} else {
// we've already cleaned up, probably redundant call from _gc()
return;
}
if (fgen->router.type != FGEN_ROUTER_NONE) {
// we're actually a "router", send this out for cleanup.
mcp_funcgen_router_cleanup(L, fgen);
}
// decrement the slot counter
const char *name = NULL;
if (fgen->name_ref) {
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->name_ref);
name = lua_tostring(L, -1);
} else {
name = "anonymous";
}
LIBEVENT_THREAD *t = PROXY_GET_THR(L);
mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGEN_IDX, name, -1);
// Walk every request context and issue cleanup.
for (int x = 0; x < fgen->total; x++) {
mcp_rcontext_t *rctx = fgen->list[x];
luaL_unref(L, LUA_REGISTRYINDEX, rctx->coroutine_ref);
luaL_unref(L, LUA_REGISTRYINDEX, rctx->function_ref);
if (rctx->request_ref) {
luaL_unref(L, LUA_REGISTRYINDEX, rctx->request_ref);
}
assert(rctx->pending_reqs == 0);
// cleanup of request queue entries. recurse funcgen cleanup.
for (int x = 0; x < fgen->max_queues; x++) {
struct mcp_rqueue_s *rqu = &rctx->qslots[x];
if (rqu->obj_type == RQUEUE_TYPE_POOL) {
// nothing to do.
} else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
// don't need to recurse, just return the subrctx.
mcp_rcontext_t *subrctx = rqu->obj;
_mcplib_funcgen_cache(subrctx->fgen, subrctx);
} else if (rqu->obj_type != RQUEUE_TYPE_NONE) {
assert(1 == 0);
}
if (rqu->cb_ref) {
luaL_unref(L, LUA_REGISTRYINDEX, rqu->cb_ref);
rqu->cb_ref = 0;
}
}
}
// decrement the slot tracker. apply full delta at once for efficiency.
mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGENSLOT_IDX, name, -fgen->total);
if (fgen->queue_list) {
for (int x = 0; x < fgen->max_queues; x++) {
struct mcp_rqueue_s *rqu = &fgen->queue_list[x];
if (rqu->obj_type == RQUEUE_TYPE_POOL) {
// just the obj_ref
luaL_unref(L, LUA_REGISTRYINDEX, rqu->obj_ref);
} else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
// don't need to recurse, just deref.
mcp_rcontext_t *subrctx = rqu->obj;
mcp_funcgen_dereference(L, subrctx->fgen);
} else if (rqu->obj_type != RQUEUE_TYPE_NONE) {
assert(1 == 0);
}
}
free(fgen->queue_list);
}
if (fgen->name_ref) {
lua_pop(L, 1); // pop the name string.
luaL_unref(L, LUA_REGISTRYINDEX, fgen->name_ref);
}
// Finally, get the rctx reference table and nil each reference to allow
// garbage collection to happen sooner on the rctx's
lua_getiuservalue(L, -1, 1);
for (int x = 0; x < fgen->total; x++) {
lua_pushnil(L);
lua_rawseti(L, -2, x+1);
}
// drop the reference table and the funcgen reference.
lua_pop(L, 2);
}
// Must be called with the function generator at on top of stack
// Pops the value from the stack.
void mcp_funcgen_reference(lua_State *L) {
mcp_funcgen_t *fgen = luaL_checkudata(L, -1, "mcp.funcgen");
if (fgen->self_ref) {
fgen->refcount++;
lua_pop(L, 1); // ensure we drop the extra value.
} else {
fgen->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
fgen->refcount = 1;
}
P_DEBUG("%s: funcgen referenced: %d\n", __func__, fgen->refcount);
}
void mcp_funcgen_dereference(lua_State *L, mcp_funcgen_t *fgen) {
assert(fgen->refcount > 0);
fgen->refcount--;
P_DEBUG("%s: funcgen dereferenced: %d\n", __func__, fgen->refcount);
if (fgen->refcount == 0) {
fgen->closed = true;
P_DEBUG("%s: funcgen cleaning up\n", __func__);
if (fgen->free == fgen->total) {
mcp_funcgen_cleanup(L, fgen);
}
}
}
// All we need to do here is copy the function reference we've stashed into
// the C closure's upvalue and return it.
static int _mcplib_funcgenbare_generator(lua_State *L) {
lua_pushvalue(L, lua_upvalueindex(1));
return 1;
}
// helper function to create a function generator with a "default" function.
// the function passed in here is a standard 'function(r) etc end' prototype,
// which we want to always return instead of calling a real generator
// function.
int mcplib_funcgenbare_new(lua_State *L) {
if (!lua_isfunction(L, -1)) {
proxy_lua_error(L, "Must pass a function to mcp.funcgenbare_new");
return 0;
}
// Pops the function into the upvalue of this C closure function.
lua_pushcclosure(L, _mcplib_funcgenbare_generator, 1);
// FIXME: not urgent, but this function chain isn't stack balanced, and its caller has
// to drop an extra reference.
// Need to re-audit and decide if we still need this pushvalue here or if
// we can drop the pop from the caller and leave this function balanced.
lua_pushvalue(L, -1);
int gen_ref = luaL_ref(L, LUA_REGISTRYINDEX);
// Pass our fakeish generator function down the line.
mcplib_funcgen_new(L);
mcp_funcgen_t *fgen = lua_touserdata(L, -1);
const char *name = "anonymous";
mcp_sharedvm_delta(fgen->thread->proxy_ctx, SHAREDVM_FGEN_IDX, name, 1);
fgen->generator_ref = gen_ref;
fgen->ready = true;
return 1;
}
#define FGEN_DEFAULT_FREELIST_SIZE 8
int mcplib_funcgen_new(lua_State *L) {
LIBEVENT_THREAD *t = PROXY_GET_THR(L);
mcp_funcgen_t *fgen = lua_newuserdatauv(L, sizeof(mcp_funcgen_t), 2);
memset(fgen, 0, sizeof(mcp_funcgen_t));
fgen->thread = t;
fgen->free_max = FGEN_DEFAULT_FREELIST_SIZE;
fgen->list = calloc(fgen->free_max, sizeof(mcp_rcontext_t *));
luaL_getmetatable(L, "mcp.funcgen");
lua_setmetatable(L, -2);
// the table we will use to hold references to rctx's
lua_createtable(L, 8, 0);
// set our table into the uservalue 1 of fgen (idx -2)
// pops the table.
lua_setiuservalue(L, -2, 1);
return 1;
}
int mcplib_funcgen_new_handle(lua_State *L) {
mcp_funcgen_t *fgen = lua_touserdata(L, 1);
mcp_pool_proxy_t *pp = NULL;
mcp_funcgen_t *fg = NULL;
if (fgen->ready) {
proxy_lua_error(L, "cannot modify function generator after calling ready");
return 0;
}
if ((pp = luaL_testudata(L, 2, "mcp.pool_proxy")) != NULL) {
// good.
} else if ((fg = luaL_testudata(L, 2, "mcp.funcgen")) != NULL) {
if (fg->router.type != FGEN_ROUTER_NONE) {
proxy_lua_error(L, "cannot assign a router to a handle in new_handle");
return 0;
}
if (fg->closed) {
proxy_lua_error(L, "cannot use a replaced function in new_handle");
return 0;
}
} else {
proxy_lua_error(L, "invalid argument to new_handle");
return 0;
}
fgen->max_queues++;
if (fgen->queue_list == NULL) {
fgen->queue_list = malloc(sizeof(struct mcp_rqueue_s));
} else {
fgen->queue_list = realloc(fgen->queue_list, fgen->max_queues * sizeof(struct mcp_rqueue_s));
}
if (fgen->queue_list == NULL) {
proxy_lua_error(L, "failed to realloc queue list during new_handle()");
return 0;
}
struct mcp_rqueue_s *rqu = &fgen->queue_list[fgen->max_queues-1];
memset(rqu, 0, sizeof(*rqu));
if (pp) {
// pops pp from the stack
rqu->obj_ref = luaL_ref(L, LUA_REGISTRYINDEX);
rqu->obj_type = RQUEUE_TYPE_POOL;
rqu->obj = pp;
} else {
// pops the fgen from the stack.
mcp_funcgen_reference(L);
rqu->obj_type = RQUEUE_TYPE_FGEN;
rqu->obj = fg;
}
lua_pushinteger(L, fgen->max_queues-1);
return 1;
}
int mcplib_funcgen_ready(lua_State *L) {
mcp_funcgen_t *fgen = lua_touserdata(L, 1);
luaL_checktype(L, 2, LUA_TTABLE);
if (fgen->ready) {
proxy_lua_error(L, "cannot modify function generator after calling ready");
return 0;
}
if (lua_getfield(L, 2, "f") != LUA_TFUNCTION) {
proxy_lua_error(L, "Must specify generator function ('f') to fgen:ready");
return 0;
}
fgen->generator_ref = luaL_ref(L, LUA_REGISTRYINDEX);
if (lua_getfield(L, 2, "a") != LUA_TNIL) {
fgen->argument_ref = luaL_ref(L, LUA_REGISTRYINDEX);
} else {
lua_pop(L, 1);
}
if (lua_getfield(L, 2, "n") == LUA_TSTRING) {
fgen->name_ref = luaL_ref(L, LUA_REGISTRYINDEX);
} else {
lua_pop(L, 1);
}
// now we test the generator function and create the first slot.
lua_pushvalue(L, 1); // copy the funcgen to pass into gencall
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->generator_ref); // for gencall
_mcplib_funcgen_gencall(L);
lua_pop(L, 1); // drop extra funcgen ref.
// add us to the global state
const char *name = NULL;
if (fgen->name_ref) {
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->name_ref);
name = lua_tostring(L, -1);
lua_pop(L, 1);
} else {
name = "anonymous";
}
mcp_sharedvm_delta(fgen->thread->proxy_ctx, SHAREDVM_FGEN_IDX, name, 1);
fgen->ready = true;
return 1;
}
// Handlers for request contexts
int mcplib_rcontext_handle_set_cb(lua_State *L) {
mcp_rcontext_t *rctx = lua_touserdata(L, 1);
luaL_checktype(L, 2, LUA_TNUMBER);
luaL_checktype(L, 3, LUA_TFUNCTION);
int handle = lua_tointeger(L, 2);
if (handle < 0 || handle >= rctx->fgen->max_queues) {
proxy_lua_error(L, "invalid handle passed to queue_set_cb");
return 0;
}
struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
if (rqu->cb_ref) {
luaL_unref(L, LUA_REGISTRYINDEX, rqu->cb_ref);
}
rqu->cb_ref = luaL_ref(L, LUA_REGISTRYINDEX);
return 0;
}
// call with request object on top of stack.
// pops the request object
static void _mcplib_rcontext_queue(lua_State *L, mcp_rcontext_t *rctx, mcp_request_t *rq, int handle) {
if (handle < 0 || handle >= rctx->fgen->max_queues) {
proxy_lua_error(L, "invalid handle passed to queue");
return;
}
struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
if (rqu->state != RQUEUE_IDLE) {
lua_pop(L, 1);
return;
}
// If we're queueing to an fgen, arm the coroutine while we have the
// objects handy. Else this requires roundtripping a luaL_ref/luaL_unref
// later.
if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
mcp_rcontext_t *subrctx = rqu->obj;
lua_pushvalue(L, 2); // duplicate the request obj
lua_rawgeti(subrctx->Lc, LUA_REGISTRYINDEX, subrctx->function_ref);
lua_xmove(L, subrctx->Lc, 1); // move the requet object.
subrctx->pending_reqs++;
}
// hold the request reference.
rqu->req_ref = luaL_ref(L, LUA_REGISTRYINDEX);
rqu->state = RQUEUE_QUEUED;
rqu->rq = rq;
}
// first arg is rcontext
// then a request object
// then either a handle (integer) or array style table of handles
int mcplib_rcontext_enqueue(lua_State *L) {
mcp_rcontext_t *rctx = lua_touserdata(L, 1);
mcp_request_t *rq = luaL_checkudata(L, 2, "mcp.request");
if (rctx->wait_mode != QWAIT_IDLE) {
proxy_lua_error(L, "enqueue: cannot enqueue new requests while in a wait");
return 0;
}
if (!rq->pr.keytoken) {
proxy_lua_error(L, "cannot queue requests without a key");
return 0;
}
int type = lua_type(L, 3);
if (type == LUA_TNUMBER) {
int handle = lua_tointeger(L, 3);
lua_pushvalue(L, 2);
_mcplib_rcontext_queue(L, rctx, rq, handle);
} else if (type == LUA_TTABLE) {
unsigned int len = lua_rawlen(L, 3);
for (int x = 0; x < len; x++) {
type = lua_rawgeti(L, 3, x+1);
if (type != LUA_TNUMBER) {
proxy_lua_error(L, "invalid handle passed to queue via array table");
return 0;
}
int handle = lua_tointeger(L, 4);
lua_pop(L, 1);
lua_pushvalue(L, 2);
_mcplib_rcontext_queue(L, rctx, rq, handle);
}
} else {
proxy_lua_error(L, "must pass a handle or a table to queue");
return 0;
}
return 0;
}
// TODO: pre-generate a result object into sub-rctx's that we can pull up for
// this, instead of allocating outside of a protected call.
static void _mcp_resume_rctx_process_error(mcp_rcontext_t *rctx, struct mcp_rqueue_s *rqu) {
// we have an error. need to mark the error into the parent rqu
rqu->flags |= RQUEUE_R_ERROR|RQUEUE_R_ANY;
mcp_resp_t *r = mcp_prep_bare_resobj(rctx->Lc, rctx->fgen->thread);
r->status = MCMC_ERR;
r->resp.code = MCMC_CODE_SERVER_ERROR;
assert(rqu->res_ref == 0);
rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);
mcp_process_rqueue_return(rctx->parent, rctx->parent_handle, r);
if (rctx->parent->wait_count) {
mcp_process_rctx_wait(rctx->parent, rctx->parent_handle);
}
}
static void _mcp_start_rctx_process_error(mcp_rcontext_t *rctx, struct mcp_rqueue_s *rqu) {
// we have an error. need to mark the error into the parent rqu
rqu->flags |= RQUEUE_R_ERROR|RQUEUE_R_ANY;
mcp_resp_t *r = mcp_prep_bare_resobj(rctx->Lc, rctx->fgen->thread);
r->status = MCMC_ERR;
r->resp.code = MCMC_CODE_SERVER_ERROR;
assert(rqu->res_ref == 0);
rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);
// queue an IO to return later.
io_pending_proxy_t *p = mcp_queue_rctx_io(rctx->parent, NULL, NULL, r);
p->return_cb = proxy_return_rqu_cb;
p->queue_handle = rctx->parent_handle;
p->await_background = true;
}
static void mcp_start_subrctx(mcp_rcontext_t *rctx) {
int res = proxy_run_rcontext(rctx);
struct mcp_rqueue_s *rqu = &rctx->parent->qslots[rctx->parent_handle];
if (res == LUA_OK) {
int type = lua_type(rctx->Lc, 1);
mcp_resp_t *r = NULL;
if (type == LUA_TUSERDATA && (r = luaL_testudata(rctx->Lc, 1, "mcp.response")) != NULL) {
// move stack result object into parent rctx rqu slot.
assert(rqu->res_ref == 0);
rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);
io_pending_proxy_t *p = mcp_queue_rctx_io(rctx->parent, NULL, NULL, r);
p->return_cb = proxy_return_rqu_cb;
p->queue_handle = rctx->parent_handle;
// TODO: change name of property to fast-return once mcp.await is
// retired.
p->await_background = true;
} else if (type == LUA_TSTRING) {
// TODO: wrap with a resobj and parse it.
// for now we bypass the rqueue process handling
// meaning no callbacks/etc.
assert(rqu->res_ref == 0);
rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);
rqu->flags |= RQUEUE_R_ANY;
rqu->state = RQUEUE_COMPLETE;
io_pending_proxy_t *p = mcp_queue_rctx_io(rctx->parent, NULL, NULL, NULL);
p->return_cb = proxy_return_rqu_cb;
p->queue_handle = rctx->parent_handle;
p->await_background = true;
} else {
// generate a generic object with an error.
_mcp_start_rctx_process_error(rctx, rqu);
}
} else if (res == LUA_YIELD) {
// normal.
} else {
lua_pop(rctx->Lc, 1); // drop the error message.
_mcp_start_rctx_process_error(rctx, rqu);
}
}
static void mcp_resume_rctx_from_cb(mcp_rcontext_t *rctx) {
int res = proxy_run_rcontext(rctx);
if (rctx->parent) {
struct mcp_rqueue_s *rqu = &rctx->parent->qslots[rctx->parent_handle];
if (res == LUA_OK) {
int type = lua_type(rctx->Lc, 1);
mcp_resp_t *r = NULL;
if (type == LUA_TUSERDATA && (r = luaL_testudata(rctx->Lc, 1, "mcp.response")) != NULL) {
// move stack result object into parent rctx rqu slot.
assert(rqu->res_ref == 0);
rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);
mcp_process_rqueue_return(rctx->parent, rctx->parent_handle, r);
} else if (type == LUA_TSTRING) {
// TODO: wrap with a resobj and parse it.
// for now we bypass the rqueue process handling
// meaning no callbacks/etc.
assert(rqu->res_ref == 0);
rqu->res_ref = luaL_ref(rctx->Lc, LUA_REGISTRYINDEX);
rqu->flags |= RQUEUE_R_ANY;
rqu->state = RQUEUE_COMPLETE;
} else {
// generate a generic object with an error.
_mcp_resume_rctx_process_error(rctx, rqu);
}
if (rctx->parent->wait_count) {
mcp_process_rctx_wait(rctx->parent, rctx->parent_handle);
}
mcp_funcgen_return_rctx(rctx);
} else if (res == LUA_YIELD) {
// normal.
} else {
lua_pop(rctx->Lc, 1); // drop the error message.
_mcp_resume_rctx_process_error(rctx, rqu);
mcp_funcgen_return_rctx(rctx);
}
} else {
// Parent rctx has returned either a response or error to its top
// level resp object and is now complete.
// HACK
// see notes in proxy_run_rcontext()
// NOTE: this function is called from rqu_cb(), which pushes the
// submission loop. This code below can call drive_machine(), which
// calls submission loop if stuff is queued.
// Would remove redundancy if we can signal up to rqu_cb() and either
// q->count-- or do the inline submission at that level.
if (res != LUA_YIELD) {
mcp_funcgen_return_rctx(rctx);
io_queue_t *q = conn_io_queue_get(rctx->c, IO_QUEUE_PROXY);
q->count--;
if (q->count == 0) {
// call re-add directly since we're already in the worker thread.
conn_worker_readd(rctx->c);
}
}
}
}
// This "Dummy" IO immediately resumes the yielded function, without a result
// attached.
static void proxy_return_rqu_dummy_cb(io_pending_t *pending) {
io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
mcp_rcontext_t *rctx = p->rctx;
conn *c = rctx->c;
rctx->pending_reqs--;
assert(rctx->pending_reqs > -1);
lua_settop(rctx->Lc, 0);
lua_pushinteger(rctx->Lc, 0); // return a "0" done count to the function.
mcp_resume_rctx_from_cb(rctx);
do_cache_free(p->thread->io_cache, p);
// We always need a C object right now, but just in case.
if (c) {
// HACK
// see notes above proxy_run_rcontext.
// in case the above resume calls queued new work, we have to submit
// it to the backend handling system here.
for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
if (q->stack_ctx != NULL) {
io_queue_cb_t *qcb = thread_io_queue_get(c->thread, q->type);
qcb->submit_cb(q);
}
}
}
}
void mcp_process_rctx_wait(mcp_rcontext_t *rctx, int handle) {
struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
int status = rqu->flags;
assert(rqu->state == RQUEUE_COMPLETE);
// waiting for some IO's to complete before continuing.
// meaning if we "match good" here, we can resume.
// we can also resume if we are in wait mode but pending_reqs is down
// to 1.
switch (rctx->wait_mode) {
case QWAIT_IDLE:
// should be impossible to get here.
// TODO: find a better path for throwing real errors from these
// side cases. would feel better long term.
abort();
break;
case QWAIT_GOOD:
if (status & RQUEUE_R_GOOD) {
rctx->wait_done++;
rqu->state = RQUEUE_WAITED;
}
break;
case QWAIT_OK:
if (status & (RQUEUE_R_GOOD|RQUEUE_R_OK)) {
rctx->wait_done++;
rqu->state = RQUEUE_WAITED;
}
break;
case QWAIT_ANY:
rctx->wait_done++;
rqu->state = RQUEUE_WAITED;
break;
case QWAIT_FASTGOOD:
if (status & RQUEUE_R_GOOD) {
rctx->wait_done++;
rqu->state = RQUEUE_WAITED;
// resume early if "good"
status |= RQUEUE_R_RESUME;
} else if (status & RQUEUE_R_OK) {
// count but don't resume early if "ok"
rctx->wait_done++;
rqu->state = RQUEUE_WAITED;
}
break;
case QWAIT_HANDLE:
// waiting for a specific handle to return
if (handle == rctx->wait_handle) {
rctx->wait_done++;
rqu->state = RQUEUE_WAITED;
}
break;
}
assert(rctx->pending_reqs != 0);
if ((status & RQUEUE_R_RESUME) || rctx->wait_done == rctx->wait_count || rctx->pending_reqs == 1) {
// ran out of stuff to wait for. time to resume.
// TODO: can we do the settop at the yield? nothing we need to
// keep in the stack in this mode.
lua_settop(rctx->Lc, 0);
rctx->wait_count = 0;
if (rctx->wait_mode == QWAIT_HANDLE) {
mcp_rcontext_push_rqu_res(rctx->Lc, rctx, handle);
} else {
lua_pushinteger(rctx->Lc, rctx->wait_done);
}
rctx->wait_mode = QWAIT_IDLE;
mcp_resume_rctx_from_cb(rctx);
}
}
// sets the slot's return status code, to be used for filtering responses
// later.
// if a callback was set, execute it now.
int mcp_process_rqueue_return(mcp_rcontext_t *rctx, int handle, mcp_resp_t *res) {
struct mcp_rqueue_s *rqu = &rctx->qslots[handle];
uint8_t flag = RQUEUE_R_ANY;
assert(rqu->state == RQUEUE_ACTIVE);
rqu->state = RQUEUE_COMPLETE;
if (res->status == MCMC_OK) {
if (res->resp.code != MCMC_CODE_END) {
flag = RQUEUE_R_GOOD;
} else {
flag = RQUEUE_R_OK;
}
}
if (rqu->cb_ref) {
lua_settop(rctx->Lc, 0);
lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rqu->cb_ref);
lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rqu->res_ref);
lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rqu->req_ref);
if (lua_pcall(rctx->Lc, 2, 2, 0) != LUA_OK) {
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(rctx->Lc, -1));
} else if (lua_isinteger(rctx->Lc, 1)) {
// allow overriding the result flag from the callback.
enum mcp_rqueue_e mode = lua_tointeger(rctx->Lc, 1);
switch (mode) {
case QWAIT_GOOD:
flag = RQUEUE_R_GOOD;
break;
case QWAIT_OK:
flag = RQUEUE_R_OK;
break;
case QWAIT_ANY:
break;
default:
// ANY
break;
}
// if second result return shortcut status code
if (lua_toboolean(rctx->Lc, 2)) {
flag |= RQUEUE_R_RESUME;
}
}
lua_settop(rctx->Lc, 0); // FIXME: This might not be necessary.
// we settop _before_ calling cb's and
// _before_ setting up for a coro resume.
}
rqu->flags |= flag;
return rqu->flags;
}
// specific function for queue-based returns.
static void proxy_return_rqu_cb(io_pending_t *pending) {
io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
mcp_rcontext_t *rctx = p->rctx;
// Hold the client object before we potentially return the rctx below.
conn *c = rctx->c;
if (p->client_resp) {
mcp_process_rqueue_return(rctx, p->queue_handle, p->client_resp);
}
rctx->pending_reqs--;
assert(rctx->pending_reqs > -1);
if (rctx->wait_count) {
mcp_process_rctx_wait(rctx, p->queue_handle);
} else {
mcp_funcgen_return_rctx(rctx);
}
do_cache_free(p->thread->io_cache, p);
// We always need a C object right now, but just in case.
if (c) {
// HACK
// see notes above proxy_run_rcontext.