mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-01-08 12:32:11 +01:00
Remove bad mem::forget usage.
mem::forget is a dangerous tool, and it was being used carelessly for things that have safer alternatives. There was at least one bug where a cloned tokio::sync::mpsc::UnboundedSender used for request cancellation was being leaked on every successful server response, so its refcounts were never decremented. Because these are atomic refcounts, they'll wrap around rather than overflow when reaching the maximum value, so I don't believe this could lead to panics or unsoundness.
This commit is contained in:
@@ -18,7 +18,7 @@ use pin_project::pin_project;
|
||||
use std::{
|
||||
convert::TryFrom,
|
||||
error::Error,
|
||||
fmt, mem,
|
||||
fmt,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
@@ -147,6 +147,7 @@ impl<Req, Resp> Channel<Req, Resp> {
|
||||
response: &mut response,
|
||||
request_id,
|
||||
cancellation: &self.cancellation,
|
||||
cancel: true,
|
||||
};
|
||||
self.to_dispatch
|
||||
.send(DispatchRequest {
|
||||
@@ -168,6 +169,7 @@ struct ResponseGuard<'a, Resp> {
|
||||
response: &'a mut oneshot::Receiver<Result<Response<Resp>, DeadlineExceededError>>,
|
||||
cancellation: &'a RequestCancellation,
|
||||
request_id: u64,
|
||||
cancel: bool,
|
||||
}
|
||||
|
||||
/// An error that can occur in the processing of an RPC. This is not request-specific errors but
|
||||
@@ -196,7 +198,7 @@ impl<Resp> ResponseGuard<'_, Resp> {
|
||||
async fn response(mut self) -> Result<Resp, RpcError> {
|
||||
let response = (&mut self.response).await;
|
||||
// Cancel drop logic once a response has been received.
|
||||
mem::forget(self);
|
||||
self.cancel = false;
|
||||
match response {
|
||||
Ok(resp) => Ok(resp?.message?),
|
||||
Err(oneshot::error::RecvError { .. }) => {
|
||||
@@ -223,7 +225,9 @@ impl<Resp> Drop for ResponseGuard<'_, Resp> {
|
||||
// dispatch task misses an early-arriving cancellation message, then it will see the
|
||||
// receiver as closed.
|
||||
self.response.close();
|
||||
self.cancellation.cancel(self.request_id);
|
||||
if self.cancel {
|
||||
self.cancellation.cancel(self.request_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -654,6 +658,7 @@ mod tests {
|
||||
response: &mut response,
|
||||
cancellation: &cancellation,
|
||||
request_id: 3,
|
||||
cancel: true,
|
||||
});
|
||||
// resp's drop() is run, which should send a cancel message.
|
||||
let cx = &mut Context::from_waker(noop_waker_ref());
|
||||
@@ -674,6 +679,7 @@ mod tests {
|
||||
response: &mut response,
|
||||
cancellation: &cancellation,
|
||||
request_id: 3,
|
||||
cancel: true,
|
||||
}
|
||||
.response()
|
||||
.await
|
||||
@@ -830,6 +836,7 @@ mod tests {
|
||||
response,
|
||||
cancellation: &channel.cancellation,
|
||||
request_id,
|
||||
cancel: true,
|
||||
};
|
||||
channel.to_dispatch.send(request).await.unwrap();
|
||||
response_guard
|
||||
|
||||
@@ -21,14 +21,7 @@ use futures::{
|
||||
};
|
||||
use in_flight_requests::{AlreadyExistsError, InFlightRequests};
|
||||
use pin_project::pin_project;
|
||||
use std::{
|
||||
convert::TryFrom,
|
||||
error::Error,
|
||||
fmt,
|
||||
marker::PhantomData,
|
||||
mem::{self, ManuallyDrop},
|
||||
pin::Pin,
|
||||
};
|
||||
use std::{convert::TryFrom, error::Error, fmt, marker::PhantomData, pin::Pin};
|
||||
use tracing::{info_span, instrument::Instrument, Span};
|
||||
|
||||
mod in_flight_requests;
|
||||
@@ -208,10 +201,11 @@ where
|
||||
Ok(TrackedRequest {
|
||||
abort_registration,
|
||||
span,
|
||||
response_guard: ManuallyDrop::new(ResponseGuard {
|
||||
response_guard: ResponseGuard {
|
||||
request_id: request.id,
|
||||
request_cancellation: self.request_cancellation.clone(),
|
||||
}),
|
||||
cancel: false,
|
||||
},
|
||||
request,
|
||||
})
|
||||
}
|
||||
@@ -240,7 +234,7 @@ pub struct TrackedRequest<Req> {
|
||||
/// A span representing the server processing of this request.
|
||||
pub span: Span,
|
||||
/// An inert response guard. Becomes active in an InFlightRequest.
|
||||
pub response_guard: ManuallyDrop<ResponseGuard>,
|
||||
pub response_guard: ResponseGuard,
|
||||
}
|
||||
|
||||
/// The server end of an open connection with a client, receiving requests from, and sending
|
||||
@@ -581,13 +575,15 @@ where
|
||||
request,
|
||||
abort_registration,
|
||||
span,
|
||||
response_guard,
|
||||
mut response_guard,
|
||||
}| {
|
||||
// The response guard becomes active once in an InFlightRequest.
|
||||
response_guard.cancel = true;
|
||||
InFlightRequest {
|
||||
request,
|
||||
abort_registration,
|
||||
span,
|
||||
response_guard: ManuallyDrop::into_inner(response_guard),
|
||||
response_guard,
|
||||
response_tx: self.responses_tx.clone(),
|
||||
}
|
||||
},
|
||||
@@ -674,11 +670,14 @@ where
|
||||
pub struct ResponseGuard {
|
||||
request_cancellation: RequestCancellation,
|
||||
request_id: u64,
|
||||
cancel: bool,
|
||||
}
|
||||
|
||||
impl Drop for ResponseGuard {
|
||||
fn drop(&mut self) {
|
||||
self.request_cancellation.cancel(self.request_id);
|
||||
if self.cancel {
|
||||
self.request_cancellation.cancel(self.request_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -721,7 +720,7 @@ impl<Req, Res> InFlightRequest<Req, Res> {
|
||||
{
|
||||
let Self {
|
||||
response_tx,
|
||||
response_guard,
|
||||
mut response_guard,
|
||||
abort_registration,
|
||||
span,
|
||||
request:
|
||||
@@ -755,7 +754,7 @@ impl<Req, Res> InFlightRequest<Req, Res> {
|
||||
// Request processing has completed, meaning either the channel canceled the request or
|
||||
// a request was sent back to the channel. Either way, the channel will clean up the
|
||||
// request data, so the request does not need to be canceled.
|
||||
mem::forget(response_guard);
|
||||
response_guard.cancel = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::{
|
||||
};
|
||||
use futures::{task::*, Sink, Stream};
|
||||
use pin_project::pin_project;
|
||||
use std::{collections::VecDeque, io, mem::ManuallyDrop, pin::Pin, time::SystemTime};
|
||||
use std::{collections::VecDeque, io, pin::Pin, time::SystemTime};
|
||||
use tracing::Span;
|
||||
|
||||
#[pin_project]
|
||||
@@ -101,10 +101,11 @@ impl<Req, Resp> FakeChannel<io::Result<TrackedRequest<Req>>, Response<Resp>> {
|
||||
},
|
||||
abort_registration,
|
||||
span: Span::none(),
|
||||
response_guard: ManuallyDrop::new(ResponseGuard {
|
||||
response_guard: ResponseGuard {
|
||||
request_cancellation,
|
||||
request_id: id,
|
||||
}),
|
||||
cancel: false,
|
||||
},
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user