1 /**
2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module app;
7 
8 import core.memory : GC;
9 import core.thread : Thread;
10 import core.time : dur;
11 import logger = std.experimental.logger;
12 import std.algorithm : sort;
13 import std.array : array, appender, empty;
14 import std.datetime : Clock, Duration, dur, SysTime;
15 import std.datetime.stopwatch : StopWatch, AutoStart;
16 import std.exception : collectException;
17 import std.functional : toDelegate;
18 import std.stdio : writeln, writefln;
19 import std.typecons : tuple, Tuple;
20 
21 import my.actor;
22 import my.stat;
23 import my.gc.refc;
24 
25 immutable MByte = 1024.0 * 1024.0;
26 
27 void main(string[] args) {
28     import std.file : thisExePath;
29     import std.format : format;
30     import std.path : baseName;
31     import std.traits;
32     static import std.getopt;
33 
34     TestFn[string] metrics;
35     metrics["create"] = toDelegate(&testActorCreate);
36     metrics["send_msg"] = toDelegate(&testActorMsg);
37     metrics["delayed_msg1"] = () => testActorDelayedMsg(1.dur!"msecs", 5.dur!"msecs", 1000);
38     metrics["delayed_msg10"] = () => testActorDelayedMsg(12.dur!"msecs", 10.dur!"msecs", 100);
39     metrics["delayed_msg100"] = () => testActorDelayedMsg(100.dur!"msecs", 100.dur!"msecs", 10);
40     metrics["delayed_msg1000"] = () => testActorDelayedMsg(1000.dur!"msecs", 1000.dur!"msecs", 5);
41     metrics["slow_action"] = toDelegate(&testActorSlowAction);
42 
43     string[] metricName;
44     uint repeatTimes = 1;
45     auto helpInfo = std.getopt.getopt(args, "m|metric", format("metric to run %s",
46             metrics.byKey), &metricName, "r|repeat", "repeat the metric test", &repeatTimes);
47 
48     if (helpInfo.helpWanted) {
49         std.getopt.defaultGetoptPrinter(format!"usage: %s <options>\n"(thisExePath.baseName),
50                 helpInfo.options);
51         return;
52     }
53 
54     metricName = metricName.empty ? metrics.byKey.array.sort.array : metricName;
55 
56     foreach (const iter; 0 .. repeatTimes) {
57         writeln("# Iteration ", iter);
58         foreach (m; metricName) {
59             writeln("##############");
60             run(metrics[m]);
61             writeln;
62         }
63     }
64 }
65 
66 alias TestFn = Metric delegate();
67 
68 void run(TestFn t) {
69     auto m = t();
70     writeln("data points ", m.values.length);
71     auto data = m.values.makeData;
72     auto bstat = basicStat(data);
73     writeln(bstat);
74     writeln("95% is < ", (bstat.mean.value + bstat.sd.value * 2.0) / 1000000.0, " ms");
75     writeln("bytes per actor ", m.mem);
76 }
77 
78 struct Metric {
79     double[] values;
80     double mem;
81 }
82 
83 struct Mem {
84     ulong start;
85     double peek() {
86         const used = GC.stats.usedSize;
87         if (used < start)
88             return start - used;
89         return used - start;
90     }
91 }
92 
93 Mem mem() {
94     return Mem(GC.stats.usedSize);
95 }
96 
97 Metric testActorCreate() {
98     writeln("# Test time to create an actor");
99     writeln("unit: nanoseconds");
100 
101     Metric rval;
102 
103     auto sys = makeSystem;
104     auto m = mem;
105     auto perf() {
106         auto sw = StopWatch(AutoStart.yes);
107         foreach (_; 0 .. 1000)
108             sys.spawn((Actor* a) => impl(a, (int a) {}));
109         rval.values ~= sw.peek.total!"nsecs" / 1000.0;
110     }
111 
112     foreach (_; 0 .. 1000)
113         perf;
114 
115     rval.mem = m.peek / 1000000.0;
116 
117     return rval;
118 }
119 
120 Metric testActorMsg() {
121     writeln("# How long does it take to send an actor message from actor a->b");
122     writeln("unit: nanoseconds");
123 
124     Metric rval;
125 
126     auto sys = makeSystem;
127     auto m = mem;
128     ulong nrActors;
129     auto perf() {
130         int count;
131         auto a1 = sys.spawn((Actor* a) => impl(a, (ref Capture!(int*, "count") c, int x) {
132                 (*c.count)++;
133             }, capture(&count)));
134         nrActors++;
135 
136         Actor* spawnA2(Actor* self) {
137             static void fn(ref Capture!(Actor*, "self", WeakAddress, "a1") c, int x) {
138                 send(c.a1, x);
139                 send(c.self.address, x + 1);
140                 if (x > 100)
141                     c.self.shutdown;
142             }
143 
144             return impl(self, &fn, capture(self, a1));
145         }
146 
147         auto actors = appender!(WeakAddress[])();
148         actors.put(a1);
149         foreach (_; 0 .. 100) {
150             actors.put(sys.spawn(&spawnA2));
151             nrActors++;
152         }
153 
154         auto sw = StopWatch(AutoStart.yes);
155         foreach (a; actors.data)
156             send(a, 1);
157 
158         int reqs;
159         while (count < 10000) {
160             Thread.sleep(1.dur!"msecs");
161         }
162         rval.values ~= sw.peek.total!"nsecs" / cast(double) count;
163 
164         foreach (a; actors.data)
165             sendExit(a, ExitReason.userShutdown);
166     }
167 
168     foreach (_; 0 .. 100)
169         perf;
170 
171     rval.mem = m.peek / cast(double) nrActors;
172 
173     return rval;
174 }
175 
176 Metric testActorDelayedMsg(Duration delayFor, Duration rate, const ulong dataPoints) {
177     writeln("# Test delayed message trigger jitter");
178     writefln("delay: %s rate: %s", delayFor, rate);
179     writeln("What is the jitter of a delayed message compared to the expected arrival time");
180     writeln("unit: nanoseconds");
181 
182     Metric rval;
183 
184     import std.parallelism;
185 
186     auto sys = System(new TaskPool(4), true);
187     auto m = mem;
188 
189     auto perf() {
190         static struct Get {
191         }
192 
193         static struct Msg {
194             SysTime expectedArrival;
195         }
196 
197         static struct StartMsg {
198         }
199 
200         auto sender = sys.spawn((Actor* self) {
201             self.name = "sender";
202             //self.exitHandler((ref Actor self, ExitMsg m) nothrow{
203             //    logger.info("sender exit").collectException;
204             //    self.shutdown;
205             //});
206 
207             return impl(self, (ref Capture!(Actor*, "self", Duration, "delay",
208                 Duration, "rate") ctx, WeakAddress recv) {
209                 delayedSend(recv, delay(ctx.delay), Msg(Clock.currTime + ctx.delay));
210                 delayedSend(ctx.self, delay(ctx.rate), recv);
211             }, capture(self, delayFor, rate));
212         });
213 
214         auto collector = sys.spawn((Actor* self) {
215             self.name = "collector";
216             self.exitHandler((ref Actor self, ExitMsg m) nothrow{
217                 logger.info("collector exit").collectException;
218                 self.shutdown;
219             });
220 
221             auto st = tuple!("diffs")(refCounted((double[]).init));
222             alias CT = typeof(st);
223             return impl(self, (ref CT ctx, Duration d) {
224                 ctx.diffs.get ~= cast(double) d.total!"nsecs";
225             }, capture(st), (ref CT ctx, Get _) {
226                 auto tmp = ctx.diffs.get.dup;
227                 ctx.diffs.get = null;
228                 return tmp;
229             }, capture(st));
230         });
231 
232         auto recv = sys.spawn((Actor* self, WeakAddress collector) {
233             self.name = "recv";
234             //self.exitHandler((ref Actor self, ExitMsg m) nothrow{
235             //    logger.info("recv exit").collectException;
236             //    self.shutdown;
237             //});
238 
239             return impl(self, (ref Capture!(WeakAddress, "collector") ctx, Msg m) {
240                 send(ctx.collector, Clock.currTime - m.expectedArrival);
241             }, capture(collector));
242         }, collector);
243 
244         // one dies, both goes down.
245         collector.linkTo(sender);
246         collector.linkTo(recv);
247         send(sender, recv);
248 
249         auto self = scopedActor;
250         double[] values;
251         while (values.length < dataPoints) {
252             self.request(collector, infTimeout).send(Get.init).then((double[] d) {
253                 values ~= d;
254             });
255             Thread.sleep(50.dur!"msecs");
256         }
257         rval.values ~= values;
258         sendExit(collector, ExitReason.userShutdown);
259     }
260 
261     foreach (_; 0 .. 10)
262         perf;
263 
264     return rval;
265 }
266 
267 Metric testActorSlowAction() {
268     auto sys = makeSystem;
269 
270     static struct Msg {
271         SysTime ts;
272         int cnt;
273     }
274 
275     auto actor1 = sys.spawn((Actor* self) {
276         static void handler(ref Capture!(Actor*, "self") ctx, WeakAddress addr, int cnt) {
277             logger.infof("actor1: send request %s %s", cnt, Clock.currTime);
278             ctx.self.request(addr, infTimeout).send(Msg(Clock.currTime, cnt)).then((Msg msg) {
279                 logger.infof("actor1: reply %s %s", msg.cnt, Clock.currTime - msg.ts);
280             }, (ref Actor self, ErrorMsg _) @safe nothrow{
281                 logger.info("timeout").collectException;
282             });
283             if (cnt < 100)
284                 delayedSend(ctx.self, 1.dur!"seconds".delay, addr, cnt + 1);
285             //send(ctx.self, addr, cnt+1);
286         }
287 
288         return impl(self, &handler, capture(self));
289     });
290 
291     auto actor2 = sys.spawn((Actor* self) {
292         static Msg handler(Msg msg) {
293             logger.infof("actor2: recv %s %s", msg.cnt, Clock.currTime - msg.ts);
294             return msg;
295         }
296 
297         return impl(self, &handler);
298     });
299 
300     send(actor1, actor2, 0);
301 
302     foreach (_; 0 .. 5) {
303         import core.thread : Thread;
304         import core.time : dur;
305 
306         logger.info("### ", Clock.currTime, " ###");
307         Thread.sleep(1.dur!"seconds");
308     }
309 
310     return Metric.init;
311 }