/**
Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module app;

import core.memory : GC;
import core.thread : Thread;
import core.time : dur;
import logger = std.experimental.logger;
import std.algorithm : sort;
import std.array : array, appender, empty;
import std.datetime : Clock, Duration, dur, SysTime;
import std.datetime.stopwatch : StopWatch, AutoStart;
import std.exception : collectException;
import std.functional : toDelegate;
import std.stdio : writeln, writefln;
import std.typecons : tuple, Tuple;

import my.actor;
import my.stat;
import my.gc.refc;

immutable MByte = 1024.0 * 1024.0;

/** Command line entry point of the actor benchmark.
 *
 * Registers the available metrics, parses `-m|--metric` (may be given several
 * times) and `-r|--repeat`, and then runs the selected metrics `repeat` times.
 * When no metric is selected all registered metrics are run in sorted order.
 *
 * An unknown metric name is reported on stderr and nothing is executed.
 */
void main(string[] args) {
    import std.file : thisExePath;
    import std.format : format;
    import std.path : baseName;
    import std.stdio : stderr;
    import std.traits;
    static import std.getopt;

    TestFn[string] metrics;
    metrics["create"] = toDelegate(&testActorCreate);
    metrics["send_msg"] = toDelegate(&testActorMsg);
    metrics["delayed_msg1"] = () => testActorDelayedMsg(1.dur!"msecs", 5.dur!"msecs", 1000);
    // NOTE(review): the delay is 12 ms while the name and the rate say 10 ms
    // -- confirm whether this is intentional.
    metrics["delayed_msg10"] = () => testActorDelayedMsg(12.dur!"msecs", 10.dur!"msecs", 100);
    metrics["delayed_msg100"] = () => testActorDelayedMsg(100.dur!"msecs", 100.dur!"msecs", 10);
    metrics["delayed_msg1000"] = () => testActorDelayedMsg(1000.dur!"msecs", 1000.dur!"msecs", 5);
    metrics["slow_action"] = toDelegate(&testActorSlowAction);

    // Sorted once so both the help text and the default run order are
    // deterministic (associative array iteration order is not).
    auto available = metrics.byKey.array.sort.array;

    string[] metricName;
    uint repeatTimes = 1;
    auto helpInfo = std.getopt.getopt(args, "m|metric", format("metric to run %s",
            available), &metricName, "r|repeat", "repeat the metric test", &repeatTimes);

    if (helpInfo.helpWanted) {
        std.getopt.defaultGetoptPrinter(format!"usage: %s <options>\n"(thisExePath.baseName),
                helpInfo.options);
        return;
    }

    metricName = metricName.empty ? available : metricName;

    // Validate before running anything; indexing the associative array with
    // an unknown key would otherwise abort the program with a RangeError.
    foreach (m; metricName) {
        if (m !in metrics) {
            stderr.writefln("unknown metric '%s', available metrics: %s", m, available);
            return;
        }
    }

    foreach (const iter; 0 .. repeatTimes) {
        writeln("# Iteration ", iter);
        foreach (m; metricName) {
            writeln("##############");
            run(metrics[m]);
            writeln;
        }
    }
}

/// A benchmark: runs to completion and returns the collected measurements.
alias TestFn = Metric delegate();

/** Execute the benchmark `t` and print a summary of its result.
 *
 * Prints the number of data points, the basic statistics of the values and
 * `mean + 2 * sd` scaled from nanoseconds to milliseconds. The memory figure
 * is only printed when the benchmark measured it (`Metric.mem` is not NaN).
 *
 * Params:
 *  t = the benchmark to run.
 */
void run(TestFn t) {
    import std.math : isNaN;

    auto m = t();
    writeln("data points ", m.values.length);

    // Some benchmarks (e.g. the slow action test) only log and return
    // `Metric.init`; there is nothing to compute statistics on.
    if (m.values.empty)
        return;

    auto data = m.values.makeData;
    auto bstat = basicStat(data);
    writeln(bstat);
    writeln("95% is < ", (bstat.mean.value + bstat.sd.value * 2.0) / 1000000.0, " ms");

    // `mem` defaults to NaN when a benchmark never assigns it.
    if (!isNaN(m.mem))
        writeln("bytes per actor ", m.mem);
}

/// The result of a benchmark.
struct Metric {
    /// Measured samples. The benchmarks in this file report nanoseconds.
    double[] values;
    /// Memory figure reported by the benchmark; NaN (the default) if not measured.
    double mem;
}

/// A snapshot of the GC's used size to compare against later on.
struct Mem {
    /// `GC.stats.usedSize` at the time the snapshot was taken.
    ulong start;

