google-apis-rs/tokio/sync/index.html
2021-04-02 00:20:57 +08:00

<!DOCTYPE html><html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><meta name="generator" content="rustdoc"><meta name="description" content="API documentation for the Rust `sync` mod in crate `tokio`."><meta name="keywords" content="rust, rustlang, rust-lang, sync"><title>tokio::sync - Rust</title><link rel="stylesheet" type="text/css" href="../../normalize.css"><link rel="stylesheet" type="text/css" href="../../rustdoc.css" id="mainThemeStyle"><link rel="stylesheet" type="text/css" href="../../light.css" id="themeStyle"><link rel="stylesheet" type="text/css" href="../../dark.css" disabled ><link rel="stylesheet" type="text/css" href="../../ayu.css" disabled ><script id="default-settings"></script><script src="../../storage.js"></script><noscript><link rel="stylesheet" href="../../noscript.css"></noscript><link rel="icon" type="image/svg+xml" href="../../favicon.svg">
<link rel="alternate icon" type="image/png" href="../../favicon-16x16.png">
<link rel="alternate icon" type="image/png" href="../../favicon-32x32.png"><style type="text/css">#crate-search{background-image:url("../../down-arrow.svg");}</style></head><body class="rustdoc mod"><!--[if lte IE 8]><div class="warning">This old browser is unsupported and will most likely display funky things.</div><![endif]--><nav class="sidebar"><div class="sidebar-menu">&#9776;</div><a href='../../tokio/index.html'><div class='logo-container rust-logo'><img src='../../rust-logo.png' alt='logo'></div></a><p class="location">Module sync</p><div class="sidebar-elems"><div class="block items"><ul><li><a href="#modules">Modules</a></li><li><a href="#structs">Structs</a></li><li><a href="#enums">Enums</a></li></ul></div><p class="location"><a href="../index.html">tokio</a></p><div id="sidebar-vars" data-name="sync" data-ty="mod" data-relpath="../"></div><script defer src="../sidebar-items.js"></script></div></nav><div class="theme-picker"><button id="theme-picker" aria-label="Pick another theme!" aria-haspopup="menu"><img src="../../brush.svg" width="18" alt="Pick another theme!"></button><div id="theme-choices" role="menu"></div></div><script src="../../theme.js"></script><nav class="sub"><form class="search-form"><div class="search-container"><div><select id="crate-search"><option value="All crates">All crates</option></select><input class="search-input" name="search" disabled autocomplete="off" spellcheck="false" placeholder="Click or press S to search, ? for more options…" type="search"></div><button type="button" class="help-button">?</button>
<a id="settings-menu" href="../../settings.html"><img src="../../wheel.svg" width="18" alt="Change settings"></a></div></form></nav><section id="main" class="content"><h1 class="fqn"><span class="in-band">Module <a href="../index.html">tokio</a>::<wbr><a class="mod" href="">sync</a></span><span class="out-of-band"><span id="render-detail"><a id="toggle-all-docs" href="javascript:void(0)" title="collapse all docs">[<span class="inner">&#x2212;</span>]</a></span><a class="srclink" href="../../src/tokio/sync/mod.rs.html#1-488" title="goto source code">[src]</a></span></h1><div class="docblock"><p>Synchronization primitives for use in asynchronous contexts.</p>
<p>Tokio programs tend to be organized as a set of <a href="../../tokio/task/index.html">tasks</a> where each task
operates independently and may be executed on separate physical threads. The
synchronization primitives provided in this module permit these independent
tasks to communicate together.</p>
<h1 id="message-passing" class="section-header"><a href="#message-passing">Message passing</a></h1>
<p>The most common form of synchronization in a Tokio program is message
passing. Two tasks operate independently and send messages to each other to
synchronize. Doing so has the advantage of avoiding shared state.</p>
<p>Message passing is implemented using channels. A channel supports sending
messages from one or more producer tasks to one or more consumer tasks. Tokio
provides several channel flavors, each supporting a different message passing
pattern. When a channel supports multiple producers, many separate tasks may
<strong>send</strong> messages. When a channel supports multiple consumers, many
separate tasks may <strong>receive</strong> messages.</p>
<p>Tokio provides several flavors because different message passing patterns
are best handled with different implementations.</p>
<h2 id="oneshot-channel" class="section-header"><a href="#oneshot-channel"><code>oneshot</code> channel</a></h2>
<p>The <a href="../../tokio/sync/oneshot/index.html"><code>oneshot</code> channel</a> supports sending a <strong>single</strong> value from a
single producer to a single consumer. This channel is usually used to send
the result of a computation to a waiter.</p>
<p><strong>Example:</strong> using a <a href="../../tokio/sync/oneshot/index.html"><code>oneshot</code> channel</a> to receive the result of a
computation.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered">
<span class="kw">use</span> <span class="ident">tokio</span>::<span class="ident">sync</span>::<span class="ident">oneshot</span>;

<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">some_computation</span>() <span class="op">-</span><span class="op">&gt;</span> <span class="ident">String</span> {
    <span class="string">&quot;represents the result of the computation&quot;</span>.<span class="ident">to_string</span>()
}

