feat: Allow to create tarpc on existing TCP listener

This commit is contained in:
Boris Sabatier
2023-12-31 08:39:40 +01:00
committed by Tim
parent b92dd154bc
commit d62706e62c

View File

@@ -210,7 +210,19 @@ pub mod tcp {
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
let listener = TcpListener::bind(addr).await?;
listen_on(TcpListener::bind(addr).await?, codec_fn).await
}
/// Wrap accepted connections from `listener` in TCP transports.
pub async fn listen_on<Item, SinkItem, Codec, CodecFn>(
listener: TcpListener,
codec_fn: CodecFn,
) -> io::Result<Incoming<Item, SinkItem, Codec, CodecFn>>
where
Item: for<'de> Deserialize<'de>,
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
let local_addr = listener.local_addr()?;
Ok(Incoming {
listener,
@@ -662,6 +674,26 @@ mod tests {
Ok(())
}
#[cfg(tcp)]
#[tokio::test]
async fn tcp_on_existing_transport() -> io::Result<()> {
use super::tcp;
let transport = TcpListener::bind("0.0.0.0:0").await?;
let mut listener = tcp::listen_on(transport, SymmetricalJson::<String>::default).await?;
let addr = listener.local_addr();
tokio::spawn(async move {
let mut transport = listener.next().await.unwrap().unwrap();
let message = transport.next().await.unwrap().unwrap();
transport.send(message).await.unwrap();
});
let mut transport = tcp::connect(addr, SymmetricalJson::<String>::default).await?;
transport.send(String::from("test")).await?;
assert_matches!(transport.next().await, Some(Ok(s)) if s == "test");
assert_matches!(transport.next().await, None);
Ok(())
}
#[cfg(all(unix, feature = "unix"))]
#[tokio::test]
async fn uds() -> io::Result<()> {