    /// Returns: the absolute difference between the current `GC.stats.usedSize` and `start`.
    double peek() {
        const used = GC.stats.usedSize;
        if (used < start)
            return start - used;
        return used - start;
    }
}

/// Returns: a `Mem` snapshot of the current `GC.stats.usedSize`.
Mem mem() {
    return Mem(GC.stats.usedSize);
}

/** Measure the time it takes to spawn an actor.
 *
 * Takes 1000 samples. Each sample spawns 1000 actors and records the elapsed
 * time divided by 1000, i.e. nanoseconds per spawned actor.
 *
 * Returns: the samples, and in `mem` the change in GC used size divided by
 * the 1000 * 1000 actors that were spawned.
 */
Metric testActorCreate() {
    writeln("# Test time to create an actor");
    writeln("unit: nanoseconds");

    Metric rval;

    auto sys = makeSystem;
    auto m = mem;
    auto perf() {
        auto sw = StopWatch(AutoStart.yes);
        foreach (_; 0 .. 1000)
            sys.spawn((Actor* a) => impl(a, (int a) {}));
        rval.values ~= sw.peek.total!"nsecs" / 1000.0;
    }

    foreach (_; 0 .. 1000)
        perf;

    rval.mem = m.peek / 1000000.0;

    return rval;
}

/** Measure the time it takes to send a message from one actor to another.
 *
 * Takes 100 samples. Each sample spawns one counting actor (`a1`) and 100
 * forwarding actors. A forwarding actor that receives `x` sends `x` to `a1`,
 * sends `x + 1` to itself and shuts down once `x > 100`. The sample is the
 * elapsed time divided by the number of messages `a1` has counted when the
 * count reaches 10000.
 *
 * Returns: the samples, and in `mem` the change in GC used size divided by
 * the number of spawned actors.
 */
Metric testActorMsg() {
    writeln("# How long does it take to send an actor message from actor a->b");
    writeln("unit: nanoseconds");

    Metric rval;

    auto sys = makeSystem;
    auto m = mem;
    ulong nrActors;
    auto perf() {
        // NOTE(review): incremented by the message handler of `a1` and polled
        // below without any synchronisation -- presumably from different
        // threads; confirm that this is acceptable for the benchmark.
        int count;
        auto a1 = sys.spawn((Actor* a) => impl(a, (ref Capture!(int*, "count") c, int x) {
                (*c.count)++;
            }, capture(&count)));
        nrActors++;

        Actor* spawnA2(Actor* self) {
            static void fn(ref Capture!(Actor*, "self", WeakAddress, "a1") c, int x) {
                send(c.a1, x);
                send(c.self.address, x + 1);
                if (x > 100)
                    c.self.shutdown;
            }

            return impl(self, &fn, capture(self, a1));
        }

        auto actors = appender!(WeakAddress[])();
        actors.put(a1);
        foreach (_; 0 .. 100) {
            actors.put(sys.spawn(&spawnA2));
            nrActors++;
        }

        auto sw = StopWatch(AutoStart.yes);
        // Kick off the message chains.
        foreach (a; actors.data)
            send(a, 1);

        // Poll until the counting actor has seen enough messages.
        while (count < 10000) {
            Thread.sleep(1.dur!"msecs");
        }
        rval.values ~= sw.peek.total!"nsecs" / cast(double) count;

        foreach (a; actors.data)
            sendExit(a, ExitReason.userShutdown);
    }

    foreach (_; 0 .. 100)
        perf;

    rval.mem = m.peek / cast(double) nrActors;

    return rval;
}

/** Measure the jitter of delayed messages.
 *
 * Runs 10 rounds. In each round a `sender` actor repeatedly schedules a
 * delayed `Msg`, stamped with its expected arrival time, to a `recv` actor and
 * reschedules itself every `rate`. `recv` forwards the difference between the
 * actual and the expected arrival time to a `collector` actor, which is polled
 * every 50 ms until at least `dataPoints` values have been gathered.
 *
 * Params:
 *  delayFor = how long each message is delayed.
 *  rate = the interval at which the sender schedules a new delayed message.
 *  dataPoints = minimum number of values to gather per round.
 *
 * Returns: the differences in nanoseconds. `mem` is not measured and is left
 * at its default (NaN).
 */