<span class="attribute">#[<span class="ident">tokio</span>::<span class="ident">main</span>]</span>
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() {
    <span class="kw">let</span> (<span class="ident">tx</span>, <span class="ident">rx</span>) <span class="op">=</span> <span class="ident">oneshot</span>::<span class="ident">channel</span>();

    <span class="ident">tokio</span>::<span class="ident">spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
        <span class="kw">let</span> <span class="ident">res</span> <span class="op">=</span> <span class="ident">some_computation</span>().<span class="kw">await</span>;
        <span class="ident">tx</span>.<span class="ident">send</span>(<span class="ident">res</span>).<span class="ident">unwrap</span>();
    });

    <span class="comment">// Do other work while the computation is happening in the background</span>

    <span class="comment">// Wait for the computation result</span>
    <span class="kw">let</span> <span class="ident">res</span> <span class="op">=</span> <span class="ident">rx</span>.<span class="kw">await</span>.<span class="ident">unwrap</span>();
}</pre></div>
<p>Note that if the task produces a computation result as its final
action before terminating, the <a href="../../tokio/task/struct.JoinHandle.html"><code>JoinHandle</code></a> can be used to
receive that value instead of allocating resources for the
<code>oneshot</code> channel. Awaiting a <a href="../../tokio/task/struct.JoinHandle.html"><code>JoinHandle</code></a> returns a <code>Result</code>. If
the task panics, the <code>JoinHandle</code> yields <code>Err</code> with the panic
cause.</p>
<p><strong>Example:</strong></p>
<div class="example-wrap"><pre class="rust rust-example-rendered">
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">some_computation</span>() <span class="op">-</span><span class="op">&gt;</span> <span class="ident">String</span> {
    <span class="string">&quot;the result of the computation&quot;</span>.<span class="ident">to_string</span>()
}

<span class="attribute">#[<span class="ident">tokio</span>::<span class="ident">main</span>]</span>
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() {
    <span class="kw">let</span> <span class="ident">join_handle</span> <span class="op">=</span> <span class="ident">tokio</span>::<span class="ident">spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
        <span class="ident">some_computation</span>().<span class="kw">await</span>
    });

    <span class="comment">// Do other work while the computation is happening in the background</span>

    <span class="comment">// Wait for the computation result</span>
    <span class="kw">let</span> <span class="ident">res</span> <span class="op">=</span> <span class="ident">join_handle</span>.<span class="kw">await</span>.<span class="ident">unwrap</span>();
}</pre></div>
<h2 id="mpsc-channel" class="section-header"><a href="#mpsc-channel"><code>mpsc</code> channel</a></h2>
<p>The <a href="../../tokio/sync/mpsc/index.html"><code>mpsc</code> channel</a> supports sending <strong>many</strong> values from <strong>many</strong>
producers to a single consumer. This channel is often used to send work to a
task or to receive the result of many computations.</p>
<p><strong>Example:</strong> using an mpsc to incrementally stream the results of a series
of computations.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered">
<span class="kw">use</span> <span class="ident">tokio</span>::<span class="ident">sync</span>::<span class="ident">mpsc</span>;

<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">some_computation</span>(<span class="ident">input</span>: <span class="ident">u32</span>) <span class="op">-</span><span class="op">&gt;</span> <span class="ident">String</span> {
    <span class="macro">format</span><span class="macro">!</span>(<span class="string">&quot;the result of computation {}&quot;</span>, <span class="ident">input</span>)
}

