1 /**
2 Copyright: Copyright (c) 2020, Sebastiaan de Schaetzen. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Sebastiaan de Schaetzen
5 
6 Author: Joakim Brännström (joakim.brannstrom@gmx.com) (modifications)
7 
8 **All** credit goes to Sebastiaan. It is only copied here for convenience.
9 
10 A simple to use async/await library.
11 
12 # dawait - A simple to use async/await library
13 
14 This library provides a very easy-to-use async/await library for D.
15 It consists of only three functions: `async`, `await`, and `startScheduler`.
16 The library is build on top of D's fibers and allows for easier cooperative multitasking.
17 
18 ## Functionality
19 
20 |Function|Description|
21 |--------|-----------|
22 |`startScheduler(void delegate() callback)`| Starts the scheduler with an initial task.|
23 |`async(void delegate() callback)`|Runs the given delegate in a separate fiber.|
24 |`await(lazy T task)`|Runs the expression in a separate thread. Once the thread has completely, the result is returned.|
25 
26 ## Code Example
27 ```d
28 import std.stdio;
29 
30 int calculateTheAnswer() {
31     import core.thread : Thread;
32     Thread.sleep(5.seconds);
33     return 42;
34 }
35 
36 void doTask() {
37     writeln("Calculating the answer to life, the universe, and everything...");
38     int answer = await(calculateTheAnswer());
39     writeln("The answer is: ", answer);
40 }
41 
42 void main() {
43     startScheduler({
44         doTask();
45     });
46 }
47 ```
48 */
49 module my.await;
50 
51 import std.parallelism;
52 import std.container;
53 import core.thread.fiber;
54 import core.sync.semaphore;
55 
56 private SList!Fiber fibersQueued = SList!Fiber();
57 private size_t globalWaitingOnThreads = 0;
58 private shared Semaphore globalSync;
59 
60 /**
61 Creates an async task.
62 An async task is a task that will be running in a separate fiber, independent
63 from the current fiber.
64 
65 Params:
66     task = The task to run.
67 */
68 void async(void delegate() task) {
69     auto fiber = new Fiber(task);
70     fibersQueued.insert(fiber);
71 }
72 
73 @("async queues task")
74 unittest {
75     scope (exit)
76         fibersQueued = SList!Fiber();
77     // there should be no queued tasks at first"
78     assert(fibersQueued.empty);
79     async({});
80     // there should be a single task
81     assert(!fibersQueued.empty);
82 }
83 
84 @("async should not immediately execute its task")
85 unittest {
86     scope (exit)
87         fibersQueued = SList!Fiber();
88     bool executed = false;
89     auto executeIt = { executed = true; };
90     async(executeIt);
91     // async should not execute its operand
92     assert(!executed);
93 }
94 
95 /**
96 Runs the argument in a separate task, waiting for the result.
97 */
98 T await(T)(lazy T task)
99 in (Fiber.getThis() !is null && globalSync !is null) {
100     globalWaitingOnThreads++;
101     shared finished = false;
102 
103     auto semaphore = globalSync;
104     T result;
105     scopedTask({
106         scope (exit)
107             finished = true;
108         assert(semaphore !is null);
109         result = task;
110         (cast(Semaphore) semaphore).notify();
111     }).executeInNewThread();
112 
113     while (!finished) {
114         Fiber.yield();
115     }
116     globalWaitingOnThreads--;
117 
118     return result;
119 }
120 
121 @("await can run a quick thread")
122 unittest {
123     scope (exit)
124         fibersQueued = SList!Fiber();
125     bool executed = false;
126     startScheduler({ await(executed = true); });
127     // a quick thread should run
128     assert(executed);
129 }
130 
131 @("await can run a slow thread")
132 unittest {
133     scope (exit)
134         fibersQueued = SList!Fiber();
135     bool executed = false;
136 
137     bool largeTask() {
138         import core.thread : Thread;
139 
140         Thread.sleep(2.seconds);
141         executed = true;
142         return true;
143     }
144 
145     startScheduler({ await(largeTask()); });
146     // a slow thread should run
147     assert(executed);
148 }
149 
150 @("await should return the value that was calculated")
151 unittest {
152     scope (exit)
153         fibersQueued = SList!Fiber();
154     bool executed = false;
155 
156     bool someTask() {
157         return true;
158     }
159 
160     startScheduler({ executed = await(someTask()); });
161     // a slow thread should run
162     assert(executed);
163 }
164 
165 /**
166 Starts the scheduler.
167 */
168 void startScheduler(void delegate() firstTask) {
169     globalSync = cast(shared) new Semaphore;
170     async({ firstTask(); });
171 
172     while (!fibersQueued.empty) {
173         auto fibersRunning = fibersQueued;
174         fibersQueued = SList!Fiber();
175         foreach (Fiber fiber; fibersRunning) {
176             fiber.call();
177             if (fiber.state != Fiber.State.TERM)
178                 fibersQueued.insert(fiber);
179         }
180 
181         if (globalWaitingOnThreads > 0) {
182             (cast(Semaphore) globalSync).wait();
183         }
184     }
185 }
186 
187 @("startScheduler should run initial task")
188 unittest {
189     scope (exit)
190         fibersQueued = SList!Fiber();
191     bool executed = false;
192     startScheduler({ executed = true; });
193     // startScheduler should execute the initial task
194     assert(executed);
195 }
196 
197 @("startScheduler should also run tasks registered before itself")
198 unittest {
199     scope (exit)
200         fibersQueued = SList!Fiber();
201     bool executed = false;
202     async({ executed = true; });
203     startScheduler({});
204     // startScheduler should execute the task executed before itself
205     assert(executed);
206 }
207 
208 @("startScheduler should also run tasks registered by the initial task")
209 unittest {
210     scope (exit)
211         fibersQueued = SList!Fiber();
212     bool executed = false;
213     startScheduler({ async({ executed = true; }); });
214     // startScheduler should execute the task created during the initial task
215     assert(executed);
216 }