Metric testActorDelayedMsg(Duration delayFor, Duration rate, const ulong dataPoints) {
    writeln("# Test delayed message trigger jitter");
    writefln("delay: %s rate: %s", delayFor, rate);
    writeln("What is the jitter of a delayed message compared to the expected arrival time");
    writeln("unit: nanoseconds");

    Metric rval;

    import std.parallelism;

    auto sys = System(new TaskPool(4), true);

    auto perf() {
        static struct Get {
        }

        static struct Msg {
            SysTime expectedArrival;
        }

        static struct StartMsg {
        }

        auto sender = sys.spawn((Actor* self) {
            self.name = "sender";
            //self.exitHandler((ref Actor self, ExitMsg m) nothrow{
            //    logger.info("sender exit").collectException;
            //    self.shutdown;
            //});

            return impl(self, (ref Capture!(Actor*, "self", Duration, "delay",
                Duration, "rate") ctx, WeakAddress recv) {
                delayedSend(recv, delay(ctx.delay), Msg(Clock.currTime + ctx.delay));
                // Re-arm: send the receiver's address back to ourselves after `rate`.
                delayedSend(ctx.self, delay(ctx.rate), recv);
            }, capture(self, delayFor, rate));
        });

        auto collector = sys.spawn((Actor* self) {
            self.name = "collector";
            self.exitHandler((ref Actor self, ExitMsg m) nothrow{
                logger.info("collector exit").collectException;
                self.shutdown;
            });

            auto st = tuple!("diffs")(refCounted((double[]).init));
            alias CT = typeof(st);
            return impl(self, (ref CT ctx, Duration d) {
                ctx.diffs.get ~= cast(double) d.total!"nsecs";
            }, capture(st), (ref CT ctx, Get _) {
                // Hand over what has been gathered so far and start afresh.
                auto tmp = ctx.diffs.get.dup;
                ctx.diffs.get = null;
                return tmp;
            }, capture(st));
        });

        auto recv = sys.spawn((Actor* self, WeakAddress collector) {
            self.name = "recv";
            //self.exitHandler((ref Actor self, ExitMsg m) nothrow{
            //    logger.info("recv exit").collectException;
            //    self.shutdown;
            //});

            return impl(self, (ref Capture!(WeakAddress, "collector") ctx, Msg m) {
                send(ctx.collector, Clock.currTime - m.expectedArrival);
            }, capture(collector));
        }, collector);

        // one dies, both goes down.
        collector.linkTo(sender);
        collector.linkTo(recv);
        send(sender, recv);

        auto self = scopedActor;
        double[] values;
        while (values.length < dataPoints) {
            self.request(collector, infTimeout).send(Get.init).then((double[] d) {
                values ~= d;
            });
            Thread.sleep(50.dur!"msecs");
        }
        rval.values ~= values;
        sendExit(collector, ExitReason.userShutdown);
    }

    foreach (_; 0 .. 10)
        perf;

    return rval;
}

/** Exercise request/reply between two actors while logging the timing.
 *
 * `actor1` sends a request carrying a timestamp and a counter to `actor2`,
 * logs the reply and, while the counter is below 100, reschedules itself one
 * second later with the counter incremented. `actor2` logs the request and
 * echoes it back. The calling thread logs a marker once per second for five
 * seconds.
 *
 * Returns: `Metric.init`; the outcome is only visible in the log.
 */
Metric testActorSlowAction() {
    auto sys = makeSystem;

    static struct Msg {
        SysTime ts;
        int cnt;
    }

    auto actor1 = sys.spawn((Actor* self) {
        static void handler(ref Capture!(Actor*, "self") ctx, WeakAddress addr, int cnt) {
            logger.infof("actor1: send request %s %s", cnt, Clock.currTime);
            ctx.self.request(addr, infTimeout).send(Msg(Clock.currTime, cnt)).then((Msg msg) {
                logger.infof("actor1: reply %s %s", msg.cnt, Clock.currTime - msg.ts);
            }, (ref Actor self, ErrorMsg _) @safe nothrow{
                logger.info("timeout").collectException;
            });
            if (cnt < 100)
                delayedSend(ctx.self, 1.dur!"seconds".delay, addr, cnt + 1);
            //send(ctx.self, addr, cnt+1);
        }

        return impl(self, &handler, capture(self));
    });

    auto actor2 = sys.spawn((Actor* self) {
        static Msg handler(Msg msg) {
            logger.infof("actor2: recv %s %s", msg.cnt, Clock.currTime - msg.ts);
            return msg;
        }

        return impl(self, &handler);
    });

    send(actor1, actor2, 0);

    foreach (_; 0 .. 5) {
        logger.info("### ", Clock.currTime, " ###");
        Thread.sleep(1.dur!"seconds");
    }

    return Metric.init;
}