/**
Copyright: Copyright (c) 2020, Sebastiaan de Schaetzen. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Sebastiaan de Schaetzen

Author: Joakim Brännström (joakim.brannstrom@gmx.com) (modifications)

**All** credit goes to Sebastiaan. It is only copied here for convenience.

A simple to use async/await library.

# dawait - A simple to use async/await library

This library provides a very easy-to-use async/await library for D.
It consists of only three functions: `async`, `await`, and `startScheduler`.
The library is built on top of D's fibers and allows for easier cooperative multitasking.

## Functionality

|Function|Description|
|--------|-----------|
|`startScheduler(void delegate() callback)`| Starts the scheduler with an initial task.|
|`async(void delegate() callback)`|Runs the given delegate in a separate fiber.|
|`await(lazy T task)`|Runs the expression in a separate thread. Once the thread has completed, the result is returned.|

## Code Example
```d
import std.stdio;
import my.await;

int calculateTheAnswer() {
    import core.thread : Thread;
    import core.time : seconds;
    Thread.sleep(5.seconds);
    return 42;
}

void doTask() {
    writeln("Calculating the answer to life, the universe, and everything...");
    int answer = await(calculateTheAnswer());
    writeln("The answer is: ", answer);
}

void main() {
    startScheduler({
        doTask();
    });
}
```
*/
module my.await;

import std.parallelism;
import std.container;
import core.thread.fiber;
import core.sync.semaphore;

// Fibers waiting for their next turn. Module-level variables are thread-local
// in D, so every thread that uses this module has its own queue.
private SList!Fiber fibersQueued = SList!Fiber();

// Number of `await` calls of this thread that have not returned yet.
private size_t globalWaitingOnThreads = 0;

// Semaphore the worker threads notify once their task is done. It is created
// by `startScheduler` and is shared between all threads.
private shared Semaphore globalSync;

/**
Creates an async task.
An async task is a task that will be running in a separate fiber, independent
from the current fiber.

The task is only queued here. It is executed once `startScheduler` processes
the queue.

Params:
    task = The task to run.
*/
void async(void delegate() task) {
    auto fiber = new Fiber(task);
    fibersQueued.insert(fiber);
}

@("async queues task")
unittest {
    scope (exit)
        fibersQueued = SList!Fiber();
    // there should be no queued tasks at first
    assert(fibersQueued.empty);
    async({});
    // there should be a single task
    assert(!fibersQueued.empty);
}

@("async should not immediately execute its task")
unittest {
    scope (exit)
        fibersQueued = SList!Fiber();
    bool executed = false;
    auto executeIt = { executed = true; };
    async(executeIt);
    // async should not execute its operand
    assert(!executed);
}

/**
Runs the argument in a separate thread, waiting for the result.

The calling fiber yields to the scheduler until the thread has finished
evaluating `task`. It must therefore be called from inside a fiber and after
`startScheduler` has created the semaphore; the `in` contract checks both.

Params:
    task = The expression to evaluate. It is evaluated exactly once, in a new thread.

Returns: The value `task` evaluated to.

Throws: Whatever the evaluation of `task` threw. The failure is rethrown in
the awaiting fiber.
*/
T await(T)(lazy T task)
in (Fiber.getThis() !is null && globalSync !is null) {
    import core.atomic : atomicLoad, atomicStore;

    globalWaitingOnThreads++;
    // Always undo the bookkeeping, also when the task failed. A leaked count
    // makes every later scheduler run block on the semaphore forever.
    scope (exit)
        globalWaitingOnThreads--;

    shared bool finished = false;
    auto semaphore = globalSync;
    T result;

    // The task is kept in a named local: a scoped task must stay alive for as
    // long as the worker thread may reference it, which is the whole wait.
    auto worker = scopedTask({
        // Publish completion *before* waking the scheduler and do both even if
        // `task` throws. Otherwise the scheduler could resume the fiber while
        // `finished` is still false and then wait for a notify that never comes.
        scope (exit) {
            atomicStore(finished, true);
            (cast(Semaphore) semaphore).notify();
        }
        result = task;
    });
    worker.executeInNewThread();

    while (!atomicLoad(finished)) {
        Fiber.yield();
    }

    // Rethrows the failure of the task, if any. The work is already done so
    // this does not block for any noticeable time.
    worker.yieldForce();

    return result;
}

@("await can run a quick thread")
unittest {
    scope (exit)
        fibersQueued = SList!Fiber();
    bool executed = false;
    startScheduler({ await(executed = true); });
    // a quick thread should run
    assert(executed);
}

@("await can run a slow thread")
unittest {
    scope (exit)
        fibersQueued = SList!Fiber();
    bool executed = false;

    bool largeTask() {
        import core.thread : Thread;

        Thread.sleep(2.seconds);
        executed = true;
        return true;
    }

    startScheduler({ await(largeTask()); });
    // a slow thread should run
    assert(executed);
}

@("await should return the value that was calculated")
unittest {
    scope (exit)
        fibersQueued = SList!Fiber();
    bool executed = false;

    bool someTask() {
        return true;
    }

    startScheduler({ executed = await(someTask()); });
    // the value calculated by the task should be returned
    assert(executed);
}

@("await should rethrow a task failure and leave the scheduler usable")
unittest {
    scope (exit)
        fibersQueued = SList!Fiber();

    static int failingTask() {
        throw new Exception("boom");
    }

    bool caught = false;
    try {
        startScheduler({ await(failingTask()); });
    } catch (Exception e) {
        caught = e.msg == "boom";
    }
    // the failure of the task should surface in the caller
    assert(caught);

    bool executed = false;
    startScheduler({ executed = true; });
    // a later scheduler run should not block
    assert(executed);
}

/**
Starts the scheduler.

Queues `firstTask` as a fiber and then keeps running all queued fibers, one
round after the other, until none is left. Fibers that were queued with
`async` before this call are run as well. The function returns once the queue
is empty.

Params:
    firstTask = The initial task to run.
*/
void startScheduler(void delegate() firstTask) {
    // A fresh semaphore per run; `await` refuses to work without one.
    globalSync = cast(shared) new Semaphore;
    async({ firstTask(); });

    while (!fibersQueued.empty) {
        // Detach the current round so that fibers queued while it is running
        // end up in the next round.
        auto fibersRunning = fibersQueued;
        fibersQueued = SList!Fiber();
        foreach (Fiber fiber; fibersRunning) {
            fiber.call();
            // `insert` adds at the front, thus the order of the unfinished
            // fibers is reversed from one round to the next.
            if (fiber.state != Fiber.State.TERM)
                fibersQueued.insert(fiber);
        }

        // At least one fiber is parked in `await`: sleep until a worker thread
        // reports that it is done instead of spinning over the queue.
        if (globalWaitingOnThreads > 0) {
            (cast(Semaphore) globalSync).wait();
        }
    }
}

@("startScheduler should run initial task")
unittest {
    scope (exit)
        fibersQueued = SList!Fiber();
    bool executed = false;
    startScheduler({ executed = true; });
    // startScheduler should execute the initial task
    assert(executed);
}

@("startScheduler should also run tasks registered before itself")
unittest {
    scope (exit)
        fibersQueued = SList!Fiber();
    bool executed = false;
    async({ executed = true; });
    startScheduler({});
    // startScheduler should execute the task registered before itself
    assert(executed);
}

@("startScheduler should also run tasks registered by the initial task")
unittest {
    scope (exit)
        fibersQueued = SList!Fiber();
    bool executed = false;
    startScheduler({ async({ executed = true; }); });
    // startScheduler should execute the task created during the initial task
    assert(executed);
}