<span class="attribute">#[<span class="ident">tokio</span>::<span class="ident">main</span>]</span>
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() {
    <span class="kw">let</span> (<span class="ident">tx</span>, <span class="kw-2">mut</span> <span class="ident">rx</span>) <span class="op">=</span> <span class="ident">mpsc</span>::<span class="ident">channel</span>(<span class="number">100</span>);

    <span class="ident">tokio</span>::<span class="ident">spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
        <span class="kw">for</span> <span class="ident">i</span> <span class="kw">in</span> <span class="number">0</span>..<span class="number">10</span> {
            <span class="kw">let</span> <span class="ident">res</span> <span class="op">=</span> <span class="ident">some_computation</span>(<span class="ident">i</span>).<span class="kw">await</span>;
            <span class="ident">tx</span>.<span class="ident">send</span>(<span class="ident">res</span>).<span class="kw">await</span>.<span class="ident">unwrap</span>();
        }
    });

    <span class="kw">while</span> <span class="kw">let</span> <span class="prelude-val">Some</span>(<span class="ident">res</span>) <span class="op">=</span> <span class="ident">rx</span>.<span class="ident">recv</span>().<span class="kw">await</span> {
        <span class="macro">println</span><span class="macro">!</span>(<span class="string">&quot;got = {}&quot;</span>, <span class="ident">res</span>);
    }
}</pre></div>
<p>The argument to <code>mpsc::channel</code> is the channel capacity. This is the maximum
number of values that can be stored in the channel pending receipt at any
given time. Properly setting this value is key in implementing robust
programs as the channel capacity plays a critical part in handling back
pressure.</p>
<p>A common concurrency pattern for resource management is to spawn a task
dedicated to managing that resource and to use message passing between other
tasks to interact with it. The resource may be anything that cannot be used
concurrently, such as a socket or program state. For example, if multiple
tasks need to send data over a single socket, spawn a task to manage the
socket and use a channel to synchronize.</p>
<p><strong>Example:</strong> sending data from many tasks over a single socket using message
passing.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered">
<span class="kw">use</span> <span class="ident">tokio</span>::<span class="ident">io</span>::{<span class="self">self</span>, <span class="ident">AsyncWriteExt</span>};
<span class="kw">use</span> <span class="ident">tokio</span>::<span class="ident">net</span>::<span class="ident">TcpStream</span>;
<span class="kw">use</span> <span class="ident">tokio</span>::<span class="ident">sync</span>::<span class="ident">mpsc</span>;

<span class="attribute">#[<span class="ident">tokio</span>::<span class="ident">main</span>]</span>
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() <span class="op">-</span><span class="op">&gt;</span> <span class="ident">io</span>::<span class="prelude-ty">Result</span><span class="op">&lt;</span>()<span class="op">&gt;</span> {
    <span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">socket</span> <span class="op">=</span> <span class="ident">TcpStream</span>::<span class="ident">connect</span>(<span class="string">&quot;www.example.com:1234&quot;</span>).<span class="kw">await</span><span class="question-mark">?</span>;
    <span class="kw">let</span> (<span class="ident">tx</span>, <span class="kw-2">mut</span> <span class="ident">rx</span>) <span class="op">=</span> <span class="ident">mpsc</span>::<span class="ident">channel</span>(<span class="number">100</span>);

    <span class="kw">for</span> <span class="kw">_</span> <span class="kw">in</span> <span class="number">0</span>..<span class="number">10</span> {
        <span class="comment">// Each task needs its own `tx` handle. This is done by cloning the</span>
        <span class="comment">// original handle.</span>
        <span class="kw">let</span> <span class="ident">tx</span> <span class="op">=</span> <span class="ident">tx</span>.<span class="ident">clone</span>();

        <span class="ident">tokio</span>::<span class="ident">spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
            <span class="ident">tx</span>.<span class="ident">send</span>(<span class="kw-2">&amp;</span><span class="string">b&quot;data to write&quot;</span>[..]).<span class="kw">await</span>.<span class="ident">unwrap</span>();
        });
    }

    <span class="comment">// The `rx` half of the channel returns `None` once **all** `tx` clones</span>
    <span class="comment">// drop. To ensure `None` is returned, drop the handle owned by the</span>
    <span class="comment">// current task. If this `tx` handle is not dropped, there will always</span>
    <span class="comment">// be a single outstanding `tx` handle.</span>
    <span class="ident">drop</span>(<span class="ident">tx</span>);

    <span class="kw">while</span> <span class="kw">let</span> <span class="prelude-val">Some</span>(<span class="ident">res</span>) <span class="op">=</span> <span class="ident">rx</span>.<span class="ident">recv</span>().<span class="kw">await</span> {
        <span class="ident">socket</span>.<span class="ident">write_all</span>(<span class="ident">res</span>).<span class="kw">await</span><span class="question-mark">?</span>;
    }

    <span class="prelude-val">Ok</span>(())
}</pre></div>
<p>The <a href="../../tokio/sync/mpsc/index.html"><code>mpsc</code></a> and <a href="../../tokio/sync/oneshot/index.html"><code>oneshot</code></a> channels can be combined to
provide a request / response type synchronization pattern with a shared
resource. A task is spawned to synchronize a resource and waits on commands
received on an <a href="../../tokio/sync/mpsc/index.html"><code>mpsc</code></a> channel. Each command includes a
<a href="../../tokio/sync/oneshot/index.html"><code>oneshot</code></a> <code>Sender</code> on which the result of the command is sent.</p>
<p><strong>Example:</strong> use a task to synchronize a <code>u64</code> counter. Each task sends an
&quot;fetch and increment&quot; command. The counter value <strong>before</strong> the increment is
sent over the provided <code>oneshot</code> channel.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered">
<span class="kw">use</span> <span class="ident">tokio</span>::<span class="ident">sync</span>::{<span class="ident">oneshot</span>, <span class="ident">mpsc</span>};
<span class="kw">use</span> <span class="ident">Command</span>::<span class="ident">Increment</span>;

