src/​examples/​cpp03/​timeouts/​server.​cppsrc/​examples/​cpp11/​timeouts/​server.​cpp
1 /​/​1 /​/​
2 /​/​·​server.​cpp2 /​/​·​server.​cpp
3 /​/​·​~~~~~~~~~~3 /​/​·​~~~~~~~~~~
4 /​/​4 /​/​
5 /​/​·​Copyright·​(c)​·​2003-​2023·​Christopher·​M.​·​Kohlhoff·​(chris·​at·​kohlhoff·​dot·​com)​5 /​/​·​Copyright·​(c)​·​2003-​2023·​Christopher·​M.​·​Kohlhoff·​(chris·​at·​kohlhoff·​dot·​com)​
6 /​/​6 /​/​
7 /​/​·​Distributed·​under·​the·​Boost·​Software·​License,​·​Version·​1.​0.​·​(See·​accompanying7 /​/​·​Distributed·​under·​the·​Boost·​Software·​License,​·​Version·​1.​0.​·​(See·​accompanying
8 /​/​·​file·​LICENSE_1_0.​txt·​or·​copy·​at·​http:​/​/​www.​boost.​org/​LICENSE_1_0.​txt)​8 /​/​·​file·​LICENSE_1_0.​txt·​or·​copy·​at·​http:​/​/​www.​boost.​org/​LICENSE_1_0.​txt)​
9 /​/​9 /​/​
10 10
11 #include·​<algorithm>11 #include·​<algorithm>
12 #include·​<cstdlib>12 #include·​<cstdlib>
13 #include·​<deque>13 #include·​<deque>
14 #include·​<iostream>14 #include·​<iostream>
15 #include·​<memory>
15 #include·​<set>16 #include·​<set>
16 #include·​<string>17 #include·​<string>
17 #include·​<boost/​bind/​bind.​hpp>
18 #include·​<boost/​shared_ptr.​hpp>
19 #include·​<boost/​enable_shared_from_th​is.​hpp>
20 #include·​"asio/​buffer.​hpp"18 #include·​"asio/​buffer.​hpp"
21 #include·​"asio/​io_context.​hpp"19 #include·​"asio/​io_context.​hpp"
22 #include·​"asio/​ip/​tcp.​hpp"20 #include·​"asio/​ip/​tcp.​hpp"
23 #include·​"asio/​ip/​udp.​hpp"21 #include·​"asio/​ip/​udp.​hpp"
24 #include·​"asio/​read_until.​hpp"22 #include·​"asio/​read_until.​hpp"
25 #include·​"asio/​steady_timer.​hpp"23 #include·​"asio/​steady_timer.​hpp"
26 #include·​"asio/​write.​hpp"24 #include·​"asio/​write.​hpp"
27 25
28 using·​asio:​:​steady_timer;​26 using·​asio:​:​steady_timer;​
29 using·​asio:​:​ip:​:​tcp;​27 using·​asio:​:​ip:​:​tcp;​
30 using·​asio:​:​ip:​:​udp;​28 using·​asio:​:​ip:​:​udp;​
31 29
32 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​30 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
33 31
34 class·​subscriber32 class·​subscriber
35 {33 {
36 public:​34 public:​
37 ··​virtual·​~subscriber()​·{}35 ··​virtual·​~subscriber()​·=·default;​
38 ··​virtual·​void·​deliver(const·​std:​:​string&·​msg)​·​=·​0;​36 ··​virtual·​void·​deliver(const·​std:​:​string&·​msg)​·​=·​0;​
39 };​37 };​
40 38
41 typedef·boost:​:​shared_ptr<subscriber​>·​subscriber_ptr;​39 typedef·​std:​:​shared_ptr<subscriber​>·​subscriber_ptr;​
42 40
43 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​41 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
44 42
45 class·​channel43 class·​channel
46 {44 {
47 public:​45 public:​
48 ··​void·​join(subscriber_ptr·​subscriber)​46 ··​void·​join(subscriber_ptr·​subscriber)​
49 ··​{47 ··​{
50 ····​subscribers_.​insert(subscriber)​;​48 ····​subscribers_.​insert(subscriber)​;​
51 ··​}49 ··​}
52 50
53 ··​void·​leave(subscriber_ptr·​subscriber)​51 ··​void·​leave(subscriber_ptr·​subscriber)​
54 ··​{52 ··​{
55 ····​subscribers_.​erase(subscriber)​;​53 ····​subscribers_.​erase(subscriber)​;​
56 ··​}54 ··​}
57 55
58 ··​void·​deliver(const·​std:​:​string&·​msg)​56 ··​void·​deliver(const·​std:​:​string&·​msg)​
59 ··​{57 ··​{
60 ····std:​:​for_each(subscribers_​.​begin()​,​·​subscribers_.​end()​,​58 ····​for·(const·auto&·s·:​·​subscribers_)​
61 ········boost:​:​bind(&subscriber:​:​deliver,​59 ····{
62 ··········boost:​:​placeholders:​:​_1,​·boost:​:​ref(msg)​)​)​;​60 ······​s-​>deliver(msg)​;​
61 ····​}
63 ··​}62 ··​}
64 63
65 private:​64 private:​
66 ··​std:​:​set<subscriber_ptr>·​subscribers_;​65 ··​std:​:​set<subscriber_ptr>·​subscribers_;​
67 };​66 };​
68 67
69 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​68 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
70 69
71 /​/​70 /​/​
72 /​/​·​This·​class·​manages·​socket·​timeouts·​by·​applying·​the·​concept·​of·​a·​deadline.​71 /​/​·​This·​class·​manages·​socket·​timeouts·​by·​applying·​the·​concept·​of·​a·​deadline.​
73 /​/​·​Some·​asynchronous·​operations·​are·​given·​deadlines·​by·​which·​they·​must·​complete.​72 /​/​·​Some·​asynchronous·​operations·​are·​given·​deadlines·​by·​which·​they·​must·​complete.​
74 /​/​·​Deadlines·​are·​enforced·​by·​two·​"actors"·​that·​persist·​for·​the·​lifetime·​of·​the73 /​/​·​Deadlines·​are·​enforced·​by·​two·​"actors"·​that·​persist·​for·​the·​lifetime·​of·​the
75 /​/​·​session·​object,​·​one·​for·​input·​and·​one·​for·​output:​74 /​/​·​session·​object,​·​one·​for·​input·​and·​one·​for·​output:​
76 /​/​75 /​/​
77 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+·····················​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+76 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+······················​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
78 /​/​··​|················​|·····················​|················​|77 /​/​··​|················​|······················​|················​|
79 /​/​··​|·​check_deadline·​|<-​-​-​+················​|·​check_deadline·​|<-​-​-​+78 /​/​··​|·​check_deadline·​|<-​-​-​-​-​-​-​+·············​|·​check_deadline·​|<-​-​-​-​-​-​-​+
80 /​/​··​|················​|····|·async_wait()​···​|················​|····|·async_wait()​79 /​/​··​|················​|········|·············​|················​|········|
81 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····|··on·input······​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····|··on·output80 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+········|·············​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+········|
82 /​/​··············​|·········|··deadline··················​|·········|··deadline81 /​/​···············​|············|··························​|············|
83 /​/​··············​+-​-​-​-​-​-​-​-​-​+····························+-​-​-​-​-​-​-​-​-​+82 /​/​··async_wait()​·|····​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····async_wait()​·|····+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
83 /​/​···​on·​input····​|····​|·····​lambda·····​|·····​on·​output···​|····​|·····​lambda·····​|
84 /​/​···​deadline····​+-​-​-​>|·······​in·······​|·····​deadline····​+-​-​-​>|·······​in·······​|
85 /​/​····················​|·​check_deadline·​|······················​|·​check_deadline·​|
86 /​/​····················​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+······················​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
84 /​/​87 /​/​
85 /​/​·​If·​either·​deadline·​actor·​determines·​that·​the·​corresponding·​deadline·​has88 /​/​·​If·​either·​deadline·​actor·​determines·​that·​the·​corresponding·​deadline·​has
86 /​/​·​expired,​·​the·​socket·​is·​closed·​and·​any·​outstanding·​operations·​are·​cancelled.​89 /​/​·​expired,​·​the·​socket·​is·​closed·​and·​any·​outstanding·​operations·​are·​cancelled.​
87 /​/​90 /​/​
88 /​/​·​The·​input·​actor·​reads·​messages·​from·​the·​socket,​·​where·​messages·​are·​delimited91 /​/​·​The·​input·​actor·​reads·​messages·​from·​the·​socket,​·​where·​messages·​are·​delimited
89 /​/​·​by·​the·​newline·​character:​92 /​/​·​by·​the·​newline·​character:​
90 /​/​93 /​/​
91 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​+94 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​+
92 /​/​··​|············​|95 /​/​··​|·············​|
93 /​/​··​|·start_read·​|<-​-​-​+96 /​/​··​|··read_line··​|<-​-​-​-​+
94 /​/​··​|············​|····​|97 /​/​··​|·············​|·····​|
95 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​+····​|98 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​+·····​|
96 /​/​··········​|·········​|99 /​/​··········​|···········​|
97 /​/​··​async_-​·​|····​+-​-​-​-​-​-​-​-​-​-​-​-​-​+100 /​/​··​async_-​·​|····​+-​-​-​-​-​-​-​-​-​-​-​-​-​+
98 /​/​···​read_-​·​|····​|·············​|101 /​/​···​read_-​·​|····​|···lambda····​|
99 /​/​··​until()​·​+-​-​-​>|·handle_read·​|102 /​/​··​until()​·​+-​-​-​>|·····in······​|
100 /​/​···············​|·············​|103 /​/​···············​|··read_line··​|
101 /​/​···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​+104 /​/​···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​+
102 /​/​105 /​/​
103 /​/​·​The·​deadline·​for·​receiving·​a·​complete·​message·​is·​30·​seconds.​·​If·​a·​non-​empty106 /​/​·​The·​deadline·​for·​receiving·​a·​complete·​message·​is·​30·​seconds.​·​If·​a·​non-​empty
104 /​/​·​message·​is·​received,​·​it·​is·​delivered·​to·​all·​subscribers.​·​If·​a·​heartbeat·​(a107 /​/​·​message·​is·​received,​·​it·​is·​delivered·​to·​all·​subscribers.​·​If·​a·​heartbeat·​(a
105 /​/​·​message·​that·​consists·​of·​a·​single·​newline·​character)​·​is·​received,​·​a·​heartbeat108 /​/​·​message·​that·​consists·​of·​a·​single·​newline·​character)​·​is·​received,​·​a·​heartbeat
106 /​/​·​is·​enqueued·​for·​the·​client,​·​provided·​there·​are·​no·​other·​messages·​waiting·​to109 /​/​·​is·​enqueued·​for·​the·​client,​·​provided·​there·​are·​no·​other·​messages·​waiting·​to
107 /​/​·​be·​sent.​110 /​/​·​be·​sent.​
108 /​/​111 /​/​
109 /​/​·​The·​output·​actor·​is·​responsible·​for·​sending·​messages·​to·​the·​client:​112 /​/​·​The·​output·​actor·​is·​responsible·​for·​sending·​messages·​to·​the·​client:​
110 /​/​113 /​/​
111 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+114 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
112 /​/​··​|··············​|<-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+115 /​/​··​|················​|<-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
113 /​/​··​|·​await_output·​|······················​|116 /​/​··​|··​await_output··​|······················​|
114 /​/​··​|··············​|<-​-​-​+·················​|117 /​/​··​|················​|<-​-​-​-​-​-​-​+·············​|
115 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····|·················​|118 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+········|·············​|
116 /​/​······|······|········|·async_wait()​····​|119 /​/​····|············|··········|·············​|
117 /​/​······|······​+-​-​-​-​-​-​-​-​+·················​|120 /​/​····|····async_-​·|··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····​|
118 /​/​······V·································​|121 /​/​····|·····wait()​·|··|·····lambda·····|····​|
119 /​/​··+-​-​-​-​-​-​-​-​-​-​-​-​-​+···············+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+122 /​/​····|············+-​>|·······in·······|····|
120 /​/​··|·············|·async_write()​·|··············​|123 /​/​····|···············|··await_output··|····​|
121 /​/​··​|·start_write·|-​-​-​-​-​-​-​-​-​-​-​-​-​-​>|·handle_write·​|124 /​/​····​|···············+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····​|
122 /​/​··|·············|···············|··············​|125 /​/​····V·····································​|
123 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​+···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+126 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
127 /​/​··​|··············​|·​async_write()​·​|····​lambda····​|
128 /​/​··​|··​write_line··​|-​-​-​-​-​-​-​-​-​-​-​-​-​-​>|······​in······​|
129 /​/​··​|··············​|···············​|··​write_line··​|
130 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
124 /​/​131 /​/​
125 /​/​·​The·​output·​actor·​first·​waits·​for·​an·​output·​message·​to·​be·​enqueued.​·​It·​does132 /​/​·​The·​output·​actor·​first·​waits·​for·​an·​output·​message·​to·​be·​enqueued.​·​It·​does
126 /​/​·​this·​by·​using·​a·​steady_timer·​as·​an·​asynchronous·​condition·​variable.​·​The133 /​/​·​this·​by·​using·​a·​steady_timer·​as·​an·​asynchronous·​condition·​variable.​·​The
127 /​/​·​steady_timer·​will·​be·​signalled·​whenever·​the·​output·​queue·​is·​non-​empty.​134 /​/​·​steady_timer·​will·​be·​signalled·​whenever·​the·​output·​queue·​is·​non-​empty.​
128 /​/​135 /​/​
129 /​/​·​Once·​a·​message·​is·​available,​·​it·​is·​sent·​to·​the·​client.​·​The·​deadline·​for136 /​/​·​Once·​a·​message·​is·​available,​·​it·​is·​sent·​to·​the·​client.​·​The·​deadline·​for
130 /​/​·​sending·​a·​complete·​message·​is·​30·​seconds.​·​After·​the·​message·​is·​successfully137 /​/​·​sending·​a·​complete·​message·​is·​30·​seconds.​·​After·​the·​message·​is·​successfully
131 /​/​·​sent,​·​the·​output·​actor·​again·​waits·​for·​the·​output·​queue·​to·​become·​non-​empty.​138 /​/​·​sent,​·​the·​output·​actor·​again·​waits·​for·​the·​output·​queue·​to·​become·​non-​empty.​
132 /​/​139 /​/​
133 class·​tcp_session140 class·​tcp_session
134 ··​:​·​public·​subscriber,​141 ··​:​·​public·​subscriber,​
135 ····​public·boost:​:​enable_shared_from_th​is<tcp_session>142 ····​public·​std:​:​enable_shared_from_th​is<tcp_session>
136 {143 {
137 public:​144 public:​
138 ··​tcp_session(asio:​:​io_context&·io_context,​·​channel&·​ch)​145 ··​tcp_session(tcp:​:​socket·socket,​·​channel&·​ch)​
139 ····​:​·​channel_(ch)​,​146 ····​:​·​channel_(ch)​,​
140 ······​socket_(io_context)​,​147 ······​socket_(std:​:​move(socket)​)​
141 ······input_deadline_(io_co​ntext)​,​
142 ······non_empty_output_queu​e_(io_context)​,​
143 ······output_deadline_(io_c​ontext)​
144 ··​{148 ··​{
145 ····​input_deadline_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​149 ····​input_deadline_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​
146 ····​output_deadline_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​150 ····​output_deadline_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​
147 151
148 ····​/​/​·​The·​non_empty_output_queu​e_·​steady_timer·​is·​set·​to·​the·​maximum·​time152 ····​/​/​·​The·​non_empty_output_queu​e_·​steady_timer·​is·​set·​to·​the·​maximum·​time
149 ····​/​/​·​point·​whenever·​the·​output·​queue·​is·​empty.​·​This·​ensures·​that·​the·​output153 ····​/​/​·​point·​whenever·​the·​output·​queue·​is·​empty.​·​This·​ensures·​that·​the·​output
150 ····​/​/​·​actor·​stays·​asleep·​until·​a·​message·​is·​put·​into·​the·​queue.​154 ····​/​/​·​actor·​stays·​asleep·​until·​a·​message·​is·​put·​into·​the·​queue.​
151 ····​non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​155 ····​non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​
152 ··​}156 ··​}
153 157
154 ··​tcp:​:​socket&·​socket()​
155 ··​{
156 ····​return·​socket_;​
157 ··​}
158
159 ··​/​/​·​Called·​by·​the·​server·​object·​to·​initiate·​the·​four·​actors.​158 ··​/​/​·​Called·​by·​the·​server·​object·​to·​initiate·​the·​four·​actors.​
160 ··​void·​start()​159 ··​void·​start()​
161 ··​{160 ··​{
162 ····​channel_.​join(shared_from_this​()​)​;​161 ····​channel_.​join(shared_from_this​()​)​;​
163 162
164 ····start_read()​;​163 ····read_line()​;​
164 ····​check_deadline(input_​deadline_)​;​
165 165
166 ····​input_deadline_.​async_wait(
167 ········​boost:​:​bind(&tcp_session:​:​check_deadline,​
168 ········​shared_from_this()​,​·​&input_deadline_)​)​;​
169
170 ····​await_output()​;​166 ····​await_output()​;​
171 167 ····check_deadline(output​_deadline_)​;​
172 ····output_deadline_.​async_wait(
173 ········boost:​:​bind(&tcp_session:​:​check_deadline,​
174 ········shared_from_this()​,​·&output_deadline_)​)​;​
175 ··​}168 ··​}
176 169
177 private:​170 private:​
178 ··​void·​stop()​171 ··​void·​stop()​
179 ··​{172 ··​{
180 ····​channel_.​leave(shared_from_thi​s()​)​;​173 ····​channel_.​leave(shared_from_thi​s()​)​;​
181 174
182 ····asio:​:​error_code·​ignored_ec;​175 ····​std:​:​error_code·​ignored_error;​
183 ····​socket_.​close(ignored_ec)​;​176 ····​socket_.​close(ignored_error)​;​
184 ····​input_deadline_.​cancel()​;​177 ····​input_deadline_.​cancel()​;​
185 ····​non_empty_output_queu​e_.​cancel()​;​178 ····​non_empty_output_queu​e_.​cancel()​;​
186 ····​output_deadline_.​cancel()​;​179 ····​output_deadline_.​cancel()​;​
187 ··​}180 ··​}
188 181
189 ··​bool·​stopped()​·​const182 ··​bool·​stopped()​·​const
190 ··​{183 ··​{
191 ····​return·​!socket_.​is_open()​;​184 ····​return·​!socket_.​is_open()​;​
192 ··​}185 ··​}
193 186
194 ··​void·​deliver(const·​std:​:​string&·​msg)​187 ··​void·​deliver(const·​std:​:​string&·​msg)​·override
195 ··​{188 ··​{
196 ····​output_queue_.​push_back(msg·​+·​"\n")​;​189 ····​output_queue_.​push_back(msg·​+·​"\n")​;​
197 190
198 ····​/​/​·​Signal·​that·​the·​output·​queue·​contains·​messages.​·​Modifying·​the·​expiry191 ····​/​/​·​Signal·​that·​the·​output·​queue·​contains·​messages.​·​Modifying·​the·​expiry
199 ····​/​/​·​will·​wake·​the·​output·​actor,​·​if·​it·​is·​waiting·​on·​the·​timer.​192 ····​/​/​·​will·​wake·​the·​output·​actor,​·​if·​it·​is·​waiting·​on·​the·​timer.​
200 ····​non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​min()​)​;​193 ····​non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​min()​)​;​
201 ··​}194 ··​}
202 195
203 ··​void·start_read()​196 ··​void·read_line()​
204 ··​{197 ··​{
205 ····​/​/​·​Set·​a·​deadline·​for·​the·​read·​operation.​198 ····​/​/​·​Set·​a·​deadline·​for·​the·​read·​operation.​
206 ····​input_deadline_.​expires_after(asio:​:​chrono:​:​seconds(30)​)​;​199 ····​input_deadline_.​expires_after(std:​:​chrono:​:​seconds(30)​)​;​
207 200
208 ····​/​/​·​Start·​an·​asynchronous·​operation·​to·​read·​a·​newline-​delimited·​message.​201 ····​/​/​·​Start·​an·​asynchronous·​operation·​to·​read·​a·​newline-​delimited·​message.​
202 ····​auto·​self(shared_from_this​()​)​;​
209 ····​asio:​:​async_read_until(sock​et_,​203 ····​asio:​:​async_read_until(sock​et_,​
210 ········​asio:​:​dynamic_buffer(input_​buffer_)​,​·​'\n',​204 ········​asio:​:​dynamic_buffer(input_​buffer_)​,​·​'\n',​
211 ········boost:​:​bind(&tcp_session:​:​handle_read,​·​shared_from_this()​,​205 ········[this,​·self](const·std:​:​error_code&·error,​·​std:​:​size_t·n)​
212 ··········boost:​:​placeholders:​:​_1,​·boost:​:​placeholders:​:​_2)​)​;​206 ········{
213 ··}207 ··········/​/​·Check·if·the·session·was·stopped·while·the·operation·was·pending.​
208 ··········​if·​(stopped()​)​
209 ············​return;​
214 210
215 ··void·handle_read(const·asio:​:​error_code&·ec,​·std:​:​size_t·n)​211 ··········if·(!error)​
216 ··​{212 ··········​{
217 ····​if·(stopped()​)​213 ············/​/​·Extract·the·newline-​delimited·message·from·the·buffer.​
218 ······​return;​214 ············std:​:​string·msg(input_buffer_.​substr(0,​·n·-​·1)​)​;​
215 ············​input_buffer_.​erase(0,​·​n)​;​
219 216
220 ····​if·​(!ec)​217 ············​if·​(!msg.​empty()​)​
221 ····​{218 ············​{
222 ······/​/​·Extract·the·newline-​delimited·message·from·the·buffer.​219 ··············channel_.​deliver(msg)​;​
223 ······std:​:​string·msg(input_buffer_.​substr(0,​·n·-​·1)​)​;​220 ············}
224 ······input_buffer_.​erase(0,​·n)​;​221 ············else
222 ············​{
225 223
226 ······​if·(!msg.​empty()​)​224 ··············/​/​·We·received·a·heartbeat·message·from·the·client.​·If·there's
227 ······{225 ··············/​/​·nothing·else·being·sent·or·ready·to·be·sent,​·send·a·heartbeat
228 ········channel_.​deliver(msg)​;​226 ··············/​/​·right·back.​
229 ······}227 ··············if·(output_queue_.​empty()​)​
230 ······else228 ··············{
231 ······{229 ················output_queue_.​push_back("\n")​;​
232 ········/​/​·We·received·a·heartbeat·message·from·the·client.​·If·there's·nothing
233 ········/​/​·else·being·sent·or·ready·to·be·sent,​·send·a·heartbeat·right·back.​
234 ········if·(output_queue_.​empty()​)​
235 ········{
236 ··········output_queue_.​push_back("\n")​;​
237 230
238 ··········​/​/​·​Signal·​that·​the·​output·​queue·​contains·​messages.​·​Modifying·the231 ················​/​/​·​Signal·​that·​the·​output·​queue·​contains·​messages.​·​Modifying
239 ··········​/​/​·​expiry·​will·​wake·​the·​output·​actor,​·​if·​it·​is·​waiting·​on·the·timer.​232 ················​/​/​·the·​expiry·​will·​wake·​the·​output·​actor,​·​if·​it·​is·​waiting·​on
240 ··········non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​min()​)​;​233 ················/​/​·the·timer.​
241 ········}234 ················non_empty_output_queu​e_.​expires_at(
242 ······}235 ····················steady_timer:​:​time_point:​:​min()​)​;​
236 ··············​}
237 ············​}
243 238
244 ······start_read()​;​239 ············read_line()​;​
245 ····​}240 ··········​}
246 ····​else241 ··········​else
247 ····​{242 ··········​{
248 ······​stop()​;​243 ············​stop()​;​
249 ····​}244 ··········​}
245 ········​})​;​
250 ··​}246 ··​}
251 247
252 ··​void·​await_output()​248 ··​void·​await_output()​
253 ··​{249 ··​{
254 ····if·​(stopped()​)​250 ····auto·self(shared_from_this​()​)​;​
255 ······return;​251 ····non_empty_output_queu​e_.​async_wait(
252 ········​[this,​·​self](const·​std:​:​error_code&·​/​*error*/​)​
253 ········​{
254 ··········​/​/​·​Check·​if·​the·​session·​was·​stopped·​while·​the·​operation·​was·​pending.​
255 ··········​if·​(stopped()​)​
256 ············​return;​
256 257
257 ····​if·​(output_queue_.​empty()​)​258 ··········​if·​(output_queue_.​empty()​)​
258 ····​{259 ··········​{
259 ······​/​/​·​There·​are·​no·​messages·​that·​are·​ready·​to·​be·​sent.​·​The·​actor·​goes·to260 ············​/​/​·​There·​are·​no·​messages·​that·​are·​ready·​to·​be·​sent.​·​The·​actor·​goes
260 ······​/​/​·​sleep·​by·​waiting·​on·​the·​non_empty_output_queu​e_·​timer.​·​When·​a·new261 ············​/​/​·to·​sleep·​by·​waiting·​on·​the·​non_empty_output_queu​e_·​timer.​·​When·​a
261 ······​/​/​·​message·​is·​added,​·​the·​timer·​will·​be·​modified·​and·​the·​actor·will·wake.​262 ············​/​/​·new·​message·​is·​added,​·​the·​timer·​will·​be·​modified·​and·​the·​actor
262 ······non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​263 ············/​/​·will·wake.​
263 ······​non_empty_output_queu​e_.​async_wait(264 ············​non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​
264 ··········boost:​:​bind(&tcp_session:​:​await_output,​·shared_from_this()​)​)​;​265 ············await_output()​;​
265 ····​}266 ··········​}
266 ····​else267 ··········​else
267 ····​{268 ··········​{
268 ······start_write()​;​269 ············write_line()​;​
269 ····​}270 ··········​}
271 ········​})​;​
270 ··​}272 ··​}
271 273
272 ··​void·start_write()​274 ··​void·write_line()​
273 ··​{275 ··​{
274 ····​/​/​·​Set·​a·​deadline·​for·​the·​write·​operation.​276 ····​/​/​·​Set·​a·​deadline·​for·​the·​write·​operation.​
275 ····​output_deadline_.​expires_after(asio:​:​chrono:​:​seconds(30)​)​;​277 ····​output_deadline_.​expires_after(std:​:​chrono:​:​seconds(30)​)​;​
276 278
277 ····​/​/​·​Start·​an·​asynchronous·​operation·​to·​send·​a·​message.​279 ····​/​/​·​Start·​an·​asynchronous·​operation·​to·​send·​a·​message.​
280 ····​auto·​self(shared_from_this​()​)​;​
278 ····​asio:​:​async_write(socket_,​281 ····​asio:​:​async_write(socket_,​
279 ········​asio:​:​buffer(output_queue_.​front()​)​,​282 ········​asio:​:​buffer(output_queue_.​front()​)​,​
280 ········boost:​:​bind(&tcp_session:​:​handle_write,​283 ········[this,​·self](const·std:​:​error_code&·error,​·std:​:​size_t·/​*n*/​)​
281 ··········shared_from_this()​,​·boost:​:​placeholders:​:​_1)​)​;​284 ········{
282 ··}285 ··········/​/​·Check·if·the·session·was·stopped·while·the·operation·was·pending.​
286 ··········​if·​(stopped()​)​
287 ············​return;​
283 288
284 ··void·handle_write(const·asio:​:​error_code&·ec)​289 ··········if·(!error)​
285 ··​{290 ··········​{
286 ····if·(stopped()​)​291 ············output_queue_.​pop_front()​;​
287 ······return;​
288 292
289 ····​if·(!ec)​293 ············await_output()​;​
290 ····{294 ··········}
291 ······output_queue_.​pop_front()​;​295 ··········else
292 296 ··········{
293 ······await_output()​;​297 ············stop()​;​
294 ····​}298 ··········​}
295 ····else299 ········})​;​
296 ····{
297 ······stop()​;​
298 ····}
299 ··​}300 ··​}
300 301
301 ··​void·​check_deadline(steady​_timer*·​deadline)​302 ··​void·​check_deadline(steady​_timer&·​deadline)​
302 ··​{303 ··​{
303 ····if·​(stopped()​)​304 ····auto·self(shared_from_this​()​)​;​
304 ······return;​305 ····deadline.​async_wait(
306 ········​[this,​·​self,​·​&deadline](const·​std:​:​error_code&·​/​*error*/​)​
307 ········​{
308 ··········​/​/​·​Check·​if·​the·​session·​was·​stopped·​while·​the·​operation·​was·​pending.​
309 ··········​if·​(stopped()​)​
310 ············​return;​
305 311
306 ····​/​/​·​Check·​whether·​the·​deadline·​has·​passed.​·​We·​compare·​the·​deadline·against312 ··········​/​/​·​Check·​whether·​the·​deadline·​has·​passed.​·​We·​compare·​the·​deadline
307 ····​/​/​·​the·​current·​time·​since·​a·​new·​asynchronous·​operation·​may·have·moved·the313 ··········​/​/​·against·​the·​current·​time·​since·​a·​new·​asynchronous·​operation·​may
308 ····​/​/​·​deadline·​before·​this·​actor·​had·​a·​chance·​to·​run.​314 ··········​/​/​·have·moved·the·​deadline·​before·​this·​actor·​had·​a·​chance·​to·​run.​
309 ····​if·​(deadline-​>expiry()​·​<=·​steady_timer:​:​clock_type:​:​now()​)​315 ··········​if·​(deadline.​expiry()​·​<=·​steady_timer:​:​clock_type:​:​now()​)​
310 ····​{316 ··········​{
311 ······​/​/​·​The·​deadline·​has·​passed.​·​Stop·​the·​session.​·​The·​other·​actors·​will317 ············​/​/​·​The·​deadline·​has·​passed.​·​Stop·​the·​session.​·​The·​other·​actors·​will
312 ······​/​/​·​terminate·​as·​soon·​as·​possible.​318 ············​/​/​·​terminate·​as·​soon·​as·​possible.​
313 ······​stop()​;​319 ············​stop()​;​
314 ····​}320 ··········​}
315 ····​else321 ··········​else
316 ····​{322 ··········​{
317 ······​/​/​·​Put·​the·​actor·​back·​to·​sleep.​323 ············​/​/​·​Put·​the·​actor·​back·​to·​sleep.​
318 ······​deadline-​>async_wait(324 ············check_deadline(deadli​ne)​;​
319 ··········boost:​:​bind(&tcp_session:​:​check_deadline,​325 ··········}
320 ··········shared_from_this()​,​·deadline)​)​;​326 ········})​;​
321 ····}
322 ··​}327 ··​}
323 328
324 ··​channel&·​channel_;​329 ··​channel&·​channel_;​
325 ··​tcp:​:​socket·​socket_;​330 ··​tcp:​:​socket·​socket_;​
326 ··​std:​:​string·​input_buffer_;​331 ··​std:​:​string·​input_buffer_;​
327 ··​steady_timer·​input_deadline_;​332 ··​steady_timer·​input_deadline_{socke​t_.​get_executor()​};​
328 ··​std:​:​deque<std:​:​string>·​output_queue_;​333 ··​std:​:​deque<std:​:​string>·​output_queue_;​
329 ··​steady_timer·​non_empty_output_queu​e_;​334 ··​steady_timer·​non_empty_output_queu​e_{socket_.​get_executor()​};​
330 ··​steady_timer·​output_deadline_;​335 ··​steady_timer·​output_deadline_{sock​et_.​get_executor()​};​
331 };​336 };​
332 337
333 typedef·boost:​:​shared_ptr<tcp_sessio​n>·​tcp_session_ptr;​338 typedef·​std:​:​shared_ptr<tcp_sessio​n>·​tcp_session_ptr;​
334 339
335 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​340 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
336 341
337 class·​udp_broadcaster342 class·​udp_broadcaster
338 ··​:​·​public·​subscriber343 ··​:​·​public·​subscriber
339 {344 {
340 public:​345 public:​
341 ··​udp_broadcaster(asio:​:​io_context&·​io_context,​346 ··​udp_broadcaster(asio:​:​io_context&·​io_context,​
342 ······​const·​udp:​:​endpoint&·​broadcast_endpoint)​347 ······​const·​udp:​:​endpoint&·​broadcast_endpoint)​
343 ····​:​·​socket_(io_context)​348 ····​:​·​socket_(io_context)​
344 ··​{349 ··​{
345 ····​socket_.​connect(broadcast_end​point)​;​350 ····​socket_.​connect(broadcast_end​point)​;​
346 ····​socket_.​set_option(udp:​:​socket:​:​broadcast(true)​)​;​351 ····​socket_.​set_option(udp:​:​socket:​:​broadcast(true)​)​;​
347 ··​}352 ··​}
348 353
349 private:​354 private:​
350 ··​void·​deliver(const·​std:​:​string&·​msg)​355 ··​void·​deliver(const·​std:​:​string&·​msg)​
351 ··​{356 ··​{
352 ····asio:​:​error_code·​ignored_ec;​357 ····​std:​:​error_code·​ignored_error;​
353 ····​socket_.​send(asio:​:​buffer(msg)​,​·​0,​·​ignored_ec)​;​358 ····​socket_.​send(asio:​:​buffer(msg)​,​·​0,​·​ignored_error)​;​
354 ··​}359 ··​}
355 360
356 ··​udp:​:​socket·​socket_;​361 ··​udp:​:​socket·​socket_;​
357 };​362 };​
358 363
359 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​364 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
360 365
361 class·​server366 class·​server
362 {367 {
363 public:​368 public:​
364 ··​server(asio:​:​io_context&·​io_context,​369 ··​server(asio:​:​io_context&·​io_context,​
365 ······​const·​tcp:​:​endpoint&·​listen_endpoint,​370 ······​const·​tcp:​:​endpoint&·​listen_endpoint,​
366 ······​const·​udp:​:​endpoint&·​broadcast_endpoint)​371 ······​const·​udp:​:​endpoint&·​broadcast_endpoint)​
367 ····​:​·​io_context_(io_contex​t)​,​372 ····​:​·​io_context_(io_contex​t)​,​
368 ······​acceptor_(io_context,​·​listen_endpoint)​373 ······​acceptor_(io_context,​·​listen_endpoint)​
369 ··​{374 ··​{
370 ····subscriber_ptr·bc(new·udp_broadcaster(io_co​ntext_,​·broadcast_endpoint)​)​;​375 ····​channel_.​join(
371 ····channel_.​join(bc)​;​376 ········std:​:​make_shared<udp_broad​caster>(
377 ··········​io_context_,​·​broadcast_endpoint)​)​;​
372 378
373 ····start_accept()​;​379 ····​accept()​;​
374 ··​}380 ··​}
375 381
376 ··void·start_accept()​382 private:​
383 ··​void·​accept()​
377 ··​{384 ··​{
378 ····tcp_session_ptr·new_session(new·tcp_session(io_contex​t_,​·channel_)​)​;​385 ····acceptor_.​async_accept(
386 ········​[this](const·​std:​:​error_code&·​error,​·​tcp:​:​socket·​socket)​
387 ········​{
388 ··········​if·​(!error)​
389 ··········​{
390 ············​std:​:​make_shared<tcp_sessi​on>(std:​:​move(socket)​,​·​channel_)​-​>start()​;​
391 ··········​}
379 392
380 ····acceptor_.​async_accept(new_sess​ion-​>socket()​,​393 ··········accept()​;​
381 ········boost:​:​bind(&server:​:​handle_accept,​394 ········})​;​
382 ··········this,​·new_session,​·boost:​:​placeholders:​:​_1)​)​;​
383 ··​}395 ··​}
384 396
385 ··​void·​handle_accept(tcp_ses​sion_ptr·​session,​
386 ······​const·​asio:​:​error_code&·​ec)​
387 ··​{
388 ····​if·​(!ec)​
389 ····​{
390 ······​session-​>start()​;​
391 ····​}
392
393 ····​start_accept()​;​
394 ··​}
395
396 private:​
397 ··​asio:​:​io_context&·​io_context_;​397 ··​asio:​:​io_context&·​io_context_;​
398 ··​tcp:​:​acceptor·​acceptor_;​398 ··​tcp:​:​acceptor·​acceptor_;​
399 ··​channel·​channel_;​399 ··​channel·​channel_;​
400 };​400 };​
401 401
402 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​402 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
403 403
404 int·​main(int·​argc,​·​char*·​argv[])​404 int·​main(int·​argc,​·​char*·​argv[])​
405 {405 {
406 ··​try406 ··​try
407 ··​{407 ··​{
408 ····​using·​namespace·​std;​·​/​/​·​For·​atoi.​408 ····​using·​namespace·​std;​·​/​/​·​For·​atoi.​
409 409
410 ····​if·​(argc·​!=·​4)​410 ····​if·​(argc·​!=·​4)​
411 ····​{411 ····​{
412 ······​std:​:​cerr·​<<·​"Usage:​·​server·​<listen_port>·​<bcast_address>·​<bcast_port>\n";​412 ······​std:​:​cerr·​<<·​"Usage:​·​server·​<listen_port>·​<bcast_address>·​<bcast_port>\n";​
413 ······​return·​1;​413 ······​return·​1;​
414 ····​}414 ····​}
415 415
416 ····​asio:​:​io_context·​io_context;​416 ····​asio:​:​io_context·​io_context;​
417 417
418 ····​tcp:​:​endpoint·​listen_endpoint(tcp:​:​v4()​,​·​atoi(argv[1])​)​;​418 ····​tcp:​:​endpoint·​listen_endpoint(tcp:​:​v4()​,​·​atoi(argv[1])​)​;​
419 419
420 ····​udp:​:​endpoint·​broadcast_endpoint(420 ····​udp:​:​endpoint·​broadcast_endpoint(
421 ········​asio:​:​ip:​:​make_address(argv[2])​,​·​atoi(argv[3])​)​;​421 ········​asio:​:​ip:​:​make_address(argv[2])​,​·​atoi(argv[3])​)​;​
422 422
423 ····​server·​s(io_context,​·​listen_endpoint,​·​broadcast_endpoint)​;​423 ····​server·​s(io_context,​·​listen_endpoint,​·​broadcast_endpoint)​;​
424 424
425 ····​io_context.​run()​;​425 ····​io_context.​run()​;​
426 ··​}426 ··​}
427 ··​catch·​(std:​:​exception&·​e)​427 ··​catch·​(std:​:​exception&·​e)​
428 ··​{428 ··​{
429 ····​std:​:​cerr·​<<·​"Exception:​·​"·​<<·​e.​what()​·​<<·​"\n";​429 ····​std:​:​cerr·​<<·​"Exception:​·​"·​<<·​e.​what()​·​<<·​"\n";​
430 ··​}430 ··​}
431 431
432 ··​return·​0;​432 ··​return·​0;​
433 }433 }