src/​examples/​cpp03/​timeouts/​server.​cppsrc/​examples/​cpp11/​timeouts/​server.​cpp
1 /​/​1 /​/​
2 /​/​·​server.​cpp2 /​/​·​server.​cpp
3 /​/​·​~~~~~~~~~~3 /​/​·​~~~~~~~~~~
4 /​/​4 /​/​
5 /​/​·​Copyright·​(c)​·​2003-​2021·​Christopher·​M.​·​Kohlhoff·​(chris·​at·​kohlhoff·​dot·​com)​5 /​/​·​Copyright·​(c)​·​2003-​2021·​Christopher·​M.​·​Kohlhoff·​(chris·​at·​kohlhoff·​dot·​com)​
6 /​/​6 /​/​
7 /​/​·​Distributed·​under·​the·​Boost·​Software·​License,​·​Version·​1.​0.​·​(See·​accompanying7 /​/​·​Distributed·​under·​the·​Boost·​Software·​License,​·​Version·​1.​0.​·​(See·​accompanying
8 /​/​·​file·​LICENSE_1_0.​txt·​or·​copy·​at·​http:​/​/​www.​boost.​org/​LICENSE_1_0.​txt)​8 /​/​·​file·​LICENSE_1_0.​txt·​or·​copy·​at·​http:​/​/​www.​boost.​org/​LICENSE_1_0.​txt)​
9 /​/​9 /​/​
10 10
11 #include·​<algorithm>11 #include·​<algorithm>
12 #include·​<cstdlib>12 #include·​<cstdlib>
13 #include·​<deque>13 #include·​<deque>
14 #include·​<iostream>14 #include·​<iostream>
15 #include·​<memory>
15 #include·​<set>16 #include·​<set>
16 #include·​<string>17 #include·​<string>
17 #include·​<boost/​bind/​bind.​hpp>
18 #include·​<boost/​shared_ptr.​hpp>
19 #include·​<boost/​enable_shared_from_th​is.​hpp>
20 #include·​"asio/​buffer.​hpp"18 #include·​"asio/​buffer.​hpp"
21 #include·​"asio/​io_context.​hpp"19 #include·​"asio/​io_context.​hpp"
22 #include·​"asio/​ip/​tcp.​hpp"20 #include·​"asio/​ip/​tcp.​hpp"
23 #include·​"asio/​ip/​udp.​hpp"21 #include·​"asio/​ip/​udp.​hpp"
24 #include·​"asio/​read_until.​hpp"22 #include·​"asio/​read_until.​hpp"
25 #include·​"asio/​steady_timer.​hpp"23 #include·​"asio/​steady_timer.​hpp"
26 #include·​"asio/​write.​hpp"24 #include·​"asio/​write.​hpp"
27 25
28 using·​asio:​:​steady_timer;​26 using·​asio:​:​steady_timer;​
29 using·​asio:​:​ip:​:​tcp;​27 using·​asio:​:​ip:​:​tcp;​
30 using·​asio:​:​ip:​:​udp;​28 using·​asio:​:​ip:​:​udp;​
31 29
32 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​30 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
33 31
34 class·​subscriber32 class·​subscriber
35 {33 {
36 public:​34 public:​
37 ··​virtual·​~subscriber()​·{}35 ··​virtual·​~subscriber()​·=·default;​
38 ··​virtual·​void·​deliver(const·​std:​:​string&·​msg)​·​=·​0;​36 ··​virtual·​void·​deliver(const·​std:​:​string&·​msg)​·​=·​0;​
39 };​37 };​
40 38
41 typedef·boost:​:​shared_ptr<subscriber​>·​subscriber_ptr;​39 typedef·​std:​:​shared_ptr<subscriber​>·​subscriber_ptr;​
42 40
43 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​41 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
44 42
45 class·​channel43 class·​channel
46 {44 {
47 public:​45 public:​
48 ··​void·​join(subscriber_ptr·​subscriber)​46 ··​void·​join(subscriber_ptr·​subscriber)​
49 ··​{47 ··​{
50 ····​subscribers_.​insert(subscriber)​;​48 ····​subscribers_.​insert(subscriber)​;​
51 ··​}49 ··​}
52 50
53 ··​void·​leave(subscriber_ptr·​subscriber)​51 ··​void·​leave(subscriber_ptr·​subscriber)​
54 ··​{52 ··​{
55 ····​subscribers_.​erase(subscriber)​;​53 ····​subscribers_.​erase(subscriber)​;​
56 ··​}54 ··​}
57 55
58 ··​void·​deliver(const·​std:​:​string&·​msg)​56 ··​void·​deliver(const·​std:​:​string&·​msg)​
59 ··​{57 ··​{
60 ····std:​:​for_each(subscribers_​.​begin()​,​·​subscribers_.​end()​,​58 ····​for·(const·auto&·s·:​·​subscribers_)​
61 ········boost:​:​bind(&subscriber:​:​deliver,​59 ····{
62 ··········boost:​:​placeholders:​:​_1,​·boost:​:​ref(msg)​)​)​;​60 ······​s-​>deliver(msg)​;​
61 ····​}
63 ··​}62 ··​}
64 63
65 private:​64 private:​
66 ··​std:​:​set<subscriber_ptr>·​subscribers_;​65 ··​std:​:​set<subscriber_ptr>·​subscribers_;​
67 };​66 };​
68 67
69 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​68 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
70 69
71 /​/​70 /​/​
72 /​/​·​This·​class·​manages·​socket·​timeouts·​by·​applying·​the·​concept·​of·​a·​deadline.​71 /​/​·​This·​class·​manages·​socket·​timeouts·​by·​applying·​the·​concept·​of·​a·​deadline.​
73 /​/​·​Some·​asynchronous·​operations·​are·​given·​deadlines·​by·​which·​they·​must·​complete.​72 /​/​·​Some·​asynchronous·​operations·​are·​given·​deadlines·​by·​which·​they·​must·​complete.​
74 /​/​·​Deadlines·​are·​enforced·​by·​two·​"actors"·​that·​persist·​for·​the·​lifetime·​of·​the73 /​/​·​Deadlines·​are·​enforced·​by·​two·​"actors"·​that·​persist·​for·​the·​lifetime·​of·​the
75 /​/​·​session·​object,​·​one·​for·​input·​and·​one·​for·​output:​74 /​/​·​session·​object,​·​one·​for·​input·​and·​one·​for·​output:​
76 /​/​75 /​/​
77 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+·····················​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+76 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+······················​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
78 /​/​··​|················​|·····················​|················​|77 /​/​··​|················​|······················​|················​|
79 /​/​··​|·​check_deadline·​|<-​-​-​+················​|·​check_deadline·​|<-​-​-​+78 /​/​··​|·​check_deadline·​|<-​-​-​-​-​-​-​+·············​|·​check_deadline·​|<-​-​-​-​-​-​-​+
80 /​/​··​|················​|····|·async_wait()​···​|················​|····|·async_wait()​79 /​/​··​|················​|········|·············​|················​|········|
81 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····|··on·input······​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····|··on·output80 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+········|·············​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+········|
82 /​/​··············​|·········|··deadline··················​|·········|··deadline81 /​/​···············​|············|··························​|············|
83 /​/​··············​+-​-​-​-​-​-​-​-​-​+····························+-​-​-​-​-​-​-​-​-​+82 /​/​··async_wait()​·|····​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····async_wait()​·|····+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
83 /​/​···​on·​input····​|····​|·····​lambda·····​|·····​on·​output···​|····​|·····​lambda·····​|
84 /​/​···​deadline····​+-​-​-​>|·······​in·······​|·····​deadline····​+-​-​-​>|·······​in·······​|
85 /​/​····················​|·​check_deadline·​|······················​|·​check_deadline·​|
86 /​/​····················​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+······················​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
84 /​/​87 /​/​
85 /​/​·​If·​either·​deadline·​actor·​determines·​that·​the·​corresponding·​deadline·​has88 /​/​·​If·​either·​deadline·​actor·​determines·​that·​the·​corresponding·​deadline·​has
86 /​/​·​expired,​·​the·​socket·​is·​closed·​and·​any·​outstanding·​operations·​are·​cancelled.​89 /​/​·​expired,​·​the·​socket·​is·​closed·​and·​any·​outstanding·​operations·​are·​cancelled.​
87 /​/​90 /​/​
88 /​/​·​The·​input·​actor·​reads·​messages·​from·​the·​socket,​·​where·​messages·​are·​delimited91 /​/​·​The·​input·​actor·​reads·​messages·​from·​the·​socket,​·​where·​messages·​are·​delimited
89 /​/​·​by·​the·​newline·​character:​92 /​/​·​by·​the·​newline·​character:​
90 /​/​93 /​/​
91 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​+94 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​+
92 /​/​··​|············​|95 /​/​··​|·············​|
93 /​/​··​|·start_read·​|<-​-​-​+96 /​/​··​|··read_line··​|<-​-​-​-​+
94 /​/​··​|············​|····​|97 /​/​··​|·············​|·····​|
95 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​+····​|98 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​+·····​|
96 /​/​··········​|·········​|99 /​/​··········​|···········​|
97 /​/​··​async_-​·​|····​+-​-​-​-​-​-​-​-​-​-​-​-​-​+100 /​/​··​async_-​·​|····​+-​-​-​-​-​-​-​-​-​-​-​-​-​+
98 /​/​···​read_-​·​|····​|·············​|101 /​/​···​read_-​·​|····​|···lambda····​|
99 /​/​··​until()​·​+-​-​-​>|·handle_read·​|102 /​/​··​until()​·​+-​-​-​>|·····in······​|
100 /​/​···············​|·············​|103 /​/​···············​|··read_line··​|
101 /​/​···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​+104 /​/​···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​+
102 /​/​105 /​/​
103 /​/​·​The·​deadline·​for·​receiving·​a·​complete·​message·​is·​30·​seconds.​·​If·​a·​non-​empty106 /​/​·​The·​deadline·​for·​receiving·​a·​complete·​message·​is·​30·​seconds.​·​If·​a·​non-​empty
104 /​/​·​message·​is·​received,​·​it·​is·​delivered·​to·​all·​subscribers.​·​If·​a·​heartbeat·​(a107 /​/​·​message·​is·​received,​·​it·​is·​delivered·​to·​all·​subscribers.​·​If·​a·​heartbeat·​(a
105 /​/​·​message·​that·​consists·​of·​a·​single·​newline·​character)​·​is·​received,​·​a·​heartbeat108 /​/​·​message·​that·​consists·​of·​a·​single·​newline·​character)​·​is·​received,​·​a·​heartbeat
106 /​/​·​is·​enqueued·​for·​the·​client,​·​provided·​there·​are·​no·​other·​messages·​waiting·​to109 /​/​·​is·​enqueued·​for·​the·​client,​·​provided·​there·​are·​no·​other·​messages·​waiting·​to
107 /​/​·​be·​sent.​110 /​/​·​be·​sent.​
108 /​/​111 /​/​
109 /​/​·​The·​output·​actor·​is·​responsible·​for·​sending·​messages·​to·​the·​client:​112 /​/​·​The·​output·​actor·​is·​responsible·​for·​sending·​messages·​to·​the·​client:​
110 /​/​113 /​/​
111 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+114 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
112 /​/​··​|··············​|<-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+115 /​/​··​|················​|<-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
113 /​/​··​|·​await_output·​|······················​|116 /​/​··​|··​await_output··​|······················​|
114 /​/​··​|··············​|<-​-​-​+·················​|117 /​/​··​|················​|<-​-​-​-​-​-​-​+·············​|
115 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····|·················​|118 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+········|·············​|
116 /​/​······|······|········|·async_wait()​····​|119 /​/​····|············|··········|·············​|
117 /​/​······|······​+-​-​-​-​-​-​-​-​+·················​|120 /​/​····|····async_-​·|··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····​|
118 /​/​······V·································​|121 /​/​····|·····wait()​·|··|·····lambda·····|····​|
119 /​/​··+-​-​-​-​-​-​-​-​-​-​-​-​-​+···············+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+122 /​/​····|············+-​>|·······in·······|····|
120 /​/​··|·············|·async_write()​·|··············​|123 /​/​····|···············|··await_output··|····​|
121 /​/​··​|·start_write·|-​-​-​-​-​-​-​-​-​-​-​-​-​-​>|·handle_write·​|124 /​/​····​|···············+-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​+····​|
122 /​/​··|·············|···············|··············​|125 /​/​····V·····································​|
123 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​+···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+126 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
127 /​/​··​|··············​|·​async_write()​·​|····​lambda····​|
128 /​/​··​|··​write_line··​|-​-​-​-​-​-​-​-​-​-​-​-​-​-​>|······​in······​|
129 /​/​··​|··············​|···············​|··​write_line··​|
130 /​/​··​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+···············​+-​-​-​-​-​-​-​-​-​-​-​-​-​-​+
124 /​/​131 /​/​
125 /​/​·​The·​output·​actor·​first·​waits·​for·​an·​output·​message·​to·​be·​enqueued.​·​It·​does132 /​/​·​The·​output·​actor·​first·​waits·​for·​an·​output·​message·​to·​be·​enqueued.​·​It·​does
126 /​/​·​this·​by·​using·​a·​steady_timer·​as·​an·​asynchronous·​condition·​variable.​·​The133 /​/​·​this·​by·​using·​a·​steady_timer·​as·​an·​asynchronous·​condition·​variable.​·​The
127 /​/​·​steady_timer·​will·​be·​signalled·​whenever·​the·​output·​queue·​is·​non-​empty.​134 /​/​·​steady_timer·​will·​be·​signalled·​whenever·​the·​output·​queue·​is·​non-​empty.​
128 /​/​135 /​/​
129 /​/​·​Once·​a·​message·​is·​available,​·​it·​is·​sent·​to·​the·​client.​·​The·​deadline·​for136 /​/​·​Once·​a·​message·​is·​available,​·​it·​is·​sent·​to·​the·​client.​·​The·​deadline·​for
130 /​/​·​sending·​a·​complete·​message·​is·​30·​seconds.​·​After·​the·​message·​is·​successfully137 /​/​·​sending·​a·​complete·​message·​is·​30·​seconds.​·​After·​the·​message·​is·​successfully
131 /​/​·​sent,​·​the·​output·​actor·​again·​waits·​for·​the·​output·​queue·​to·​become·​non-​empty.​138 /​/​·​sent,​·​the·​output·​actor·​again·​waits·​for·​the·​output·​queue·​to·​become·​non-​empty.​
132 /​/​139 /​/​
133 class·​tcp_session140 class·​tcp_session
134 ··​:​·​public·​subscriber,​141 ··​:​·​public·​subscriber,​
135 ····​public·boost:​:​enable_shared_from_th​is<tcp_session>142 ····​public·​std:​:​enable_shared_from_th​is<tcp_session>
136 {143 {
137 public:​144 public:​
138 ··​tcp_session(asio:​:​io_context&·io_context,​·​channel&·​ch)​145 ··​tcp_session(tcp:​:​socket·socket,​·​channel&·​ch)​
139 ····​:​·​channel_(ch)​,​146 ····​:​·​channel_(ch)​,​
140 ······​socket_(io_context)​,​147 ······​socket_(std:​:​move(socket)​)​
141 ······input_deadline_(io_co​ntext)​,​
142 ······non_empty_output_queu​e_(io_context)​,​
143 ······output_deadline_(io_c​ontext)​
144 ··​{148 ··​{
145 ····​input_deadline_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​149 ····​input_deadline_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​
146 ····​output_deadline_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​150 ····​output_deadline_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​
147 151
148 ····​/​/​·​The·​non_empty_output_queu​e_·​steady_timer·​is·​set·​to·​the·​maximum·​time152 ····​/​/​·​The·​non_empty_output_queu​e_·​steady_timer·​is·​set·​to·​the·​maximum·​time
149 ····​/​/​·​point·​whenever·​the·​output·​queue·​is·​empty.​·​This·​ensures·​that·​the·​output153 ····​/​/​·​point·​whenever·​the·​output·​queue·​is·​empty.​·​This·​ensures·​that·​the·​output
150 ····​/​/​·​actor·​stays·​asleep·​until·​a·​message·​is·​put·​into·​the·​queue.​154 ····​/​/​·​actor·​stays·​asleep·​until·​a·​message·​is·​put·​into·​the·​queue.​
151 ····​non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​155 ····​non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​
152 ··​}156 ··​}
153 157
154 ··​tcp:​:​socket&·​socket()​
155 ··​{
156 ····​return·​socket_;​
157 ··​}
158
159 ··​/​/​·​Called·​by·​the·​server·​object·​to·​initiate·​the·​four·​actors.​158 ··​/​/​·​Called·​by·​the·​server·​object·​to·​initiate·​the·​four·​actors.​
160 ··​void·​start()​159 ··​void·​start()​
161 ··​{160 ··​{
162 ····​channel_.​join(shared_from_this​()​)​;​161 ····​channel_.​join(shared_from_this​()​)​;​
163 162
164 ····start_read()​;​163 ····read_line()​;​
165 164 ····check_deadline(input_​deadline_)​;​
166 ····input_deadline_.​async_wait(
167 ········boost:​:​bind(&tcp_session:​:​check_deadline,​
168 ········shared_from_this()​,​·&input_deadline_)​)​;​
169 165
170 ····​await_output()​;​166 ····​await_output()​;​
171 167 ····check_deadline(output​_deadline_)​;​
172 ····output_deadline_.​async_wait(
173 ········boost:​:​bind(&tcp_session:​:​check_deadline,​
174 ········shared_from_this()​,​·&output_deadline_)​)​;​
175 ··​}168 ··​}
176 169
177 private:​170 private:​
178 ··​void·​stop()​171 ··​void·​stop()​
179 ··​{172 ··​{
180 ····​channel_.​leave(shared_from_thi​s()​)​;​173 ····​channel_.​leave(shared_from_thi​s()​)​;​
181 174
182 ····asio:​:​error_code·​ignored_ec;​175 ····​std:​:​error_code·​ignored_error;​
183 ····​socket_.​close(ignored_ec)​;​176 ····​socket_.​close(ignored_error)​;​
184 ····​input_deadline_.​cancel()​;​177 ····​input_deadline_.​cancel()​;​
185 ····​non_empty_output_queu​e_.​cancel()​;​178 ····​non_empty_output_queu​e_.​cancel()​;​
186 ····​output_deadline_.​cancel()​;​179 ····​output_deadline_.​cancel()​;​
187 ··​}180 ··​}
188 181
189 ··​bool·​stopped()​·​const182 ··​bool·​stopped()​·​const
190 ··​{183 ··​{
191 ····​return·​!socket_.​is_open()​;​184 ····​return·​!socket_.​is_open()​;​
192 ··​}185 ··​}
193 186
194 ··​void·​deliver(const·​std:​:​string&·​msg)​187 ··​void·​deliver(const·​std:​:​string&·​msg)​·override
195 ··​{188 ··​{
196 ····​output_queue_.​push_back(msg·​+·​"\n")​;​189 ····​output_queue_.​push_back(msg·​+·​"\n")​;​
197 190
198 ····​/​/​·​Signal·​that·​the·​output·​queue·​contains·​messages.​·​Modifying·​the·​expiry191 ····​/​/​·​Signal·​that·​the·​output·​queue·​contains·​messages.​·​Modifying·​the·​expiry
199 ····​/​/​·​will·​wake·​the·​output·​actor,​·​if·​it·​is·​waiting·​on·​the·​timer.​192 ····​/​/​·​will·​wake·​the·​output·​actor,​·​if·​it·​is·​waiting·​on·​the·​timer.​
200 ····​non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​min()​)​;​193 ····​non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​min()​)​;​
201 ··​}194 ··​}
202 195
203 ··​void·start_read()​196 ··​void·read_line()​
204 ··​{197 ··​{
205 ····​/​/​·​Set·​a·​deadline·​for·​the·​read·​operation.​198 ····​/​/​·​Set·​a·​deadline·​for·​the·​read·​operation.​
206 ····​input_deadline_.​expires_after(asio:​:​chrono:​:​seconds(30)​)​;​199 ····​input_deadline_.​expires_after(std:​:​chrono:​:​seconds(30)​)​;​
207 200
208 ····​/​/​·​Start·​an·​asynchronous·​operation·​to·​read·​a·​newline-​delimited·​message.​201 ····​/​/​·​Start·​an·​asynchronous·​operation·​to·​read·​a·​newline-​delimited·​message.​
202 ····​auto·​self(shared_from_this​()​)​;​
209 ····​asio:​:​async_read_until(sock​et_,​203 ····​asio:​:​async_read_until(sock​et_,​
210 ········​asio:​:​dynamic_buffer(input_​buffer_)​,​·​'\n',​204 ········​asio:​:​dynamic_buffer(input_​buffer_)​,​·​'\n',​
211 ········boost:​:​bind(&tcp_session:​:​handle_read,​·​shared_from_this()​,​205 ········[this,​·self](const·std:​:​error_code&·error,​·​std:​:​size_t·n)​
212 ··········boost:​:​placeholders:​:​_1,​·boost:​:​placeholders:​:​_2)​)​;​
213 ··}
214
215 ··void·handle_read(const·asio:​:​error_code&·ec,​·std:​:​size_t·n)​
216 ··{
217 ····if·(stopped()​)​
218 ······return;​
219
220 ····if·(!ec)​
221 ····{
222 ······/​/​·Extract·the·newline-​delimited·message·from·the·buffer.​
223 ······std:​:​string·msg(input_buffer_.​substr(0,​·n·-​·1)​)​;​
224 ······input_buffer_.​erase(0,​·n)​;​
225
226 ······if·(!msg.​empty()​)​
227 ······{
228 ········channel_.​deliver(msg)​;​
229 ······}
230 ······else
231 ······{
232 ········/​/​·We·received·a·heartbeat·message·from·the·client.​·If·there's·nothing
233 ········/​/​·else·being·sent·or·ready·to·be·sent,​·send·a·heartbeat·right·back.​
234 ········if·(output_queue_.​empty()​)​
235 ········​{206 ········​{
236 ··········​output_queue_.​push_back("\n")​;​207 ··········/​/​·Check·if·the·session·was·stopped·while·the·operation·was·pending.​
237 208 ··········if·(stopped()​)​
238 ··········/​/​·Signal·that·the·output·queue·contains·messages.​·Modifying·the209 ············return;​
239 ··········/​/​·expiry·will·wake·the·output·actor,​·if·it·is·waiting·on·the·timer.​210
240 ··········non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​min()​)​;​211 ··········​if·(!error)​
241 ········}212 ··········{
242 ······}213 ············/​/​·Extract·the·newline-​delimited·message·from·the·buffer.​
243 214 ············std:​:​string·msg(input_buffer_.​substr(0,​·n·-​·1)​)​;​
244 ······start_read()​;​215 ············input_buffer_.​erase(0,​·n)​;​
245 ····}216
246 ····else217 ············if·(!msg.​empty()​)​
247 ····​{218 ············​{
248 ······stop()​;​219 ··············channel_.​deliver(msg)​;​
249 ····​}220 ············​}
221 ············​else
222 ············​{
223
224 ··············​/​/​·​We·​received·​a·​heartbeat·​message·​from·​the·​client.​·​If·​there's
225 ··············​/​/​·​nothing·​else·​being·​sent·​or·​ready·​to·​be·​sent,​·​send·​a·​heartbeat
226 ··············​/​/​·​right·​back.​
227 ··············​if·​(output_queue_.​empty()​)​
228 ··············​{
229 ················​output_queue_.​push_back("\n")​;​
230
231 ················​/​/​·​Signal·​that·​the·​output·​queue·​contains·​messages.​·​Modifying
232 ················​/​/​·​the·​expiry·​will·​wake·​the·​output·​actor,​·​if·​it·​is·​waiting·​on
233 ················​/​/​·​the·​timer.​
234 ················​non_empty_output_queu​e_.​expires_at(
235 ····················​steady_timer:​:​time_point:​:​min()​)​;​
236 ··············​}
237 ············​}
238
239 ············​read_line()​;​
240 ··········​}
241 ··········​else
242 ··········​{
243 ············​stop()​;​
244 ··········​}
245 ········​})​;​
250 ··​}246 ··​}
251 247
252 ··​void·​await_output()​248 ··​void·​await_output()​
253 ··​{249 ··​{
254 ····if·​(stopped()​)​250 ····auto·self(shared_from_this​()​)​;​
255 ······return;​251 ····non_empty_output_queu​e_.​async_wait(
256 252 ········[this,​·self](const·std:​:​error_code&·/​*error*/​)​
257 ····if·(output_queue_.​empty()​)​253 ········{
258 ····{254 ··········/​/​·Check·if·the·session·was·stopped·while·the·operation·was·pending.​
259 ······/​/​·There·are·no·messages·that·are·ready·to·be·sent.​·The·actor·goes·to255 ··········if·(stopped()​)​
260 ······/​/​·sleep·by·waiting·on·the·non_empty_output_queu​e_·timer.​·When·a·new256 ············return;​
261 ······/​/​·message·is·added,​·the·timer·will·be·modified·and·the·actor·will·wake.​257
262 ······non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​258 ··········if·(output_queue_.​empty()​)​
263 ······non_empty_output_queu​e_.​async_wait(259 ··········{
264 ··········boost:​:​bind(&tcp_session:​:​await_output,​·shared_from_this()​)​)​;​260 ············/​/​·There·are·no·messages·that·are·ready·to·be·sent.​·The·actor·goes
265 ····}261 ············/​/​·to·sleep·by·waiting·on·the·non_empty_output_queu​e_·timer.​·When·a
266 ····​else262 ············/​/​·new·message·is·added,​·the·timer·will·be·modified·and·the·actor
267 ····{263 ············/​/​·will·wake.​
268 ······​start_write()​;​264 ············non_empty_output_queu​e_.​expires_at(steady_tim​er:​:​time_point:​:​max()​)​;​
269 ····}265 ············await_output()​;​
266 ··········​}
267 ··········​else
268 ··········​{
269 ············​write_line()​;​
270 ··········​}
271 ········​})​;​
270 ··​}272 ··​}
271 273
272 ··​void·start_write()​274 ··​void·write_line()​
273 ··​{275 ··​{
274 ····​/​/​·​Set·​a·​deadline·​for·​the·​write·​operation.​276 ····​/​/​·​Set·​a·​deadline·​for·​the·​write·​operation.​
275 ····​output_deadline_.​expires_after(asio:​:​chrono:​:​seconds(30)​)​;​277 ····​output_deadline_.​expires_after(std:​:​chrono:​:​seconds(30)​)​;​
276 278
277 ····​/​/​·​Start·​an·​asynchronous·​operation·​to·​send·​a·​message.​279 ····​/​/​·​Start·​an·​asynchronous·​operation·​to·​send·​a·​message.​
280 ····​auto·​self(shared_from_this​()​)​;​
278 ····​asio:​:​async_write(socket_,​281 ····​asio:​:​async_write(socket_,​
279 ········​asio:​:​buffer(output_queue_.​front()​)​,​282 ········​asio:​:​buffer(output_queue_.​front()​)​,​
280 ········boost:​:​bind(&tcp_session:​:​handle_write,​283 ········[this,​·self](const·std:​:​error_code&·error,​·std:​:​size_t·/​*n*/​)​
281 ··········shared_from_this()​,​·boost:​:​placeholders:​:​_1)​)​;​284 ········{
282 ··}285 ··········/​/​·Check·if·the·session·was·stopped·while·the·operation·was·pending.​
283 286 ··········if·(stopped()​)​
284 ··void·handle_write(const·asio:​:​error_code&·ec)​287 ············return;​
285 ··{288
286 ····if·(stopped()​)​289 ··········if·(!error)​
287 ······return;​290 ··········{
288 291 ············output_queue_.​pop_front()​;​
289 ····if·(!ec)​292
290 ····{293 ············await_output()​;​
291 ······output_queue_.​pop_front()​;​294 ··········}
292 295 ··········else
293 ······await_output()​;​296 ··········{
294 ····}297 ············stop()​;​
295 ····else298 ··········}
296 ····{299 ········})​;​
297 ······stop()​;​300 ··}
298 ····}301
299 ··}302 ··void·check_deadline(steady​_timer&·deadline)​
300 303 ··{
301 ··void·check_deadline(steady​_timer*·deadline)​304 ····auto·self(shared_from_this​()​)​;​
302 ··{305 ····deadline.​async_wait(
303 ····​if·​(stopped()​)​306 ········[this,​·self,​·&deadline](const·std:​:​error_code&·/​*error*/​)​
304 ······return;​307 ········{
305 308 ··········/​/​·Check·if·the·session·was·stopped·while·the·operation·was·pending.​
306 ····/​/​·Check·whether·the·deadline·has·passed.​·We·compare·the·deadline·against309 ··········if·(stopped()​)​
307 ····/​/​·the·current·time·since·a·new·asynchronous·operation·may·have·moved·the310 ············​return;​
308 ····/​/​·deadline·before·this·actor·had·a·chance·to·run.​311
309 ····if·(deadline-​>expiry()​·<=·​steady_timer:​:​clock_type:​:​now()​)​312 ··········/​/​·Check·whether·the·deadline·has·passed.​·We·compare·the·deadline
310 ····{313 ··········/​/​·against·the·current·time·since·a·new·asynchronous·operation·may
311 ······​/​/​·The·deadline·has·passed.​·Stop·​the·session.​·The·other·actors·will314 ··········​/​/​·have·moved·the·deadline·before·​this·actor·had·a·chance·to·run.​
312 ······/​/​·terminate·as·​soon·as·possible.​315 ··········if·(deadline.​expiry()​·<=·​steady_timer:​:​clock_type:​:​now()​)​
313 ······stop()​;​316 ··········{
314 ····}317 ············/​/​·The·deadline·has·passed.​·Stop·the·session.​·The·other·actors·will
315 ····​else318 ············/​/​·terminate·as·soon·as·possible.​
316 ····{319 ············stop()​;​
317 ······/​/​·Put·the·actor·back·to·sleep.​320 ··········}
318 ······deadline-​>async_wait(321 ··········else
319 ··········boost:​:​bind(&tcp_session:​:​check_deadline,​322 ··········{
320 ··········shared_from_this()​,​·deadline)​)​;​323 ············/​/​·Put·the·actor·back·to·sleep.​
321 ····}324 ············check_deadline(deadli​ne)​;​
325 ··········​}
326 ········​})​;​
322 ··​}327 ··​}
323 328
324 ··​channel&·​channel_;​329 ··​channel&·​channel_;​
325 ··​tcp:​:​socket·​socket_;​330 ··​tcp:​:​socket·​socket_;​
326 ··​std:​:​string·​input_buffer_;​331 ··​std:​:​string·​input_buffer_;​
327 ··​steady_timer·​input_deadline_;​332 ··​steady_timer·​input_deadline_{socke​t_.​get_executor()​};​
328 ··​std:​:​deque<std:​:​string>·​output_queue_;​333 ··​std:​:​deque<std:​:​string>·​output_queue_;​
329 ··​steady_timer·​non_empty_output_queu​e_;​334 ··​steady_timer·​non_empty_output_queu​e_{socket_.​get_executor()​};​
330 ··​steady_timer·​output_deadline_;​335 ··​steady_timer·​output_deadline_{sock​et_.​get_executor()​};​
331 };​336 };​
332 337
333 typedef·boost:​:​shared_ptr<tcp_sessio​n>·​tcp_session_ptr;​338 typedef·​std:​:​shared_ptr<tcp_sessio​n>·​tcp_session_ptr;​
334 339
335 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​340 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
336 341
337 class·​udp_broadcaster342 class·​udp_broadcaster
338 ··​:​·​public·​subscriber343 ··​:​·​public·​subscriber
339 {344 {
340 public:​345 public:​
341 ··​udp_broadcaster(asio:​:​io_context&·​io_context,​346 ··​udp_broadcaster(asio:​:​io_context&·​io_context,​
342 ······​const·​udp:​:​endpoint&·​broadcast_endpoint)​347 ······​const·​udp:​:​endpoint&·​broadcast_endpoint)​
343 ····​:​·​socket_(io_context)​348 ····​:​·​socket_(io_context)​
344 ··​{349 ··​{
345 ····​socket_.​connect(broadcast_end​point)​;​350 ····​socket_.​connect(broadcast_end​point)​;​
346 ····​socket_.​set_option(udp:​:​socket:​:​broadcast(true)​)​;​351 ····​socket_.​set_option(udp:​:​socket:​:​broadcast(true)​)​;​
347 ··​}352 ··​}
348 353
349 private:​354 private:​
350 ··​void·​deliver(const·​std:​:​string&·​msg)​355 ··​void·​deliver(const·​std:​:​string&·​msg)​
351 ··​{356 ··​{
352 ····asio:​:​error_code·​ignored_ec;​357 ····​std:​:​error_code·​ignored_error;​
353 ····​socket_.​send(asio:​:​buffer(msg)​,​·​0,​·​ignored_ec)​;​358 ····​socket_.​send(asio:​:​buffer(msg)​,​·​0,​·​ignored_error)​;​
354 ··​}359 ··​}
355 360
356 ··​udp:​:​socket·​socket_;​361 ··​udp:​:​socket·​socket_;​
357 };​362 };​
358 363
359 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​364 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
360 365
361 class·​server366 class·​server
362 {367 {
363 public:​368 public:​
364 ··​server(asio:​:​io_context&·​io_context,​369 ··​server(asio:​:​io_context&·​io_context,​
365 ······​const·​tcp:​:​endpoint&·​listen_endpoint,​370 ······​const·​tcp:​:​endpoint&·​listen_endpoint,​
366 ······​const·​udp:​:​endpoint&·​broadcast_endpoint)​371 ······​const·​udp:​:​endpoint&·​broadcast_endpoint)​
367 ····​:​·​io_context_(io_contex​t)​,​372 ····​:​·​io_context_(io_contex​t)​,​
368 ······​acceptor_(io_context,​·​listen_endpoint)​373 ······​acceptor_(io_context,​·​listen_endpoint)​
369 ··​{374 ··​{
370 ····subscriber_ptr·bc(new·udp_broadcaster(io_co​ntext_,​·broadcast_endpoint)​)​;​375 ····​channel_.​join(
371 ····channel_.​join(bc)​;​376 ········std:​:​make_shared<udp_broad​caster>(
377 ··········​io_context_,​·​broadcast_endpoint)​)​;​
372 378
373 ····start_accept()​;​379 ····​accept()​;​
374 ··​}380 ··​}
375 381
376 ··void·start_accept()​382 private:​
377 ··{383 ··void·accept()​
378 ····tcp_session_ptr·new_session(new·tcp_session(io_contex​t_,​·channel_)​)​;​
379
380 ····acceptor_.​async_accept(new_sess​ion-​>socket()​,​
381 ········boost:​:​bind(&server:​:​handle_accept,​
382 ··········this,​·new_session,​·boost:​:​placeholders:​:​_1)​)​;​
383 ··}
384
385 ··void·handle_accept(tcp_ses​sion_ptr·session,​
386 ······const·asio:​:​error_code&·ec)​
387 ··​{384 ··​{
388 ····if·(!ec)​385 ····acceptor_.​async_accept(
389 ····{386 ········[this](const·std:​:​error_code&·error,​·tcp:​:​socket·socket)​
390 ······session-​>start()​;​387 ········{
391 ····}388 ··········if·(!error)​
389 ··········​{
390 ············​std:​:​make_shared<tcp_sessi​on>(std:​:​move(socket)​,​·​channel_)​-​>start()​;​
391 ··········​}
392 392
393 ····start_accept()​;​393 ··········accept()​;​
394 ········​})​;​
394 ··​}395 ··​}
395 396
396 private:​
397 ··​asio:​:​io_context&·​io_context_;​397 ··​asio:​:​io_context&·​io_context_;​
398 ··​tcp:​:​acceptor·​acceptor_;​398 ··​tcp:​:​acceptor·​acceptor_;​
399 ··​channel·​channel_;​399 ··​channel·​channel_;​
400 };​400 };​
401 401
402 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​402 /​/​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​-​
403 403
404 int·​main(int·​argc,​·​char*·​argv[])​404 int·​main(int·​argc,​·​char*·​argv[])​
405 {405 {
406 ··​try406 ··​try
407 ··​{407 ··​{
408 ····​using·​namespace·​std;​·​/​/​·​For·​atoi.​408 ····​using·​namespace·​std;​·​/​/​·​For·​atoi.​
409 409
410 ····​if·​(argc·​!=·​4)​410 ····​if·​(argc·​!=·​4)​
411 ····​{411 ····​{
412 ······​std:​:​cerr·​<<·​"Usage:​·​server·​<listen_port>·​<bcast_address>·​<bcast_port>\n";​412 ······​std:​:​cerr·​<<·​"Usage:​·​server·​<listen_port>·​<bcast_address>·​<bcast_port>\n";​
413 ······​return·​1;​413 ······​return·​1;​
414 ····​}414 ····​}
415 415
416 ····​asio:​:​io_context·​io_context;​416 ····​asio:​:​io_context·​io_context;​
417 417
418 ····​tcp:​:​endpoint·​listen_endpoint(tcp:​:​v4()​,​·​atoi(argv[1])​)​;​418 ····​tcp:​:​endpoint·​listen_endpoint(tcp:​:​v4()​,​·​atoi(argv[1])​)​;​
419 419
420 ····​udp:​:​endpoint·​broadcast_endpoint(420 ····​udp:​:​endpoint·​broadcast_endpoint(
421 ········​asio:​:​ip:​:​make_address(argv[2])​,​·​atoi(argv[3])​)​;​421 ········​asio:​:​ip:​:​make_address(argv[2])​,​·​atoi(argv[3])​)​;​
422 422
423 ····​server·​s(io_context,​·​listen_endpoint,​·​broadcast_endpoint)​;​423 ····​server·​s(io_context,​·​listen_endpoint,​·​broadcast_endpoint)​;​
424 424
425 ····​io_context.​run()​;​425 ····​io_context.​run()​;​
426 ··​}426 ··​}
427 ··​catch·​(std:​:​exception&·​e)​427 ··​catch·​(std:​:​exception&·​e)​
428 ··​{428 ··​{
429 ····​std:​:​cerr·​<<·​"Exception:​·​"·​<<·​e.​what()​·​<<·​"\n";​429 ····​std:​:​cerr·​<<·​"Exception:​·​"·​<<·​e.​what()​·​<<·​"\n";​
430 ··​}430 ··​}
431 431
432 ··​return·​0;​432 ··​return·​0;​
433 }433 }