<span class="kw">enum</span> <span class="ident">Command</span> {
    <span class="ident">Increment</span>,
    <span class="comment">// Other commands can be added here</span>
}

<span class="attribute">#[<span class="ident">tokio</span>::<span class="ident">main</span>]</span>
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() {
    <span class="kw">let</span> (<span class="ident">cmd_tx</span>, <span class="kw-2">mut</span> <span class="ident">cmd_rx</span>) <span class="op">=</span> <span class="ident">mpsc</span>::<span class="ident">channel</span>::<span class="op">&lt;</span>(<span class="ident">Command</span>, <span class="ident">oneshot</span>::<span class="ident">Sender</span><span class="op">&lt;</span><span class="ident">u64</span><span class="op">&gt;</span>)<span class="op">&gt;</span>(<span class="number">100</span>);

    <span class="comment">// Spawn a task to manage the counter</span>
    <span class="ident">tokio</span>::<span class="ident">spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
        <span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">counter</span>: <span class="ident">u64</span> <span class="op">=</span> <span class="number">0</span>;

        <span class="kw">while</span> <span class="kw">let</span> <span class="prelude-val">Some</span>((<span class="ident">cmd</span>, <span class="ident">response</span>)) <span class="op">=</span> <span class="ident">cmd_rx</span>.<span class="ident">recv</span>().<span class="kw">await</span> {
            <span class="kw">match</span> <span class="ident">cmd</span> {
                <span class="ident">Increment</span> <span class="op">=</span><span class="op">&gt;</span> {
                    <span class="kw">let</span> <span class="ident">prev</span> <span class="op">=</span> <span class="ident">counter</span>;
                    <span class="ident">counter</span> <span class="op">+</span><span class="op">=</span> <span class="number">1</span>;
                    <span class="ident">response</span>.<span class="ident">send</span>(<span class="ident">prev</span>).<span class="ident">unwrap</span>();
                }
            }
        }
    });

    <span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">join_handles</span> <span class="op">=</span> <span class="macro">vec</span><span class="macro">!</span>[];

    <span class="comment">// Spawn tasks that will send the increment command.</span>
    <span class="kw">for</span> <span class="kw">_</span> <span class="kw">in</span> <span class="number">0</span>..<span class="number">10</span> {
        <span class="kw">let</span> <span class="ident">cmd_tx</span> <span class="op">=</span> <span class="ident">cmd_tx</span>.<span class="ident">clone</span>();

        <span class="ident">join_handles</span>.<span class="ident">push</span>(<span class="ident">tokio</span>::<span class="ident">spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
            <span class="kw">let</span> (<span class="ident">resp_tx</span>, <span class="ident">resp_rx</span>) <span class="op">=</span> <span class="ident">oneshot</span>::<span class="ident">channel</span>();

            <span class="ident">cmd_tx</span>.<span class="ident">send</span>((<span class="ident">Increment</span>, <span class="ident">resp_tx</span>)).<span class="kw">await</span>.<span class="ident">ok</span>().<span class="ident">unwrap</span>();
            <span class="kw">let</span> <span class="ident">res</span> <span class="op">=</span> <span class="ident">resp_rx</span>.<span class="kw">await</span>.<span class="ident">unwrap</span>();

            <span class="macro">println</span><span class="macro">!</span>(<span class="string">&quot;previous value = {}&quot;</span>, <span class="ident">res</span>);
        }));
    }

    <span class="comment">// Wait for all tasks to complete</span>
    <span class="kw">for</span> <span class="ident">join_handle</span> <span class="kw">in</span> <span class="ident">join_handles</span>.<span class="ident">drain</span>(..) {
        <span class="ident">join_handle</span>.<span class="kw">await</span>.<span class="ident">unwrap</span>();
    }
}</pre></div>
<h2 id="broadcast-channel" class="section-header"><a href="#broadcast-channel"><code>broadcast</code> channel</a></h2>
<p>The <a href="../../tokio/sync/broadcast/index.html"><code>broadcast</code> channel</a> supports sending <strong>many</strong> values from
<strong>many</strong> producers to <strong>many</strong> consumers. Each consumer will receive
<strong>each</strong> value. This channel can be used to implement &quot;fan out&quot; style
patterns common with pub / sub or &quot;chat&quot; systems.</p>
<p>This channel tends to be used less often than <code>oneshot</code> and <code>mpsc</code> but still
has its use cases.</p>
<p><strong>Example:</strong> basic usage.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered">
<span class="kw">use</span> <span class="ident">tokio</span>::<span class="ident">sync</span>::<span class="ident">broadcast</span>;

