Remove serde_transport::tcp::connect_with.

Instead, serde_transport::tcp::connect returns a future named Connect
that has methods to directly access the framing config. This is
consistent with how serde_transport::tcp::listen returns a future with
methods to access the framing config. In addition to this consistency,
it reduces the API surface and provides a simpler user transition from
"zero config" to "some config".
This commit is contained in:
Tim Kuehn
2020-08-19 17:51:53 -07:00
parent 3207affb4a
commit 2264ebecfc
2 changed files with 46 additions and 26 deletions

View File

@@ -7,9 +7,7 @@
use clap::{App, Arg};
use std::{io, net::SocketAddr};
use tarpc::{client, context};
use tokio::net::TcpStream;
use tokio_serde::formats::Json;
use tokio_util::codec::LengthDelimitedCodec;
#[tokio::main]
async fn main() -> io::Result<()> {
@@ -45,17 +43,13 @@ async fn main() -> io::Result<()> {
let name = flags.value_of("name").unwrap().into();
let conn = TcpStream::connect(server_addr).await?;
let transport = tarpc::serde_transport::new(
LengthDelimitedCodec::builder()
.max_frame_length(4294967296)
.new_framed(conn),
Json::default(),
);
let mut transport = tarpc::serde_transport::tcp::connect(server_addr, Json::default);
transport.config_mut().max_frame_length(4294967296);
// WorldClient is generated by the service attribute. It has a constructor `new` that takes a
// config and any Transport as input.
let mut client = service::WorldClient::new(client::Config::default(), transport).spawn()?;
let mut client =
service::WorldClient::new(client::Config::default(), transport.await?).spawn()?;
// The client has an RPC method for each RPC defined in the annotated trait. It takes the same
// args as defined, with the addition of a Context, which is always the first arg. The Context

View File

@@ -151,36 +151,62 @@ pub mod tcp {
}
}
/// Connects to `addr`, wrapping the connection in a TCP transport.
pub async fn connect_with<A, Item, SinkItem, Codec>(
addr: A,
codec: impl FnOnce() -> Codec,
config: LengthDelimitedCodec,
) -> io::Result<Transport<TcpStream, Item, SinkItem, Codec>>
/// A connection Future that also exposes the length-delimited framing config.
#[pin_project]
pub struct Connect<T, Item, SinkItem, CodecFn> {
#[pin]
inner: T,
codec_fn: CodecFn,
config: length_delimited::Builder,
ghost: PhantomData<(fn(SinkItem), fn() -> Item)>,
}
impl<T, Item, SinkItem, Codec, CodecFn> Future for Connect<T, Item, SinkItem, CodecFn>
where
A: ToSocketAddrs,
T: Future<Output = io::Result<TcpStream>>,
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
Ok(new(
Framed::new(TcpStream::connect(addr).await?, config),
codec(),
))
type Output = io::Result<Transport<TcpStream, Item, SinkItem, Codec>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let io = ready!(self.as_mut().project().inner.poll(cx))?;
Poll::Ready(Ok(new(self.config.new_framed(io), (self.codec_fn)())))
}
}
impl<T, Item, SinkItem, CodecFn> Connect<T, Item, SinkItem, CodecFn> {
/// Returns an immutable reference to the length-delimited codec's config.
pub fn config(&self) -> &length_delimited::Builder {
&self.config
}
/// Returns a mutable reference to the length-delimited codec's config.
pub fn config_mut(&mut self) -> &mut length_delimited::Builder {
&mut self.config
}
}
/// Connects to `addr`, wrapping the connection in a TCP transport.
pub async fn connect<A, Item, SinkItem, Codec>(
pub fn connect<A, Item, SinkItem, Codec, CodecFn>(
addr: A,
codec: impl FnOnce() -> Codec,
) -> io::Result<Transport<TcpStream, Item, SinkItem, Codec>>
codec_fn: CodecFn,
) -> Connect<impl Future<Output = io::Result<TcpStream>>, Item, SinkItem, CodecFn>
where
A: ToSocketAddrs,
Item: for<'de> Deserialize<'de>,
SinkItem: Serialize,
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
connect_with(addr, codec, LengthDelimitedCodec::new()).await
Connect {
inner: TcpStream::connect(addr),
codec_fn,
config: LengthDelimitedCodec::builder(),
ghost: PhantomData,
}
}
/// Listens on `addr`, wrapping accepted connections in TCP transports.
@@ -213,7 +239,7 @@ pub mod tcp {
local_addr: SocketAddr,
codec_fn: CodecFn,
config: length_delimited::Builder,
ghost: PhantomData<(Item, SinkItem, Codec)>,
ghost: PhantomData<(fn() -> Item, fn(SinkItem), Codec)>,
}
impl<Item, SinkItem, Codec, CodecFn> Incoming<Item, SinkItem, Codec, CodecFn> {