src/​examples/​cpp03/​timeouts/​server.​cppsrc/​examples/​cpp11/​timeouts/​server.​cpp
1 /​/​1 /​/​
2 /​/​·​server.​cpp2 /​/​·​server.​cpp
3 /​/​·​~~~~~~~~~~3 /​/​·​~~~~~~~~~~
4 /​/​4 /​/​
5 /​/​·​Copyright·​(c)​·​2003-​2018·​Christopher·​M.​·​Kohlhoff·​(chris·​at·​kohlhoff·​dot·​com)​5 /​/​·​Copyright·​(c)​·​2003-​2018·​Christopher·​M.​·​Kohlhoff·​(chris·​at·​kohlhoff·​dot·​com)​
6 /​/​6 /​/​
7 /​/​·​Distributed·​under·​the·​Boost·​Software·​License,​·​Version·​1.​0.​·​(See·​accompanying7 /​/​·​Distributed·​under·​the·​Boost·​Software·​License,​·​Version·​1.​0.​·​(See·​accompanying
8 /​/​·​file·​LICENSE_1_0.​txt·​or·​copy·​at·​http:​/​/​www.​boost.​org/​LICENSE_1_0.​txt)​8 /​/​·​file·​LICENSE_1_0.​txt·​or·​copy·​at·​http:​/​/​www.​boost.​org/​LICENSE_1_0.​txt)​
9 /​/​9 /​/​
10 10
11 #include·​<algorithm>11 #include·​<algorithm>
12 #include·​<cstdlib>12 #include·​<cstdlib>
13 #include·​<deque>13 #include·​<deque>
14 #include·​<iostream>14 #include·​<iostream>
15 #include·​<memory>
15 #include·​<set>16 #include·​<set>
16 #include·​<string>17 #include·​<string>
17 #include·​<boost/​bind.​hpp>
18 #include·​<boost/​shared_ptr.​hpp>
19 #include·​<boost/​enable_shared_from_th​is.​hpp>
20 #include·​"asio/​buffer.​hpp"18 #include·​"asio/​buffer.​hpp"
21 #include·​"asio/​io_context.​hpp"19 #include·​"asio/​io_context.​hpp"
22 #include·​"asio/​ip/​tcp.​hpp"20 #include·​"asio/​ip/​tcp.​hpp"
23 #include·​"asio/​ip/​udp.​hpp"21 #include·​"asio/​ip/​udp.​hpp"
24 #include·​"asio/​read_until.​hpp"22 #include·​"asio/​read_until.​hpp"
25 #include·​"asio/​steady_timer.​hpp"23 #include·​"asio/​steady_timer.​hpp"
26 #include·​"asio/​write.​hpp"24 #include·​"asio/​write.​hpp"
27 25
28 using·​asio:​:​steady_timer;​26 using·​asio:​:​steady_timer;​
29 using·​asio:​:​ip:​:​tcp;​27 using·​asio:​:​ip:​:​tcp;​
30 using·​asio:​:​ip:​:​udp;​28 using·​asio:​:​ip:​:​udp;​
31 29
32 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​30 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
33 31
34 class·​subscriber32 class·​subscriber
35 {33 {
36 public:​34 public:​
37 ··​virtual·​~subscriber()​·{}35 ··​virtual·​~subscriber()​·=·default;​
38 ··​virtual·​void·​deliver(const·​std:​:​string&·​msg)​·​=·​0;​36 ··​virtual·​void·​deliver(const·​std:​:​string&·​msg)​·​=·​0;​
39 };​37 };​
40 38
41 typedef·boost:​:​shared_ptr<subscriber​>·​subscriber_ptr;​39 typedef·​std:​:​shared_ptr<subscriber​>·​subscriber_ptr;​
42 40
43 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​41 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
44 42
45 class·​channel43 class·​channel
46 {44 {
47 public:​45 public:​
48 ··​void·​join(subscriber_ptr·​subscriber)​46 ··​void·​join(subscriber_ptr·​subscriber)​
49 ··​{47 ··​{
50 ····​subscribers_.​insert(subscriber)​;​48 ····​subscribers_.​insert(subscriber)​;​
51 ··​}49 ··​}
52 50
53 ··​void·​leave(subscriber_ptr·​subscriber)​51 ··​void·​leave(subscriber_ptr·​subscriber)​
54 ··​{52 ··​{
55 ····​subscribers_.​erase(subscriber)​;​53 ····​subscribers_.​erase(subscriber)​;​
56 ··​}54 ··​}
57 55
58 ··​void·​deliver(const·​std:​:​string&·​msg)​56 ··​void·​deliver(const·​std:​:​string&·​msg)​
59 ··​{57 ··​{
60 ····std:​:​for_each(subscribers_​.​begin()​,​·​subscribers_.​end()​,​58 ····​for·(const·auto&·s·:​·​subscribers_)​
61 ········boost:​:​bind(&subscriber:​:​deliver,​·_1,​·boost:​:​ref(msg)​)​)​;​59 ····{
60 ······​s-​>deliver(msg)​;​
61 ····​}
62 ··​}62 ··​}
63 63
64 private:​64 private:​
65 ··​std:​:​set<subscriber_ptr>·​subscribers_;​65 ··​std:​:​set<subscriber_ptr>·​subscribers_;​
66 };​66 };​
67 67
68 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​68 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
69 69
70 /​/​70 /​/​
71 /​/​·​This·​class·​manages·​socket·​timeouts·​by·​applying·​the·​concept·​of·​a·​deadline.​71 /​/​·​This·​class·​manages·​socket·​timeouts·​by·​applying·​the·​concept·​of·​a·​deadline.​
72 /​/​·​Some·​asynchronous·​operations·​are·​given·​deadlines·​by·​which·​they·​must·​complete.​72 /​/​·​Some·​asynchronous·​operations·​are·​given·​deadlines·​by·​which·​they·​must·​complete.​
73 /​/​·​Deadlines·​are·​enforced·​by·​two·​"actors"·​that·​persist·​for·​the·​lifetime·​of·​the73 /​/​·​Deadlines·​are·​enforced·​by·​two·​"actors"·​that·​persist·​for·​the·​lifetime·​of·​the
74 /​/​·​session·​object,​·​one·​for·​input·​and·​one·​for·​output:​74 /​/​·​session·​object,​·​one·​for·​input·​and·​one·​for·​output:​
75 /​/​75 /​/​
76 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+·····················​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+76 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+······················​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
77 /​/​··​|················​|·····················​|················​|77 /​/​··​|················​|······················​|················​|
78 /​/​··​|·​check_deadline·​|<-​-​-​+················​|·​check_deadline·​|<-​-​-​+78 /​/​··​|·​check_deadline·​|<-​-​-​-​-​-​-​+·············​|·​check_deadline·​|<-​-​-​-​-​-​-​+
79 /​/​··​|················​|····|·async_wait()​···​|················​|····|·async_wait()​79 /​/​··​|················​|········|·············​|················​|········|
80 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····|··on·input······​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····|··on·output80 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+········|·············​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+········|
81 /​/​··············​|·········|··deadline··················​|·········|··deadline81 /​/​···············​|············|··························​|············|
82 /​/​··············​+-​-​-​-​-​-​-​-​-​+····························+-​-​-​-​-​-​-​-​-​+82 /​/​··async_wait()​·|····​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····async_wait()​·|····+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
83 /​/​···​on·​input····​|····​|·····​lambda·····​|·····​on·​output···​|····​|·····​lambda·····​|
84 /​/​···​deadline····​+-​-​-​>|·······​in·······​|·····​deadline····​+-​-​-​>|·······​in·······​|
85 /​/​····················​|·​check_deadline·​|······················​|·​check_deadline·​|
86 /​/​····················​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+······················​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
83 /​/​87 /​/​
84 /​/​·​If·​either·​deadline·​actor·​determines·​that·​the·​corresponding·​deadline·​has88 /​/​·​If·​either·​deadline·​actor·​determines·​that·​the·​corresponding·​deadline·​has
85 /​/​·​expired,​·​the·​socket·​is·​closed·​and·​any·​outstanding·​operations·​are·​cancelled.​89 /​/​·​expired,​·​the·​socket·​is·​closed·​and·​any·​outstanding·​operations·​are·​cancelled.​
86 /​/​90 /​/​
87 /​/​·​The·​input·​actor·​reads·​messages·​from·​the·​socket,​·​where·​messages·​are·​delimited91 /​/​·​The·​input·​actor·​reads·​messages·​from·​the·​socket,​·​where·​messages·​are·​delimited
88 /​/​·​by·​the·​newline·​character:​92 /​/​·​by·​the·​newline·​character:​
89 /​/​93 /​/​
90 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​+94 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​+
91 /​/​··​|············​|95 /​/​··​|·············​|
92 /​/​··​|·start_read·​|<-​-​-​+96 /​/​··​|··read_line··​|<-​-​-​-​+
93 /​/​··​|············​|····​|97 /​/​··​|·············​|·····​|
94 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​+····​|98 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​+·····​|
95 /​/​··········​|·········​|99 /​/​··········​|···········​|
96 /​/​··​async_-​·​|····​+-​-​-​-​-​-​-​-​-​-​-​-​-​+100 /​/​··​async_-​·​|····​+-​-​-​-​-​-​-​-​-​-​-​-​-​+
97 /​/​···​read_-​·​|····​|·············​|101 /​/​···​read_-​·​|····​|···lambda····​|
98 /​/​··​until()​·​+-​-​-​>|·handle_read·​|102 /​/​··​until()​·​+-​-​-​>|·····in······​|
99 /​/​···············​|·············​|103 /​/​···············​|··read_line··​|
100 /​/​···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​+104 /​/​···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​+
101 /​/​105 /​/​
102 /​/​·​The·​deadline·​for·​receiving·​a·​complete·​message·​is·​30·​seconds.​·​If·​a·​non-​empty106 /​/​·​The·​deadline·​for·​receiving·​a·​complete·​message·​is·​30·​seconds.​·​If·​a·​non-​empty
103 /​/​·​message·​is·​received,​·​it·​is·​delivered·​to·​all·​subscribers.​·​If·​a·​heartbeat·​(a107 /​/​·​message·​is·​received,​·​it·​is·​delivered·​to·​all·​subscribers.​·​If·​a·​heartbeat·​(a
104 /​/​·​message·​that·​consists·​of·​a·​single·​newline·​character)​·​is·​received,​·​a·​heartbeat108 /​/​·​message·​that·​consists·​of·​a·​single·​newline·​character)​·​is·​received,​·​a·​heartbeat
105 /​/​·​is·​enqueued·​for·​the·​client,​·​provided·​there·​are·​no·​other·​messages·​waiting·​to109 /​/​·​is·​enqueued·​for·​the·​client,​·​provided·​there·​are·​no·​other·​messages·​waiting·​to
106 /​/​·​be·​sent.​110 /​/​·​be·​sent.​
107 /​/​111 /​/​
108 /​/​·​The·​output·​actor·​is·​responsible·​for·​sending·​messages·​to·​the·​client:​112 /​/​·​The·​output·​actor·​is·​responsible·​for·​sending·​messages·​to·​the·​client:​
109 /​/​113 /​/​
110 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+114 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
111 /​/​··​|··············​|<-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+115 /​/​··​|················​|<-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
112 /​/​··​|·​await_output·​|······················​|116 /​/​··​|··​await_output··​|······················​|
113 /​/​··​|··············​|<-​-​-​+·················​|117 /​/​··​|················​|<-​-​-​-​-​-​-​+·············​|
114 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····|·················​|118 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+········|·············​|
115 /​/​······|······|········|·async_wait()​····​|119 /​/​····|············|··········|·············​|
116 /​/​······|······​+-​-​-​-​-​-​-​-​+·················​|120 /​/​····|····async_-​·|··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····​|
117 /​/​······V·································​|121 /​/​····|·····wait()​·|··|·····lambda·····|····​|
118 /​/​··+-​-​-​-​-​-​-​-​-​-​-​-​-​+···············+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+122 /​/​····|············+-​>|·······in·······|····|
119 /​/​··|·············|·async_write()​·|··············​|123 /​/​····|···············|··await_output··|····​|
120 /​/​··​|·start_write·|-​-​-​-​-​-​-​-​-​-​-​-​-​-​>|·handle_write·​|124 /​/​····​|···············+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····​|
121 /​/​··|·············|···············|··············​|125 /​/​····V·····································​|
122 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​+···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+126 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
127 /​/​··​|··············​|·​async_write()​·​|····​lambda····​|
128 /​/​··​|··​write_line··​|-​-​-​-​-​-​-​-​-​-​-​-​-​-​>|······​in······​|
129 /​/​··​|··············​|···············​|··​write_line··​|
130 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
123 /​/​131 /​/​
124 /​/​·​The·​output·​actor·​first·​waits·​for·​an·​output·​message·​to·​be·​enqueued.​·​It·​does132 /​/​·​The·​output·​actor·​first·​waits·​for·​an·​output·​message·​to·​be·​enqueued.​·​It·​does
125 /​/​·​this·​by·​using·​a·​steady_timer·​as·​an·​asynchronous·​condition·​variable.​·​The133 /​/​·​this·​by·​using·​a·​steady_timer·​as·​an·​asynchronous·​condition·​variable.​·​The
126 /​/​·​steady_timer·​will·​be·​signalled·​whenever·​the·​output·​queue·​is·​non-​empty.​134 /​/​·​steady_timer·​will·​be·​signalled·​whenever·​the·​output·​queue·​is·​non-​empty.​
127 /​/​135 /​/​
128 /​/​·​Once·​a·​message·​is·​available,​·​it·​is·​sent·​to·​the·​client.​·​The·​deadline·​for136 /​/​·​Once·​a·​message·​is·​available,​·​it·​is·​sent·​to·​the·​client.​·​The·​deadline·​for
129 /​/​·​sending·​a·​complete·​message·​is·​30·​seconds.​·​After·​the·​message·​is·​successfully137 /​/​·​sending·​a·​complete·​message·​is·​30·​seconds.​·​After·​the·​message·​is·​successfully
130 /​/​·​sent,​·​the·​output·​actor·​again·​waits·​for·​the·​output·​queue·​to·​become·​non-​empty.​138 /​/​·​sent,​·​the·​output·​actor·​again·​waits·​for·​the·​output·​queue·​to·​become·​non-​empty.​
131 /​/​139 /​/​
132 class·​tcp_session140 class·​tcp_session
133 ··​:​·​public·​subscriber,​141 ··​:​·​public·​subscriber,​
134 ····​public·boost:​:​enable_shared_from_th​is<tcp_session>142 ····​public·​std:​:​enable_shared_from_th​is<tcp_session>
135 {143 {
136 public:​144 public:​
137 ··​tcp_session(asio:​:​io_context&·io_context,​·​channel&·​ch)​145 ··​tcp_session(tcp:​:​socket·socket,​·​channel&·​ch)​
138 ····​:​·​channel_(ch)​,​146 ····​:​·​channel_(ch)​,​
139 ······​socket_(io_context)​,​147 ······​socket_(std:​:​move(socket)​)​
140 ······input_deadline_(io_co​ntext)​,​
141 ······non_empty_output_queu​e_(io_context)​,​
142 ······output_deadline_(io_c​ontext)​
143 ··​{148 ··​{
144 ····​input_deadline_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​149 ····​input_deadline_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​
145 ····​output_deadline_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​150 ····​output_deadline_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​
146 151
147 ····​/​/​·​The·​non_empty_output_queu​e_·​steady_timer·​is·​set·​to·​the·​maximum·​time152 ····​/​/​·​The·​non_empty_output_queu​e_·​steady_timer·​is·​set·​to·​the·​maximum·​time
148 ····​/​/​·​point·​whenever·​the·​output·​queue·​is·​empty.​·​This·​ensures·​that·​the·​output153 ····​/​/​·​point·​whenever·​the·​output·​queue·​is·​empty.​·​This·​ensures·​that·​the·​output
149 ····​/​/​·​actor·​stays·​asleep·​until·​a·​message·​is·​put·​into·​the·​queue.​154 ····​/​/​·​actor·​stays·​asleep·​until·​a·​message·​is·​put·​into·​the·​queue.​
150 ····​non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​155 ····​non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​
151 ··​}156 ··​}
152 157
153 ··​tcp:​:​socket&·​socket()​
154 ··​{
155 ····​return·​socket_;​
156 ··​}
157
158 ··​/​/​·​Called·​by·​the·​server·​object·​to·​initiate·​the·​four·​actors.​158 ··​/​/​·​Called·​by·​the·​server·​object·​to·​initiate·​the·​four·​actors.​
159 ··​void·​start()​159 ··​void·​start()​
160 ··​{160 ··​{
161 ····​channel_.​join(shared_from_this​()​)​;​161 ····​channel_.​join(shared_from_this​()​)​;​
162 162
163 ····start_read()​;​163 ····read_line()​;​
164 164 ····check_deadline(input_​deadline_)​;​
165 ····input_deadline_.​async_wait(
166 ········boost:​:​bind(&tcp_session:​:​check_deadline,​
167 ········shared_from_this()​,​·&input_deadline_)​)​;​
168 165
169 ····​await_output()​;​166 ····​await_output()​;​
170 167 ····check_deadline(output​_deadline_)​;​
171 ····output_deadline_.​async_wait(
172 ········boost:​:​bind(&tcp_session:​:​check_deadline,​
173 ········shared_from_this()​,​·&output_deadline_)​)​;​
174 ··​}168 ··​}
175 169
176 private:​170 private:​
177 ··​void·​stop()​171 ··​void·​stop()​
178 ··​{172 ··​{
179 ····​channel_.​leave(shared_from_thi​s()​)​;​173 ····​channel_.​leave(shared_from_thi​s()​)​;​
180 174
181 ····asio:​:​error_code·​ignored_ec;​175 ····​std:​:​error_code·​ignored_error;​
182 ····​socket_.​close(ignored_ec)​;​176 ····​socket_.​close(ignored_error)​;​
183 ····​input_deadline_.​cancel()​;​177 ····​input_deadline_.​cancel()​;​
184 ····​non_empty_output_queu​e_.​cancel()​;​178 ····​non_empty_output_queu​e_.​cancel()​;​
185 ····​output_deadline_.​cancel()​;​179 ····​output_deadline_.​cancel()​;​
186 ··​}180 ··​}
187 181
188 ··​bool·​stopped()​·​const182 ··​bool·​stopped()​·​const
189 ··​{183 ··​{
190 ····​return·​!socket_.​is_open()​;​184 ····​return·​!socket_.​is_open()​;​
191 ··​}185 ··​}
192 186
193 ··​void·​deliver(const·​std:​:​string&·​msg)​187 ··​void·​deliver(const·​std:​:​string&·​msg)​·override
194 ··​{188 ··​{
195 ····​output_queue_.​push_back(msg·​+·​"\n")​;​189 ····​output_queue_.​push_back(msg·​+·​"\n")​;​
196 190
197 ····​/​/​·​Signal·​that·​the·​output·​queue·​contains·​messages.​·​Modifying·​the·​expiry191 ····​/​/​·​Signal·​that·​the·​output·​queue·​contains·​messages.​·​Modifying·​the·​expiry
198 ····​/​/​·​will·​wake·​the·​output·​actor,​·​if·​it·​is·​waiting·​on·​the·​timer.​192 ····​/​/​·​will·​wake·​the·​output·​actor,​·​if·​it·​is·​waiting·​on·​the·​timer.​
199 ····​non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​min()​)​;​193 ····​non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​min()​)​;​
200 ··​}194 ··​}
201 195
202 ··​void·start_read()​196 ··​void·read_line()​
203 ··​{197 ··​{
204 ····​/​/​·​Set·​a·​deadline·​for·​the·​read·​operation.​198 ····​/​/​·​Set·​a·​deadline·​for·​the·​read·​operation.​
205 ····​input_deadline_.​expires_after(asio:​:​chrono:​:​seconds(30)​)​;​199 ····​input_deadline_.​expires_after(std:​:​chrono:​:​seconds(30)​)​;​
206 200
207 ····​/​/​·​Start·​an·​asynchronous·​operation·​to·​read·​a·​newline-​delimited·​message.​201 ····​/​/​·​Start·​an·​asynchronous·​operation·​to·​read·​a·​newline-​delimited·​message.​
202 ····​auto·​self(shared_from_this​()​)​;​
208 ····​asio:​:​async_read_until(sock​et_,​203 ····​asio:​:​async_read_until(sock​et_,​
209 ········​asio:​:​dynamic_buffer(input_​buffer_)​,​·​'\n',​204 ········​asio:​:​dynamic_buffer(input_​buffer_)​,​·​'\n',​
210 ········boost:​:​bind(&tcp_session:​:​handle_read,​·shared_from_this()​,​·_1,​·_2)​)​;​205 ········[this,​·self](const·std:​:​error_code&·error,​·std:​:​size_t·n)​
211 ··}
212
213 ··void·handle_read(const·asio:​:​error_code&·ec,​·std:​:​size_t·n)​
214 ··{
215 ····if·(stopped()​)​
216 ······return;​
217
218 ····if·(!ec)​
219 ····{
220 ······/​/​·Extract·the·newline-​delimited·message·from·the·buffer.​
221 ······std:​:​string·msg(input_buffer_.​substr(0,​·n·-​·1)​)​;​
222 ······input_buffer_.​erase(0,​·n)​;​
223
224 ······if·(!msg.​empty()​)​
225 ······{
226 ········channel_.​deliver(msg)​;​
227 ······}
228 ······else
229 ······{
230 ········/​/​·We·received·a·heartbeat·message·from·the·client.​·If·there's·nothing
231 ········/​/​·else·being·sent·or·ready·to·be·sent,​·send·a·heartbeat·right·back.​
232 ········if·(output_queue_.​empty()​)​
233 ········​{206 ········​{
234 ··········​output_queue_.​push_back("\n")​;​207 ··········/​/​·Check·if·the·session·was·stopped·while·the·operation·was·pending.​
235 208 ··········if·(stopped()​)​
236 ··········/​/​·Signal·that·the·output·queue·contains·messages.​·Modifying·the209 ············return;​
237 ··········/​/​·expiry·will·wake·the·output·actor,​·if·it·is·waiting·on·the·timer.​210
238 ··········non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​min()​)​;​211 ··········​if·(!error)​
239 ········}212 ··········{
240 ······}213 ············/​/​·Extract·the·newline-​delimited·message·from·the·buffer.​
241 214 ············std:​:​string·msg(input_buffer_.​substr(0,​·n·-​·1)​)​;​
242 ······start_read()​;​215 ············input_buffer_.​erase(0,​·n)​;​
243 ····}216
244 ····else217 ············if·(!msg.​empty()​)​
245 ····​{218 ············​{
246 ······stop()​;​219 ··············channel_.​deliver(msg)​;​
247 ····​}220 ············​}
221 ············​else
222 ············​{
223
224 ··············​/​/​·​We·​received·​a·​heartbeat·​message·​from·​the·​client.​·​If·​there's
225 ··············​/​/​·​nothing·​else·​being·​sent·​or·​ready·​to·​be·​sent,​·​send·​a·​heartbeat
226 ··············​/​/​·​right·​back.​
227 ··············​if·​(output_queue_.​empty()​)​
228 ··············​{
229 ················​output_queue_.​push_back("\n")​;​
230
231 ················​/​/​·​Signal·​that·​the·​output·​queue·​contains·​messages.​·​Modifying
232 ················​/​/​·​the·​expiry·​will·​wake·​the·​output·​actor,​·​if·​it·​is·​waiting·​on
233 ················​/​/​·​the·​timer.​
234 ················​non_empty_output_queu​e_.​expires_at(
235 ····················​steady_timer:​:​time_point:​:​min()​)​;​
236 ··············​}
237 ············​}
238
239 ············​read_line()​;​
240 ··········​}
241 ··········​else
242 ··········​{
243 ············​stop()​;​
244 ··········​}
245 ········​})​;​
248 ··​}246 ··​}
249 247
250 ··​void·​await_output()​248 ··​void·​await_output()​
251 ··​{249 ··​{
252 ····if·​(stopped()​)​250 ····auto·self(shared_from_this​()​)​;​
253 ······return;​251 ····non_empty_output_queu​e_.​async_wait(
254 252 ········[this,​·self](const·std:​:​error_code&·/​*error*/​)​
255 ····if·(output_queue_.​empty()​)​253 ········{
256 ····{254 ··········/​/​·Check·if·the·session·was·stopped·while·the·operation·was·pending.​
257 ······/​/​·There·are·no·messages·that·are·ready·to·be·sent.​·The·actor·goes·to255 ··········if·(stopped()​)​
258 ······/​/​·sleep·by·waiting·on·the·non_empty_output_queu​e_·timer.​·When·a·new256 ············return;​
259 ······/​/​·message·is·added,​·the·timer·will·be·modified·and·the·actor·will·wake.​257
260 ······non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​258 ··········if·(output_queue_.​empty()​)​
261 ······non_empty_output_queu​e_.​async_wait(259 ··········{
262 ··········boost:​:​bind(&tcp_session:​:​await_output,​·shared_from_this()​)​)​;​260 ············/​/​·There·are·no·messages·that·are·ready·to·be·sent.​·The·actor·goes
263 ····}261 ············/​/​·to·sleep·by·waiting·on·the·non_empty_output_queu​e_·timer.​·When·a
264 ····​else262 ············/​/​·new·message·is·added,​·the·timer·will·be·modified·and·the·actor
265 ····{263 ············/​/​·will·wake.​
266 ······​start_write()​;​264 ············non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​
267 ····}265 ············await_output()​;​
266 ··········​}
267 ··········​else
268 ··········​{
269 ············​write_line()​;​
270 ··········​}
271 ········​})​;​
268 ··​}272 ··​}
269 273
270 ··​void·start_write()​274 ··​void·write_line()​
271 ··​{275 ··​{
272 ····​/​/​·​Set·​a·​deadline·​for·​the·​write·​operation.​276 ····​/​/​·​Set·​a·​deadline·​for·​the·​write·​operation.​
273 ····​output_deadline_.​expires_after(asio:​:​chrono:​:​seconds(30)​)​;​277 ····​output_deadline_.​expires_after(std:​:​chrono:​:​seconds(30)​)​;​
274 278
275 ····​/​/​·​Start·​an·​asynchronous·​operation·​to·​send·​a·​message.​279 ····​/​/​·​Start·​an·​asynchronous·​operation·​to·​send·​a·​message.​
280 ····​auto·​self(shared_from_this​()​)​;​
276 ····​asio:​:​async_write(socket_,​281 ····​asio:​:​async_write(socket_,​
277 ········​asio:​:​buffer(output_queue_.​front()​)​,​282 ········​asio:​:​buffer(output_queue_.​front()​)​,​
278 ········boost:​:​bind(&tcp_session:​:​handle_write,​·shared_from_this()​,​·_1)​)​;​283 ········[this,​·self](const·std:​:​error_code&·error,​·std:​:​size_t·/​*n*/​)​
279 ··}284 ········{
280 285 ··········/​/​·Check·if·the·session·was·stopped·while·the·operation·was·pending.​
281 ··void·handle_write(const·asio:​:​error_code&·ec)​286 ··········if·(stopped()​)​
282 ··{287 ············return;​
283 ····if·(stopped()​)​288
284 ······return;​289 ··········if·(!error)​
285 290 ··········{
286 ····if·(!ec)​291 ············output_queue_.​pop_front()​;​
287 ····{292
288 ······output_queue_.​pop_front()​;​293 ············await_output()​;​
289 294 ··········}
290 ······await_output()​;​295 ··········else
291 ····}296 ··········{
292 ····else297 ············stop()​;​
293 ····{298 ··········}
294 ······stop()​;​299 ········})​;​
295 ····​}300 ··​}
296 ··}301
297 302 ··void·check_deadline(steady​_timer&·deadline)​
298 ··void·check_deadline(steady​_timer*·deadline)​303 ··{
299 ··{304 ····auto·self(shared_from_this​()​)​;​
300 ····​if·(stopped()​)​305 ····deadline.​async_wait(
301 ······​return;​306 ········[this,​·self,​·&deadline](const·std:​:​error_code&·/​*error*/​)​
302 307 ········{
303 ····/​/​·Check·whether·​the·deadline·has·passed.​·We·compare·​the·deadline·against308 ··········/​/​·Check·if·​the·session·was·​stopped·while·​the·operation·was·pending.​
304 ····/​/​·the·current·time·since·a·new·asynchronous·operation·may·have·moved·the309 ··········if·(stopped()​)​
305 ····/​/​·deadline·before·this·actor·had·a·chance·to·run.​310 ············return;​
306 ····if·(deadline-​>expiry()​·<=·steady_timer:​:​clock_type:​:​now()​)​311
307 ····{312 ··········/​/​·Check·whether·the·deadline·has·passed.​·We·compare·the·deadline
308 ······/​/​·The·deadline·has·passed.​·Stop·the·session.​·The·​other·actors·will313 ··········/​/​·​against·the·current·time·since·a·new·asynchronous·​operation·may
309 ······​/​/​·​terminate·as·soon·​as·possible.​314 ··········​/​/​·have·moved·​the·deadline·before·this·actor·had·a·chance·to·run.​
310 ······​stop()​;​315 ··········if·(deadline.​expiry()​·<=·​steady_timer:​:​clock_type:​:​now()​)​
311 ····}316 ··········{
312 ····​else317 ············/​/​·The·deadline·has·passed.​·Stop·the·session.​·The·other·actors·will
313 ····{318 ············/​/​·terminate·as·soon·as·possible.​
314 ······/​/​·Put·the·actor·back·to·​sleep.​319 ············​stop()​;​
315 ······deadline-​>async_wait(320 ··········}
316 ··········boost:​:​bind(&tcp_session:​:​check_deadline,​321 ··········​else
317 ··········shared_from_this()​,​·deadline)​)​;​322 ··········{
318 ····}323 ············/​/​·Put·the·actor·back·to·sleep.​
324 ············​check_deadline(deadli​ne)​;​
325 ··········​}
326 ········​})​;​
319 ··​}327 ··​}
320 328
321 ··​channel&·​channel_;​329 ··​channel&·​channel_;​
322 ··​tcp:​:​socket·​socket_;​330 ··​tcp:​:​socket·​socket_;​
323 ··​std:​:​string·​input_buffer_;​331 ··​std:​:​string·​input_buffer_;​
324 ··​steady_timer·​input_deadline_;​332 ··​steady_timer·​input_deadline_{socke​t_.​get_executor()​.​context()​};​
325 ··​std:​:​deque<std:​:​string>·​output_queue_;​333 ··​std:​:​deque<std:​:​string>·​output_queue_;​
326 ··​steady_timer·​non_empty_output_queu​e_;​334 ··​steady_timer·​non_empty_output_queu​e_{socket_.​get_executor()​.​context()​};​
327 ··​steady_timer·​output_deadline_;​335 ··​steady_timer·​output_deadline_{sock​et_.​get_executor()​.​context()​};​
328 };​336 };​
329 337
330 typedef·boost:​:​shared_ptr<tcp_sessio​n>·​tcp_session_ptr;​338 typedef·​std:​:​shared_ptr<tcp_sessio​n>·​tcp_session_ptr;​
331 339
332 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​340 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
333 341
334 class·​udp_broadcaster342 class·​udp_broadcaster
335 ··​:​·​public·​subscriber343 ··​:​·​public·​subscriber
336 {344 {
337 public:​345 public:​
338 ··​udp_broadcaster(asio:​:​io_context&·​io_context,​346 ··​udp_broadcaster(asio:​:​io_context&·​io_context,​
339 ······​const·​udp:​:​endpoint&·​broadcast_endpoint)​347 ······​const·​udp:​:​endpoint&·​broadcast_endpoint)​
340 ····​:​·​socket_(io_context)​348 ····​:​·​socket_(io_context)​
341 ··​{349 ··​{
342 ····​socket_.​connect(broadcast_end​point)​;​350 ····​socket_.​connect(broadcast_end​point)​;​
343 ····​socket_.​set_option(udp:​:​socket:​:​broadcast(true)​)​;​351 ····​socket_.​set_option(udp:​:​socket:​:​broadcast(true)​)​;​
344 ··​}352 ··​}
345 353
346 private:​354 private:​
347 ··​void·​deliver(const·​std:​:​string&·​msg)​355 ··​void·​deliver(const·​std:​:​string&·​msg)​
348 ··​{356 ··​{
349 ····asio:​:​error_code·​ignored_ec;​357 ····​std:​:​error_code·​ignored_error;​
350 ····​socket_.​send(asio:​:​buffer(msg)​,​·​0,​·​ignored_ec)​;​358 ····​socket_.​send(asio:​:​buffer(msg)​,​·​0,​·​ignored_error)​;​
351 ··​}359 ··​}
352 360
353 ··​udp:​:​socket·​socket_;​361 ··​udp:​:​socket·​socket_;​
354 };​362 };​
355 363
356 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​364 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
357 365
358 class·​server366 class·​server
359 {367 {
360 public:​368 public:​
361 ··​server(asio:​:​io_context&·​io_context,​369 ··​server(asio:​:​io_context&·​io_context,​
362 ······​const·​tcp:​:​endpoint&·​listen_endpoint,​370 ······​const·​tcp:​:​endpoint&·​listen_endpoint,​
363 ······​const·​udp:​:​endpoint&·​broadcast_endpoint)​371 ······​const·​udp:​:​endpoint&·​broadcast_endpoint)​
364 ····​:​·​io_context_(io_contex​t)​,​372 ····​:​·​io_context_(io_contex​t)​,​
365 ······​acceptor_(io_context,​·​listen_endpoint)​373 ······​acceptor_(io_context,​·​listen_endpoint)​
366 ··​{374 ··​{
367 ····subscriber_ptr·bc(new·udp_broadcaster(io_co​ntext_,​·broadcast_endpoint)​)​;​375 ····​channel_.​join(
368 ····channel_.​join(bc)​;​376 ········std:​:​make_shared<udp_broad​caster>(
377 ··········​io_context_,​·​broadcast_endpoint)​)​;​
369 378
370 ····start_accept()​;​379 ····​accept()​;​
371 ··​}380 ··​}
372 381
373 ··void·start_accept()​382 private:​
374 ··{383 ··void·accept()​
375 ····tcp_session_ptr·new_session(new·tcp_session(io_contex​t_,​·channel_)​)​;​
376
377 ····acceptor_.​async_accept(new_sess​ion-​>socket()​,​
378 ········boost:​:​bind(&server:​:​handle_accept,​·this,​·new_session,​·_1)​)​;​
379 ··}
380
381 ··void·handle_accept(tcp_ses​sion_ptr·session,​
382 ······const·asio:​:​error_code&·ec)​
383 ··​{384 ··​{
384 ····if·(!ec)​385 ····acceptor_.​async_accept(
385 ····{386 ········[this](const·std:​:​error_code&·error,​·tcp:​:​socket·socket)​
386 ······session-​>start()​;​387 ········{
387 ····}388 ··········if·(!error)​
389 ··········​{
390 ············​std:​:​make_shared<tcp_sessi​on>(std:​:​move(socket)​,​·​channel_)​-​>start()​;​
391 ··········​}
388 392
389 ····start_accept()​;​393 ··········accept()​;​
394 ········​})​;​
390 ··​}395 ··​}
391 396
392 private:​
393 ··​asio:​:​io_context&·​io_context_;​397 ··​asio:​:​io_context&·​io_context_;​
394 ··​tcp:​:​acceptor·​acceptor_;​398 ··​tcp:​:​acceptor·​acceptor_;​
395 ··​channel·​channel_;​399 ··​channel·​channel_;​
396 };​400 };​
397 401
398 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​402 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
399 403
400 int·​main(int·​argc,​·​char*·​argv[])​404 int·​main(int·​argc,​·​char*·​argv[])​
401 {405 {
402 ··​try406 ··​try
403 ··​{407 ··​{
404 ····​using·​namespace·​std;​·​/​/​·​For·​atoi.​408 ····​using·​namespace·​std;​·​/​/​·​For·​atoi.​
405 409
406 ····​if·​(argc·​!=·​4)​410 ····​if·​(argc·​!=·​4)​
407 ····​{411 ····​{
408 ······​std:​:​cerr·​<<·​"Usage:​·​server·​<listen_port>·​<bcast_address>·​<bcast_port>\n";​412 ······​std:​:​cerr·​<<·​"Usage:​·​server·​<listen_port>·​<bcast_address>·​<bcast_port>\n";​
409 ······​return·​1;​413 ······​return·​1;​
410 ····​}414 ····​}
411 415
412 ····​asio:​:​io_context·​io_context;​416 ····​asio:​:​io_context·​io_context;​
413 417
414 ····​tcp:​:​endpoint·​listen_endpoint(tcp:​:​v4()​,​·​atoi(argv[1])​)​;​418 ····​tcp:​:​endpoint·​listen_endpoint(tcp:​:​v4()​,​·​atoi(argv[1])​)​;​
415 419
416 ····​udp:​:​endpoint·​broadcast_endpoint(420 ····​udp:​:​endpoint·​broadcast_endpoint(
417 ········​asio:​:​ip:​:​make_address(argv[2])​,​·​atoi(argv[3])​)​;​421 ········​asio:​:​ip:​:​make_address(argv[2])​,​·​atoi(argv[3])​)​;​
418 422
419 ····​server·​s(io_context,​·​listen_endpoint,​·​broadcast_endpoint)​;​423 ····​server·​s(io_context,​·​listen_endpoint,​·​broadcast_endpoint)​;​
420 424
421 ····​io_context.​run()​;​425 ····​io_context.​run()​;​
422 ··​}426 ··​}
423 ··​catch·​(std:​:​exception&·​e)​427 ··​catch·​(std:​:​exception&·​e)​
424 ··​{428 ··​{
425 ····​std:​:​cerr·​<<·​"Exception:​·​"·​<<·​e.​what()​·​<<·​"\n";​429 ····​std:​:​cerr·​<<·​"Exception:​·​"·​<<·​e.​what()​·​<<·​"\n";​
426 ··​}430 ··​}
427 431
428 ··​return·​0;​432 ··​return·​0;​
429 }433 }