<span class="attribute">#[<span class="ident">tokio</span>::<span class="ident">main</span>]</span>
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() {
    <span class="kw">let</span> (<span class="ident">tx</span>, <span class="kw-2">mut</span> <span class="ident">rx1</span>) <span class="op">=</span> <span class="ident">broadcast</span>::<span class="ident">channel</span>(<span class="number">16</span>);
    <span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">rx2</span> <span class="op">=</span> <span class="ident">tx</span>.<span class="ident">subscribe</span>();

    <span class="ident">tokio</span>::<span class="ident">spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
        <span class="macro">assert_eq</span><span class="macro">!</span>(<span class="ident">rx1</span>.<span class="ident">recv</span>().<span class="kw">await</span>.<span class="ident">unwrap</span>(), <span class="number">10</span>);
        <span class="macro">assert_eq</span><span class="macro">!</span>(<span class="ident">rx1</span>.<span class="ident">recv</span>().<span class="kw">await</span>.<span class="ident">unwrap</span>(), <span class="number">20</span>);
    });

    <span class="ident">tokio</span>::<span class="ident">spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
        <span class="macro">assert_eq</span><span class="macro">!</span>(<span class="ident">rx2</span>.<span class="ident">recv</span>().<span class="kw">await</span>.<span class="ident">unwrap</span>(), <span class="number">10</span>);
        <span class="macro">assert_eq</span><span class="macro">!</span>(<span class="ident">rx2</span>.<span class="ident">recv</span>().<span class="kw">await</span>.<span class="ident">unwrap</span>(), <span class="number">20</span>);
    });

    <span class="ident">tx</span>.<span class="ident">send</span>(<span class="number">10</span>).<span class="ident">unwrap</span>();
    <span class="ident">tx</span>.<span class="ident">send</span>(<span class="number">20</span>).<span class="ident">unwrap</span>();
}</pre></div>
<h2 id="watch-channel" class="section-header"><a href="#watch-channel"><code>watch</code> channel</a></h2>
<p>The <a href="../../tokio/sync/watch/index.html"><code>watch</code> channel</a> supports sending <strong>many</strong> values from a <strong>single</strong>
producer to <strong>many</strong> consumers. However, only the <strong>most recent</strong> value is
stored in the channel. Consumers are notified when a new value is sent, but
there is no guarantee that consumers will see <strong>all</strong> values.</p>
<p>The <a href="../../tokio/sync/watch/index.html"><code>watch</code> channel</a> is similar to a <a href="../../tokio/sync/broadcast/index.html"><code>broadcast</code> channel</a> with capacity 1.</p>
<p>Use cases for the <a href="../../tokio/sync/watch/index.html"><code>watch</code> channel</a> include broadcasting configuration
changes or signalling program state changes, such as transitioning to
shutdown.</p>
<p><strong>Example:</strong> use a <a href="../../tokio/sync/watch/index.html"><code>watch</code> channel</a> to notify tasks of configuration
changes. In this example, a configuration file is checked periodically. When
the file changes, the configuration changes are signalled to consumers.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered">
<span class="kw">use</span> <span class="ident">tokio</span>::<span class="ident">sync</span>::<span class="ident">watch</span>;
<span class="kw">use</span> <span class="ident">tokio</span>::<span class="ident">time</span>::{<span class="self">self</span>, <span class="ident">Duration</span>, <span class="ident">Instant</span>};

<span class="kw">use</span> <span class="ident">std</span>::<span class="ident">io</span>;

<span class="attribute">#[<span class="ident">derive</span>(<span class="ident">Debug</span>, <span class="ident">Clone</span>, <span class="ident">Eq</span>, <span class="ident">PartialEq</span>)]</span>
<span class="kw">struct</span> <span class="ident">Config</span> {
    <span class="ident">timeout</span>: <span class="ident">Duration</span>,
}

<span class="kw">impl</span> <span class="ident">Config</span> {
    <span class="kw">async</span> <span class="kw">fn</span> <span class="ident">load_from_file</span>() <span class="op">-</span><span class="op">&gt;</span> <span class="ident">io</span>::<span class="prelude-ty">Result</span><span class="op">&lt;</span><span class="ident">Config</span><span class="op">&gt;</span> {
        <span class="comment">// file loading and deserialization logic here</span>
        <span class="comment">// (placeholder value so that the example compiles)</span>
        <span class="prelude-val">Ok</span>(<span class="ident">Config</span> { <span class="ident">timeout</span>: <span class="ident">Duration</span>::<span class="ident">from_secs</span>(<span class="number">1</span>) })
    }
}

