1 /**
2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module app;
7 
8 import core.memory : GC;
9 import core.thread : Thread;
10 import core.time : dur;
11 import logger = std.experimental.logger;
12 import std.algorithm : sort;
13 import std.array : array, appender, empty;
14 import std.datetime : Clock, Duration, dur, SysTime;
15 import std.datetime.stopwatch : StopWatch, AutoStart;
16 import std.exception : collectException;
17 import std.functional : toDelegate;
18 import std.stdio : writeln, writefln;
19 import std.typecons : tuple, Tuple;
20 
21 import my.actor;
22 import my.stat;
23 import my.gc.refc;
24 
25 immutable MByte = 1024.0 * 1024.0;
26 
27 void main(string[] args) {
28     import std.file : thisExePath;
29     import std.format : format;
30     import std.path : baseName;
31     import std.traits;
32     static import std.getopt;
33 
34     TestFn[string] metrics;
35     metrics["create"] = toDelegate(&testActorCreate);
36     metrics["send_msg"] = toDelegate(&testActorMsg);
37     metrics["delayed_msg1"] = () => testActorDelayedMsg(1.dur!"msecs", 5.dur!"msecs", 1000);
38     metrics["delayed_msg10"] = () => testActorDelayedMsg(12.dur!"msecs", 10.dur!"msecs", 100);
39     metrics["delayed_msg100"] = () => testActorDelayedMsg(100.dur!"msecs", 100.dur!"msecs", 10);
40     metrics["delayed_msg1000"] = () => testActorDelayedMsg(1000.dur!"msecs", 1000.dur!"msecs", 5);
41 
42     string[] metricName;
43     uint repeatTimes = 1;
44     auto helpInfo = std.getopt.getopt(args, "m|metric", format("metric to run %s",
45             metrics.byKey), &metricName, "r|repeat", "repeat the metric test", &repeatTimes);
46 
47     if (helpInfo.helpWanted) {
48         std.getopt.defaultGetoptPrinter(format!"usage: %s <options>\n"(thisExePath.baseName),
49                 helpInfo.options);
50         return;
51     }
52 
53     metricName = metricName.empty ? metrics.byKey.array.sort.array : metricName;
54 
55     foreach (const iter; 0 .. repeatTimes) {
56         writeln("# Iteration ", iter);
57         foreach (m; metricName) {
58             writeln("##############");
59             run(metrics[m]);
60             writeln;
61         }
62     }
63 }
64 
65 alias TestFn = Metric delegate();
66 
67 void run(TestFn t) {
68     auto m = t();
69     writeln("data points ", m.values.length);
70     auto data = m.values.makeData;
71     auto bstat = basicStat(data);
72     writeln(bstat);
73     writeln("95% is < ", (bstat.mean.value + bstat.sd.value * 2.0) / 1000000.0, " ms");
74     writeln("bytes per actor ", m.mem);
75 }
76 
77 struct Metric {
78     double[] values;
79     double mem;
80 }
81 
82 struct Mem {
83     ulong start;
84     double peek() {
85         const used = GC.stats.usedSize;
86         if (used < start)
87             return start - used;
88         return used - start;
89     }
90 }
91 
92 Mem mem() {
93     return Mem(GC.stats.usedSize);
94 }
95 
96 Metric testActorCreate() {
97     writeln("# Test time to create an actor");
98     writeln("unit: nanoseconds");
99 
100     Metric rval;
101 
102     auto sys = makeSystem;
103     auto m = mem;
104     auto perf() {
105         auto sw = StopWatch(AutoStart.yes);
106         foreach (_; 0 .. 1000)
107             sys.spawn((Actor* a) => impl(a, (int a) {}));
108         rval.values ~= sw.peek.total!"nsecs" / 1000.0;
109     }
110 
111     foreach (_; 0 .. 1000)
112         perf;
113 
114     rval.mem = m.peek / 1000000.0;
115 
116     return rval;
117 }
118 
119 Metric testActorMsg() {
120     writeln("# How long does it take to send an actor message from actor a->b");
121     writeln("unit: nanoseconds");
122 
123     Metric rval;
124 
125     auto sys = makeSystem;
126     auto m = mem;
127     ulong nrActors;
128     auto perf() {
129         int count;
130         auto a1 = sys.spawn((Actor* a) => impl(a, (ref Capture!(int*, "count") c, int x) {
131                 (*c.count)++;
132             }, capture(&count)));
133         nrActors++;
134 
135         Actor* spawnA2(Actor* self) {
136             static void fn(ref Capture!(Actor*, "self", WeakAddress, "a1") c, int x) {
137                 send(c.a1, x);
138                 send(c.self.address, x + 1);
139                 if (x > 100)
140                     c.self.shutdown;
141             }
142 
143             return impl(self, &fn, capture(self, a1));
144         }
145 
146         auto actors = appender!(WeakAddress[])();
147         actors.put(a1);
148         foreach (_; 0 .. 100) {
149             actors.put(sys.spawn(&spawnA2));
150             nrActors++;
151         }
152 
153         auto sw = StopWatch(AutoStart.yes);
154         foreach (a; actors.data)
155             send(a, 1);
156 
157         int reqs;
158         while (count < 10000) {
159             Thread.sleep(1.dur!"msecs");
160         }
161         rval.values ~= sw.peek.total!"nsecs" / cast(double) count;
162 
163         foreach (a; actors.data)
164             sendExit(a, ExitReason.userShutdown);
165     }
166 
167     foreach (_; 0 .. 100)
168         perf;
169 
170     rval.mem = m.peek / cast(double) nrActors;
171 
172     return rval;
173 }
174 
175 Metric testActorDelayedMsg(Duration delayFor, Duration rate, const ulong dataPoints) {
176     writeln("# Test delayed message trigger jitter");
177     writefln("delay: %s rate: %s", delayFor, rate);
178     writeln("What is the jitter of a delayed message compared to the expected arrival time");
179     writeln("unit: nanoseconds");
180 
181     Metric rval;
182 
183     import std.parallelism;
184 
185     auto sys = System(new TaskPool(4), true);
186     auto m = mem;
187 
188     auto perf() {
189         static struct Get {
190         }
191 
192         static struct Msg {
193             SysTime expectedArrival;
194         }
195 
196         static struct StartMsg {
197         }
198 
199         auto sender = sys.spawn((Actor* self) {
200             self.name = "sender";
201             //self.exitHandler((ref Actor self, ExitMsg m) nothrow{
202             //    logger.info("sender exit").collectException;
203             //    self.shutdown;
204             //});
205 
206             return impl(self, (ref Capture!(Actor*, "self", Duration, "delay",
207                 Duration, "rate") ctx, WeakAddress recv) {
208                 delayedSend(recv, delay(ctx.delay), Msg(Clock.currTime + ctx.delay));
209                 delayedSend(ctx.self, delay(ctx.rate), recv);
210             }, capture(self, delayFor, rate));
211         });
212 
213         auto collector = sys.spawn((Actor* self) {
214             self.name = "collector";
215             self.exitHandler((ref Actor self, ExitMsg m) nothrow{
216                 logger.info("collector exit").collectException;
217                 self.shutdown;
218             });
219 
220             auto st = tuple!("diffs")(refCounted((double[]).init));
221             alias CT = typeof(st);
222             return impl(self, (ref CT ctx, Duration d) {
223                 ctx.diffs.get ~= cast(double) d.total!"nsecs";
224             }, capture(st), (ref CT ctx, Get _) {
225                 auto tmp = ctx.diffs.get.dup;
226                 ctx.diffs.get = null;
227                 return tmp;
228             }, capture(st));
229         });
230 
231         auto recv = sys.spawn((Actor* self, WeakAddress collector) {
232             self.name = "recv";
233             //self.exitHandler((ref Actor self, ExitMsg m) nothrow{
234             //    logger.info("recv exit").collectException;
235             //    self.shutdown;
236             //});
237 
238             return impl(self, (ref Capture!(WeakAddress, "collector") ctx, Msg m) {
239                 send(ctx.collector, Clock.currTime - m.expectedArrival);
240             }, capture(collector));
241         }, collector);
242 
243         // one dies, both goes down.
244         collector.linkTo(sender);
245         collector.linkTo(recv);
246         send(sender, recv);
247 
248         auto self = scopedActor;
249         double[] values;
250         while (values.length < dataPoints) {
251             self.request(collector, infTimeout).send(Get.init).then((double[] d) {
252                 values ~= d;
253             });
254             Thread.sleep(50.dur!"msecs");
255         }
256         rval.values ~= values;
257         sendExit(collector, ExitReason.userShutdown);
258     }
259 
260     foreach (_; 0 .. 10)
261         perf;
262 
263     return rval;
264 }