<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">my_async_operation</span>() {
    <span class="comment">// Do something here</span>
}
<span class="attribute">#[<span class="ident">tokio</span>::<span class="ident">main</span>]</span>
<span class="kw">async</span> <span class="kw">fn</span> <span class="ident">main</span>() {
    <span class="comment">// Load initial configuration value</span>
    <span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">config</span> <span class="op">=</span> <span class="ident">Config</span>::<span class="ident">load_from_file</span>().<span class="kw">await</span>.<span class="ident">unwrap</span>();

    <span class="comment">// Create the watch channel, initialized with the loaded configuration</span>
    <span class="kw">let</span> (<span class="ident">tx</span>, <span class="ident">rx</span>) <span class="op">=</span> <span class="ident">watch</span>::<span class="ident">channel</span>(<span class="ident">config</span>.<span class="ident">clone</span>());

    <span class="comment">// Spawn a task to monitor the file.</span>
    <span class="ident">tokio</span>::<span class="ident">spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
        <span class="kw">loop</span> {
            <span class="comment">// Wait 10 seconds between checks</span>
            <span class="ident">time</span>::<span class="ident">sleep</span>(<span class="ident">Duration</span>::<span class="ident">from_secs</span>(<span class="number">10</span>)).<span class="kw">await</span>;

            <span class="comment">// Load the configuration file</span>
            <span class="kw">let</span> <span class="ident">new_config</span> <span class="op">=</span> <span class="ident">Config</span>::<span class="ident">load_from_file</span>().<span class="kw">await</span>.<span class="ident">unwrap</span>();

            <span class="comment">// If the configuration changed, send the new config value</span>
            <span class="comment">// on the watch channel.</span>
            <span class="kw">if</span> <span class="ident">new_config</span> <span class="op">!</span><span class="op">=</span> <span class="ident">config</span> {
                <span class="ident">tx</span>.<span class="ident">send</span>(<span class="ident">new_config</span>.<span class="ident">clone</span>()).<span class="ident">unwrap</span>();
                <span class="ident">config</span> <span class="op">=</span> <span class="ident">new_config</span>;
            }
        }
    });

    <span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">handles</span> <span class="op">=</span> <span class="macro">vec</span><span class="macro">!</span>[];

    <span class="comment">// Spawn tasks that run the async operation for at most `timeout`. If</span>
    <span class="comment">// the timeout elapses, restart the operation.</span>
    <span class="comment">//</span>
    <span class="comment">// The task simultaneously watches the `Config` for changes. When the</span>
    <span class="comment">// timeout duration changes, the timeout is updated without restarting</span>
    <span class="comment">// the in-flight operation.</span>
    <span class="kw">for</span> <span class="kw">_</span> <span class="kw">in</span> <span class="number">0</span>..<span class="number">5</span> {
        <span class="comment">// Clone a config watch handle for use in this task</span>
        <span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">rx</span> <span class="op">=</span> <span class="ident">rx</span>.<span class="ident">clone</span>();

        <span class="kw">let</span> <span class="ident">handle</span> <span class="op">=</span> <span class="ident">tokio</span>::<span class="ident">spawn</span>(<span class="kw">async</span> <span class="kw">move</span> {
            <span class="comment">// Start the initial operation and pin the future to the stack.</span>
            <span class="comment">// Pinning to the stack is required to resume the operation</span>
            <span class="comment">// across multiple calls to `select!`</span>
            <span class="kw">let</span> <span class="ident">op</span> <span class="op">=</span> <span class="ident">my_async_operation</span>();
            <span class="ident">tokio</span>::<span class="macro">pin</span><span class="macro">!</span>(<span class="ident">op</span>);

            <span class="comment">// Get the initial config value</span>
            <span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">conf</span> <span class="op">=</span> <span class="ident">rx</span>.<span class="ident">borrow</span>().<span class="ident">clone</span>();

            <span class="kw">let</span> <span class="kw-2">mut</span> <span class="ident">op_start</span> <span class="op">=</span> <span class="ident">Instant</span>::<span class="ident">now</span>();
            <span class="kw">let</span> <span class="ident">sleep</span> <span class="op">=</span> <span class="ident">time</span>::<span class="ident">sleep_until</span>(<span class="ident">op_start</span> <span class="op">+</span> <span class="ident">conf</span>.<span class="ident">timeout</span>);
            <span class="ident">tokio</span>::<span class="macro">pin</span><span class="macro">!</span>(<span class="ident">sleep</span>);

            <span class="kw">loop</span> {
                <span class="ident">tokio</span>::<span class="macro">select</span><span class="macro">!</span> {
                    <span class="kw">_</span> <span class="op">=</span> <span class="kw-2">&amp;</span><span class="kw-2">mut</span> <span class="ident">sleep</span> <span class="op">=</span><span class="op">&gt;</span> {
                        <span class="comment">// The timeout elapsed. Restart the operation</span>
                        <span class="ident">op</span>.<span class="ident">set</span>(<span class="ident">my_async_operation</span>());

                        <span class="comment">// Track the new start time</span>
                        <span class="ident">op_start</span> <span class="op">=</span> <span class="ident">Instant</span>::<span class="ident">now</span>();

                        <span class="comment">// Restart the timeout</span>
                        <span class="ident">sleep</span>.<span class="ident">set</span>(<span class="ident">time</span>::<span class="ident">sleep_until</span>(<span class="ident">op_start</span> <span class="op">+</span> <span class="ident">conf</span>.<span class="ident">timeout</span>));
                    }
                    <span class="kw">_</span> <span class="op">=</span> <span class="ident">rx</span>.<span class="ident">changed</span>() <span class="op">=</span><span class="op">&gt;</span> {
                        <span class="ident">conf</span> <span class="op">=</span> <span class="ident">rx</span>.<span class="ident">borrow</span>().<span class="ident">clone</span>();

                        <span class="comment">// The configuration has been updated. Update the</span>
                        <span class="comment">// `sleep` using the new `timeout` value.</span>
                        <span class="ident">sleep</span>.<span class="ident">as_mut</span>().<span class="ident">reset</span>(<span class="ident">op_start</span> <span class="op">+</span> <span class="ident">conf</span>.<span class="ident">timeout</span>);
                    }
                    <span class="kw">_</span> <span class="op">=</span> <span class="kw-2">&amp;</span><span class="kw-2">mut</span> <span class="ident">op</span> <span class="op">=</span><span class="op">&gt;</span> {
                        <span class="comment">// The operation completed!</span>
                        <span class="kw">return</span>
                    }
                }
            }
        });

        <span class="ident">handles</span>.<span class="ident">push</span>(<span class="ident">handle</span>);
    }

    <span class="kw">for</span> <span class="ident">handle</span> <span class="kw">in</span> <span class="ident">handles</span>.<span class="ident">drain</span>(..) {
        <span class="ident">handle</span>.<span class="kw">await</span>.<span class="ident">unwrap</span>();
    }
}</pre></div>
<h1 id="state-synchronization" class="section-header"><a href="#state-synchronization">State synchronization</a></h1>
<p>The remaining synchronization primitives focus on synchronizing state.
These are asynchronous equivalents of the versions provided by <code>std</code>. They
operate similarly to their <code>std</code> counterparts, but wait
asynchronously instead of blocking the thread.</p>
<ul>
<li>
<p><a href="../../tokio/sync/struct.Barrier.html"><code>Barrier</code></a> Ensures multiple tasks will wait for each other to
reach a point in the program, before continuing execution all together.</p>
</li>
<li>
<p><a href="../../tokio/sync/struct.Mutex.html"><code>Mutex</code></a> Mutual exclusion mechanism, which ensures that at most
one task at a time is able to access some data.</p>
</li>
<li>
<p><a href="../../tokio/sync/struct.Notify.html"><code>Notify</code></a> Basic task notification. <code>Notify</code> supports notifying a
receiving task without sending data. In this case, the task wakes up and
resumes processing.</p>
</li>
<li>
<p><a href="../../tokio/sync/struct.RwLock.html"><code>RwLock</code></a> Provides a mutual exclusion mechanism which allows
multiple readers at the same time, while allowing only one writer at a
time. In some cases, this can be more efficient than a mutex.</p>
</li>
<li>
<p><a href="../../tokio/sync/struct.Semaphore.html"><code>Semaphore</code></a> Limits the amount of concurrency. A semaphore
holds a number of permits, which tasks may request in order to enter a
critical section. Semaphores are useful for implementing limiting or
bounding of any kind.</p>
</li>
</ul>
</div><h2 id="modules" class="section-header"><a href="#modules">Modules</a></h2>
<table><tr class="module-item"><td><a class="mod" href="broadcast/index.html" title="tokio::sync::broadcast mod">broadcast</a></td><td class="docblock-short"><p>A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
all consumers.</p>
</td></tr><tr class="module-item"><td><a class="mod" href="mpsc/index.html" title="tokio::sync::mpsc mod">mpsc</a></td><td class="docblock-short"><p>A multi-producer, single-consumer queue for sending values between
asynchronous tasks.</p>
</td></tr><tr class="module-item"><td><a class="mod" href="oneshot/index.html" title="tokio::sync::oneshot mod">oneshot</a></td><td class="docblock-short"><p>A one-shot channel is used for sending a single message between
asynchronous tasks. The <a href="../../tokio/sync/oneshot/fn.channel.html" title="channel"><code>channel</code></a> function is used to create a
<a href="../../tokio/sync/oneshot/struct.Sender.html" title="Sender"><code>Sender</code></a> and <a href="../../tokio/sync/oneshot/struct.Receiver.html" title="Receiver"><code>Receiver</code></a> handle pair that form the channel.</p>
</td></tr><tr class="module-item"><td><a class="mod" href="watch/index.html" title="tokio::sync::watch mod">watch</a></td><td class="docblock-short"><p>A single-producer, multi-consumer channel that only retains the <em>last</em> sent
value.</p>
</td></tr></table><h2 id="structs" class="section-header"><a href="#structs">Structs</a></h2>
<table><tr class="module-item"><td><a class="struct" href="struct.AcquireError.html" title="tokio::sync::AcquireError struct">AcquireError</a></td><td class="docblock-short"><p>Error returned from the <a href="../../tokio/sync/struct.Semaphore.html#method.acquire"><code>Semaphore::acquire</code></a> function.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.Barrier.html" title="tokio::sync::Barrier struct">Barrier</a></td><td class="docblock-short"><p>A barrier enables multiple tasks to synchronize the beginning of some computation.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.BarrierWaitResult.html" title="tokio::sync::BarrierWaitResult struct">BarrierWaitResult</a></td><td class="docblock-short"><p>A <code>BarrierWaitResult</code> is returned by <code>wait</code> when all tasks in the <code>Barrier</code> have rendezvoused.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.Mutex.html" title="tokio::sync::Mutex struct">Mutex</a></td><td class="docblock-short"><p>An asynchronous <code>Mutex</code>-like type.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.MutexGuard.html" title="tokio::sync::MutexGuard struct">MutexGuard</a></td><td class="docblock-short"><p>A handle to a held <code>Mutex</code>.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.Notify.html" title="tokio::sync::Notify struct">Notify</a></td><td class="docblock-short"><p>Notify a single task to wake up.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.OwnedMutexGuard.html" title="tokio::sync::OwnedMutexGuard struct">OwnedMutexGuard</a></td><td class="docblock-short"><p>An owned handle to a held <code>Mutex</code>.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.OwnedSemaphorePermit.html" title="tokio::sync::OwnedSemaphorePermit struct">OwnedSemaphorePermit</a></td><td class="docblock-short"><p>An owned permit from the semaphore.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.RwLock.html" title="tokio::sync::RwLock struct">RwLock</a></td><td class="docblock-short"><p>An asynchronous reader-writer lock.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.RwLockMappedWriteGuard.html" title="tokio::sync::RwLockMappedWriteGuard struct">RwLockMappedWriteGuard</a></td><td class="docblock-short"><p>RAII structure used to release the exclusive write access of a lock when
dropped.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.RwLockReadGuard.html" title="tokio::sync::RwLockReadGuard struct">RwLockReadGuard</a></td><td class="docblock-short"><p>RAII structure used to release the shared read access of a lock when
dropped.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.RwLockWriteGuard.html" title="tokio::sync::RwLockWriteGuard struct">RwLockWriteGuard</a></td><td class="docblock-short"><p>RAII structure used to release the exclusive write access of a lock when
dropped.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.Semaphore.html" title="tokio::sync::Semaphore struct">Semaphore</a></td><td class="docblock-short"><p>Counting semaphore performing asynchronous permit acquisition.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.SemaphorePermit.html" title="tokio::sync::SemaphorePermit struct">SemaphorePermit</a></td><td class="docblock-short"><p>A permit from the semaphore.</p>
</td></tr><tr class="module-item"><td><a class="struct" href="struct.TryLockError.html" title="tokio::sync::TryLockError struct">TryLockError</a></td><td class="docblock-short"><p>Error returned from the <a href="../../tokio/sync/struct.Mutex.html#method.try_lock"><code>Mutex::try_lock</code></a>, <a href="../../tokio/sync/struct.RwLock.html#method.try_read"><code>RwLock::try_read</code></a> and
<a href="../../tokio/sync/struct.RwLock.html#method.try_write"><code>RwLock::try_write</code></a> functions.</p>
</td></tr></table><h2 id="enums" class="section-header"><a href="#enums">Enums</a></h2>
<table><tr class="module-item"><td><a class="enum" href="enum.TryAcquireError.html" title="tokio::sync::TryAcquireError enum">TryAcquireError</a></td><td class="docblock-short"><p>Error returned from the <a href="../../tokio/sync/struct.Semaphore.html#method.try_acquire"><code>Semaphore::try_acquire</code></a> function.</p>
</td></tr></table></section><section id="search" class="content hidden"></section><section class="footer"></section><div id="rustdoc-vars" data-root-path="../../" data-current-crate="tokio"></div>
<script src="../../main.js"></script><script defer src="../../search-index.js"></script></body></html>