diff --git a/src/aruna/aruna.api.dataproxy.services.v2.rs b/src/aruna/aruna.api.dataproxy.services.v2.rs index 1b6b319..6e66396 100644 --- a/src/aruna/aruna.api.dataproxy.services.v2.rs +++ b/src/aruna/aruna.api.dataproxy.services.v2.rs @@ -442,74 +442,174 @@ pub mod bundler_service_server { const NAME: &'static str = "aruna.api.dataproxy.services.v2.BundlerService"; } } +/// Messages (requests) from PROXY B #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct DataProxyInfo { +pub struct InitMessage { #[prost(string, tag = "1")] pub dataproxy_id: ::prost::alloc::string::String, + #[prost(string, repeated, tag = "2")] + pub object_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct InfoAckMessage { + #[prost(string, tag = "1")] + pub object_id: ::prost::alloc::string::String, +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ChunkAckMessage { + #[prost(string, tag = "1")] + pub object_id: ::prost::alloc::string::String, #[prost(int64, tag = "2")] - pub available_space: i64, + pub chunk_idx: i64, } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct RequestReplicationRequest { - #[prost(message, optional, tag = "1")] - pub info: ::core::option::Option, - #[prost(bool, tag = "2")] - pub user_initialized: bool, +pub struct RetryChunkMessage { + #[prost(string, tag = "1")] + pub object_id: ::prost::alloc::string::String, + #[prost(int64, tag = "2")] + pub chunk_idx: i64, } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct DataInfo { +pub struct Empty {} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ErrorMessage { + #[prost(oneof = "error_message::Error", tags = "1, 2, 3")] + pub error: ::core::option::Option, +} +/// Nested message and enum types in `ErrorMessage`. +pub mod error_message { + #[derive(serde::Deserialize, serde::Serialize)] + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Error { + #[prost(message, tag = "1")] + RetryChunk(super::RetryChunkMessage), + #[prost(message, tag = "2")] + Abort(super::Empty), + #[prost(string, tag = "3")] + RetryObjectId(::prost::alloc::string::String), + } +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PullReplicationRequest { + #[prost(oneof = "pull_replication_request::Message", tags = "1, 2, 3, 4, 5")] + pub message: ::core::option::Option, +} +/// Nested message and enum types in `PullReplicationRequest`. +pub mod pull_replication_request { + #[derive(serde::Deserialize, serde::Serialize)] + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Message { + #[prost(message, tag = "1")] + InitMessage(super::InitMessage), + #[prost(message, tag = "2")] + InfoAckMessage(super::InfoAckMessage), + #[prost(message, tag = "3")] + ChunkAckMessage(super::ChunkAckMessage), + #[prost(message, tag = "4")] + ErrorMessage(super::ErrorMessage), + #[prost(message, tag = "5")] + FinishMessage(super::Empty), + } +} +/// Messages (responses) from PROXY A +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ObjectInfo { #[prost(string, tag = "1")] pub object_id: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub download_url: ::prost::alloc::string::String, - #[prost(string, tag = "3")] - pub encryption_key: ::prost::alloc::string::String, - #[prost(bool, tag = "4")] - pub is_compressed: bool, + #[prost(int64, tag = "2")] + pub chunks: i64, + #[prost(int64, tag = "3")] + pub raw_size: i64, + #[prost(uint32, repeated, tag = "4")] + pub block_list: ::prost::alloc::vec::Vec, + /// JSON encoded proxy specific extra fields + #[prost(string, optional, tag = "5")] + pub extra: ::core::option::Option<::prost::alloc::string::String>, } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct DataInfos { - #[prost(message, repeated, tag = "1")] - pub data_info: ::prost::alloc::vec::Vec, +pub struct Chunk { + #[prost(string, tag = "1")] + pub object_id: ::prost::alloc::string::String, + #[prost(int64, tag = "2")] + pub chunk_idx: i64, + #[prost(bytes = "vec", tag = "3")] + pub data: ::prost::alloc::vec::Vec, + #[prost(string, tag = "4")] + pub checksum: ::prost::alloc::string::String, } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct RequestReplicationResponse { - #[prost(oneof = "request_replication_response::Response", tags = "1, 2")] - pub response: ::core::option::Option, +pub struct PullReplicationResponse { + #[prost(oneof = "pull_replication_response::Message", tags = "1, 2, 3")] + pub message: ::core::option::Option, } -/// Nested message and enum types in `RequestReplicationResponse`. -pub mod request_replication_response { +/// Nested message and enum types in `PullReplicationResponse`. +pub mod pull_replication_response { #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum Response { + pub enum Message { #[prost(message, tag = "1")] - DataInfos(super::DataInfos), - #[prost(bool, tag = "2")] - Ack(bool), + ObjectInfo(super::ObjectInfo), + /// If no ack is received, the chunk will be resent + #[prost(message, tag = "2")] + Chunk(super::Chunk), + #[prost(message, tag = "3")] + FinishMessage(super::Empty), } } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct InitReplicationRequest { +pub struct DataInfo { + #[prost(string, tag = "1")] + pub object_id: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub download_url: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub encryption_key: ::prost::alloc::string::String, + #[prost(bool, tag = "4")] + pub is_compressed: bool, +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DataInfos { + #[prost(message, repeated, tag = "1")] + pub data_info: ::prost::alloc::vec::Vec, +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PushReplicationRequest { #[prost(message, optional, tag = "1")] pub data_infos: ::core::option::Option, } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct InitReplicationResponse { +pub struct PushReplicationResponse { #[prost(bool, tag = "1")] pub ack: bool, } @@ -540,7 +640,7 @@ pub struct S3Path { #[derive(Clone, PartialEq, ::prost::Message)] pub struct PushReplicaRequest { #[prost(string, tag = "3")] - pub target_location: ::prost::alloc::string::String, + pub target_endpoint_id: ::prost::alloc::string::String, #[prost(oneof = "push_replica_request::Resource", tags = "1, 2")] pub resource: ::core::option::Option, } @@ -586,6 +686,7 @@ pub mod pull_replica_request { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PullReplicaResponse { + /// why ? #[prost(string, tag = "1")] pub replication_id: ::prost::alloc::string::String, } @@ -593,6 +694,7 @@ pub struct PullReplicaResponse { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ReplicationStatusRequest { + /// why ? #[prost(string, tag = "1")] pub replication_id: ::prost::alloc::string::String, } @@ -803,7 +905,7 @@ impl ReplicationStatus { } } /// Generated client implementations. -pub mod dataproxy_service_client { +pub mod dataproxy_replication_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; use tonic::codegen::http::Uri; @@ -813,10 +915,10 @@ pub mod dataproxy_service_client { /// /// Service for data replication between data-proxies #[derive(Debug, Clone)] - pub struct DataproxyServiceClient { + pub struct DataproxyReplicationServiceClient { inner: tonic::client::Grpc, } - impl DataproxyServiceClient { + impl DataproxyReplicationServiceClient { /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where @@ -827,7 +929,7 @@ pub mod dataproxy_service_client { Ok(Self::new(conn)) } } - impl DataproxyServiceClient + impl DataproxyReplicationServiceClient where T: tonic::client::GrpcService, T::Error: Into, @@ -845,7 +947,7 @@ pub mod dataproxy_service_client { pub fn with_interceptor( inner: T, interceptor: F, - ) -> DataproxyServiceClient> + ) -> DataproxyReplicationServiceClient> where F: tonic::service::Interceptor, T::ResponseBody: Default, @@ -859,7 +961,9 @@ pub mod dataproxy_service_client { http::Request, >>::Error: Into + Send + Sync, { - DataproxyServiceClient::new(InterceptedService::new(inner, interceptor)) + DataproxyReplicationServiceClient::new( + InterceptedService::new(inner, interceptor), + ) } /// Compress requests with the given encoding. /// @@ -896,12 +1000,14 @@ pub mod dataproxy_service_client { /// /// Status: ALPHA /// - /// Creates a replication request - pub async fn request_replication( + /// Creates a replication stream + pub async fn pull_replication( &mut self, - request: impl tonic::IntoRequest, + request: impl tonic::IntoStreamingRequest< + Message = super::PullReplicationRequest, + >, ) -> std::result::Result< - tonic::Response, + tonic::Response>, tonic::Status, > { self.inner @@ -915,28 +1021,28 @@ pub mod dataproxy_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/aruna.api.dataproxy.services.v2.DataproxyService/RequestReplication", + "/aruna.api.dataproxy.services.v2.DataproxyReplicationService/PullReplication", ); - let mut req = request.into_request(); + let mut req = request.into_streaming_request(); req.extensions_mut() .insert( GrpcMethod::new( - "aruna.api.dataproxy.services.v2.DataproxyService", - "RequestReplication", + "aruna.api.dataproxy.services.v2.DataproxyReplicationService", + "PullReplication", ), ); - self.inner.unary(req, path, codec).await + self.inner.streaming(req, path, codec).await } /// InitReplication /// /// Status: ALPHA /// /// Provides the necessary url to init replication - pub async fn init_replication( + pub async fn push_replication( &mut self, - request: impl tonic::IntoRequest, + request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, > { self.inner @@ -950,14 +1056,14 @@ pub mod dataproxy_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/aruna.api.dataproxy.services.v2.DataproxyService/InitReplication", + "/aruna.api.dataproxy.services.v2.DataproxyReplicationService/PushReplication", ); let mut req = request.into_request(); req.extensions_mut() .insert( GrpcMethod::new( - "aruna.api.dataproxy.services.v2.DataproxyService", - "InitReplication", + "aruna.api.dataproxy.services.v2.DataproxyReplicationService", + "PushReplication", ), ); self.inner.unary(req, path, codec).await @@ -1582,22 +1688,28 @@ pub mod dataproxy_user_service_client { } } /// Generated server implementations. -pub mod dataproxy_service_server { +pub mod dataproxy_replication_service_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - /// Generated trait containing gRPC methods that should be implemented for use with DataproxyServiceServer. + /// Generated trait containing gRPC methods that should be implemented for use with DataproxyReplicationServiceServer. #[async_trait] - pub trait DataproxyService: Send + Sync + 'static { + pub trait DataproxyReplicationService: Send + Sync + 'static { + /// Server streaming response type for the PullReplication method. + type PullReplicationStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + Send + + 'static; /// RequestReplication /// /// Status: ALPHA /// - /// Creates a replication request - async fn request_replication( + /// Creates a replication stream + async fn pull_replication( &self, - request: tonic::Request, + request: tonic::Request>, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; /// InitReplication @@ -1605,11 +1717,11 @@ pub mod dataproxy_service_server { /// Status: ALPHA /// /// Provides the necessary url to init replication - async fn init_replication( + async fn push_replication( &self, - request: tonic::Request, + request: tonic::Request, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; } @@ -1619,7 +1731,7 @@ pub mod dataproxy_service_server { /// /// Service for data replication between data-proxies #[derive(Debug)] - pub struct DataproxyServiceServer { + pub struct DataproxyReplicationServiceServer { inner: _Inner, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, @@ -1627,7 +1739,7 @@ pub mod dataproxy_service_server { max_encoding_message_size: Option, } struct _Inner(Arc); - impl DataproxyServiceServer { + impl DataproxyReplicationServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } @@ -1679,9 +1791,10 @@ pub mod dataproxy_service_server { self } } - impl tonic::codegen::Service> for DataproxyServiceServer + impl tonic::codegen::Service> + for DataproxyReplicationServiceServer where - T: DataproxyService, + T: DataproxyReplicationService, B: Body + Send + 'static, B::Error: Into + Send + 'static, { @@ -1697,25 +1810,30 @@ pub mod dataproxy_service_server { fn call(&mut self, req: http::Request) -> Self::Future { let inner = self.inner.clone(); match req.uri().path() { - "/aruna.api.dataproxy.services.v2.DataproxyService/RequestReplication" => { + "/aruna.api.dataproxy.services.v2.DataproxyReplicationService/PullReplication" => { #[allow(non_camel_case_types)] - struct RequestReplicationSvc(pub Arc); + struct PullReplicationSvc( + pub Arc, + ); impl< - T: DataproxyService, - > tonic::server::UnaryService - for RequestReplicationSvc { - type Response = super::RequestReplicationResponse; + T: DataproxyReplicationService, + > tonic::server::StreamingService + for PullReplicationSvc { + type Response = super::PullReplicationResponse; + type ResponseStream = T::PullReplicationStream; type Future = BoxFuture< - tonic::Response, + tonic::Response, tonic::Status, >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request< + tonic::Streaming, + >, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::request_replication( + ::pull_replication( &inner, request, ) @@ -1731,7 +1849,7 @@ pub mod dataproxy_service_server { let inner = self.inner.clone(); let fut = async move { let inner = inner.0; - let method = RequestReplicationSvc(inner); + let method = PullReplicationSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( @@ -1742,30 +1860,35 @@ pub mod dataproxy_service_server { max_decoding_message_size, max_encoding_message_size, ); - let res = grpc.unary(method, req).await; + let res = grpc.streaming(method, req).await; Ok(res) }; Box::pin(fut) } - "/aruna.api.dataproxy.services.v2.DataproxyService/InitReplication" => { + "/aruna.api.dataproxy.services.v2.DataproxyReplicationService/PushReplication" => { #[allow(non_camel_case_types)] - struct InitReplicationSvc(pub Arc); + struct PushReplicationSvc( + pub Arc, + ); impl< - T: DataproxyService, - > tonic::server::UnaryService - for InitReplicationSvc { - type Response = super::InitReplicationResponse; + T: DataproxyReplicationService, + > tonic::server::UnaryService + for PushReplicationSvc { + type Response = super::PushReplicationResponse; type Future = BoxFuture< tonic::Response, tonic::Status, >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::init_replication(&inner, request) + ::push_replication( + &inner, + request, + ) .await }; Box::pin(fut) @@ -1778,7 +1901,7 @@ pub mod dataproxy_service_server { let inner = self.inner.clone(); let fut = async move { let inner = inner.0; - let method = InitReplicationSvc(inner); + let method = PushReplicationSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( @@ -1809,7 +1932,7 @@ pub mod dataproxy_service_server { } } } - impl Clone for DataproxyServiceServer { + impl Clone for DataproxyReplicationServiceServer { fn clone(&self) -> Self { let inner = self.inner.clone(); Self { @@ -1821,7 +1944,7 @@ pub mod dataproxy_service_server { } } } - impl Clone for _Inner { + impl Clone for _Inner { fn clone(&self) -> Self { Self(Arc::clone(&self.0)) } @@ -1831,8 +1954,9 @@ pub mod dataproxy_service_server { write!(f, "{:?}", self.0) } } - impl tonic::server::NamedService for DataproxyServiceServer { - const NAME: &'static str = "aruna.api.dataproxy.services.v2.DataproxyService"; + impl tonic::server::NamedService + for DataproxyReplicationServiceServer { + const NAME: &'static str = "aruna.api.dataproxy.services.v2.DataproxyReplicationService"; } } /// Generated server implementations. diff --git a/src/aruna/aruna.api.storage.models.v2.rs b/src/aruna/aruna.api.storage.models.v2.rs index 5863285..702e9ff 100644 --- a/src/aruna/aruna.api.storage.models.v2.rs +++ b/src/aruna/aruna.api.storage.models.v2.rs @@ -237,10 +237,56 @@ pub struct Endpoint { pub struct DataEndpoint { #[prost(string, tag = "1")] pub id: ::prost::alloc::string::String, + #[prost(enumeration = "ReplicationStatus", optional, tag = "4")] + pub status: ::core::option::Option, /// Hint if the objects' project /// is fully synced to the endpoint - #[prost(bool, tag = "2")] - pub full_synced: bool, + #[prost(oneof = "data_endpoint::Variant", tags = "2, 3")] + pub variant: ::core::option::Option, +} +/// Nested message and enum types in `DataEndpoint`. +pub mod data_endpoint { + /// Hint if the objects' project + /// is fully synced to the endpoint + #[derive(serde::Deserialize, serde::Serialize)] + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Variant { + #[prost(message, tag = "2")] + FullSync(super::FullSync), + #[prost(message, tag = "3")] + PartialSync(super::PartialSync), + } +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct FullSync { + #[prost(string, tag = "1")] + pub project_id: ::prost::alloc::string::String, +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PartialSync { + #[prost(oneof = "partial_sync::Origin", tags = "1, 2, 3, 4")] + pub origin: ::core::option::Option, +} +/// Nested message and enum types in `PartialSync`. +pub mod partial_sync { + #[derive(serde::Deserialize, serde::Serialize)] + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Origin { + #[prost(string, tag = "1")] + ProjectId(::prost::alloc::string::String), + #[prost(string, tag = "2")] + CollectionId(::prost::alloc::string::String), + #[prost(string, tag = "3")] + DatasetId(::prost::alloc::string::String), + #[prost(string, tag = "4")] + ObjectId(::prost::alloc::string::String), + } } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -951,3 +997,39 @@ impl ResourceVariant { } } } +#[derive(serde::Deserialize, serde::Serialize)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ReplicationStatus { + Unspecified = 0, + Waiting = 1, + Running = 2, + Finished = 3, + Error = 4, +} +impl ReplicationStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + ReplicationStatus::Unspecified => "REPLICATION_STATUS_UNSPECIFIED", + ReplicationStatus::Waiting => "REPLICATION_STATUS_WAITING", + ReplicationStatus::Running => "REPLICATION_STATUS_RUNNING", + ReplicationStatus::Finished => "REPLICATION_STATUS_FINISHED", + ReplicationStatus::Error => "REPLICATION_STATUS_ERROR", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "REPLICATION_STATUS_UNSPECIFIED" => Some(Self::Unspecified), + "REPLICATION_STATUS_WAITING" => Some(Self::Waiting), + "REPLICATION_STATUS_RUNNING" => Some(Self::Running), + "REPLICATION_STATUS_FINISHED" => Some(Self::Finished), + "REPLICATION_STATUS_ERROR" => Some(Self::Error), + _ => None, + } + } +} diff --git a/src/aruna/aruna.api.storage.services.v2.rs b/src/aruna/aruna.api.storage.services.v2.rs index 5a1af1a..5212f5f 100644 --- a/src/aruna/aruna.api.storage.services.v2.rs +++ b/src/aruna/aruna.api.storage.services.v2.rs @@ -3604,23 +3604,39 @@ pub struct ReplicateProjectDataRequest { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ReplicateProjectDataResponse { - #[prost(enumeration = "ReplicationStatus", tag = "1")] + #[prost(enumeration = "super::super::models::v2::ReplicationStatus", tag = "1")] pub status: i32, } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PartialReplicateDataRequest { - #[prost(string, tag = "1")] - pub resource_id: ::prost::alloc::string::String, - #[prost(string, tag = "2")] + #[prost(string, tag = "4")] pub endpoint_id: ::prost::alloc::string::String, + #[prost(oneof = "partial_replicate_data_request::DataVariant", tags = "1, 2, 3")] + pub data_variant: ::core::option::Option< + partial_replicate_data_request::DataVariant, + >, +} +/// Nested message and enum types in `PartialReplicateDataRequest`. +pub mod partial_replicate_data_request { + #[derive(serde::Deserialize, serde::Serialize)] + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum DataVariant { + #[prost(string, tag = "1")] + CollectionId(::prost::alloc::string::String), + #[prost(string, tag = "2")] + DatasetId(::prost::alloc::string::String), + #[prost(string, tag = "3")] + ObjectId(::prost::alloc::string::String), + } } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PartialReplicateDataResponse { - #[prost(enumeration = "ReplicationStatus", tag = "1")] + #[prost(enumeration = "super::super::models::v2::ReplicationStatus", tag = "1")] pub status: i32, } #[derive(serde::Deserialize, serde::Serialize)] @@ -3628,10 +3644,10 @@ pub struct PartialReplicateDataResponse { #[derive(Clone, PartialEq, ::prost::Message)] pub struct UpdateReplicationStatusRequest { #[prost(string, tag = "1")] - pub resource_id: ::prost::alloc::string::String, + pub object_id: ::prost::alloc::string::String, #[prost(string, tag = "2")] pub endpoint_id: ::prost::alloc::string::String, - #[prost(enumeration = "ReplicationStatus", tag = "3")] + #[prost(enumeration = "super::super::models::v2::ReplicationStatus", tag = "3")] pub status: i32, } #[derive(serde::Deserialize, serde::Serialize)] @@ -3651,8 +3667,33 @@ pub struct GetReplicationStatusRequest { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetReplicationStatusResponse { - #[prost(enumeration = "ReplicationStatus", tag = "1")] - pub status: i32, + #[prost(message, repeated, tag = "1")] + pub infos: ::prost::alloc::vec::Vec, +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReplicationInfo { + #[prost(message, optional, tag = "5")] + pub endpoint_info: ::core::option::Option, + #[prost(oneof = "replication_info::Resource", tags = "1, 2, 3, 4")] + pub resource: ::core::option::Option, +} +/// Nested message and enum types in `ReplicationInfo`. +pub mod replication_info { + #[derive(serde::Deserialize, serde::Serialize)] + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Resource { + #[prost(string, tag = "1")] + ProjectId(::prost::alloc::string::String), + #[prost(string, tag = "2")] + CollectionId(::prost::alloc::string::String), + #[prost(string, tag = "3")] + DatasetId(::prost::alloc::string::String), + #[prost(string, tag = "4")] + ObjectId(::prost::alloc::string::String), + } } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -3667,49 +3708,12 @@ pub struct DeleteReplicationRequest { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DeleteReplicationResponse {} -#[derive(serde::Deserialize, serde::Serialize)] -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] -#[repr(i32)] -pub enum ReplicationStatus { - Unspecified = 0, - Waiting = 1, - Running = 2, - Finished = 3, - Error = 4, -} -impl ReplicationStatus { - /// String value of the enum field names used in the ProtoBuf definition. - /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. - pub fn as_str_name(&self) -> &'static str { - match self { - ReplicationStatus::Unspecified => "REPLICATION_STATUS_UNSPECIFIED", - ReplicationStatus::Waiting => "REPLICATION_STATUS_WAITING", - ReplicationStatus::Running => "REPLICATION_STATUS_RUNNING", - ReplicationStatus::Finished => "REPLICATION_STATUS_FINISHED", - ReplicationStatus::Error => "REPLICATION_STATUS_ERROR", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. - pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "REPLICATION_STATUS_UNSPECIFIED" => Some(Self::Unspecified), - "REPLICATION_STATUS_WAITING" => Some(Self::Waiting), - "REPLICATION_STATUS_RUNNING" => Some(Self::Running), - "REPLICATION_STATUS_FINISHED" => Some(Self::Finished), - "REPLICATION_STATUS_ERROR" => Some(Self::Error), - _ => None, - } - } -} /// Generated client implementations. pub mod data_replication_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; use tonic::codegen::http::Uri; /// DataReplicationService - /// /// Endpoint specific methods for syncing data #[derive(Debug, Clone)] pub struct DataReplicationServiceClient { @@ -4039,7 +4043,6 @@ pub mod data_replication_service_server { >; } /// DataReplicationService - /// /// Endpoint specific methods for syncing data #[derive(Debug)] pub struct DataReplicationServiceServer { diff --git a/src/aruna_no_transport/aruna.api.dataproxy.services.v2.rs b/src/aruna_no_transport/aruna.api.dataproxy.services.v2.rs index 909516c..5924687 100644 --- a/src/aruna_no_transport/aruna.api.dataproxy.services.v2.rs +++ b/src/aruna_no_transport/aruna.api.dataproxy.services.v2.rs @@ -431,74 +431,174 @@ pub mod bundler_service_server { const NAME: &'static str = "aruna.api.dataproxy.services.v2.BundlerService"; } } +/// Messages (requests) from PROXY B #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct DataProxyInfo { +pub struct InitMessage { #[prost(string, tag = "1")] pub dataproxy_id: ::prost::alloc::string::String, + #[prost(string, repeated, tag = "2")] + pub object_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct InfoAckMessage { + #[prost(string, tag = "1")] + pub object_id: ::prost::alloc::string::String, +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ChunkAckMessage { + #[prost(string, tag = "1")] + pub object_id: ::prost::alloc::string::String, #[prost(int64, tag = "2")] - pub available_space: i64, + pub chunk_idx: i64, } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct RequestReplicationRequest { - #[prost(message, optional, tag = "1")] - pub info: ::core::option::Option, - #[prost(bool, tag = "2")] - pub user_initialized: bool, +pub struct RetryChunkMessage { + #[prost(string, tag = "1")] + pub object_id: ::prost::alloc::string::String, + #[prost(int64, tag = "2")] + pub chunk_idx: i64, } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct DataInfo { +pub struct Empty {} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ErrorMessage { + #[prost(oneof = "error_message::Error", tags = "1, 2, 3")] + pub error: ::core::option::Option, +} +/// Nested message and enum types in `ErrorMessage`. +pub mod error_message { + #[derive(serde::Deserialize, serde::Serialize)] + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Error { + #[prost(message, tag = "1")] + RetryChunk(super::RetryChunkMessage), + #[prost(message, tag = "2")] + Abort(super::Empty), + #[prost(string, tag = "3")] + RetryObjectId(::prost::alloc::string::String), + } +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PullReplicationRequest { + #[prost(oneof = "pull_replication_request::Message", tags = "1, 2, 3, 4, 5")] + pub message: ::core::option::Option, +} +/// Nested message and enum types in `PullReplicationRequest`. +pub mod pull_replication_request { + #[derive(serde::Deserialize, serde::Serialize)] + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Message { + #[prost(message, tag = "1")] + InitMessage(super::InitMessage), + #[prost(message, tag = "2")] + InfoAckMessage(super::InfoAckMessage), + #[prost(message, tag = "3")] + ChunkAckMessage(super::ChunkAckMessage), + #[prost(message, tag = "4")] + ErrorMessage(super::ErrorMessage), + #[prost(message, tag = "5")] + FinishMessage(super::Empty), + } +} +/// Messages (responses) from PROXY A +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ObjectInfo { #[prost(string, tag = "1")] pub object_id: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub download_url: ::prost::alloc::string::String, - #[prost(string, tag = "3")] - pub encryption_key: ::prost::alloc::string::String, - #[prost(bool, tag = "4")] - pub is_compressed: bool, + #[prost(int64, tag = "2")] + pub chunks: i64, + #[prost(int64, tag = "3")] + pub raw_size: i64, + #[prost(uint32, repeated, tag = "4")] + pub block_list: ::prost::alloc::vec::Vec, + /// JSON encoded proxy specific extra fields + #[prost(string, optional, tag = "5")] + pub extra: ::core::option::Option<::prost::alloc::string::String>, } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct DataInfos { - #[prost(message, repeated, tag = "1")] - pub data_info: ::prost::alloc::vec::Vec, +pub struct Chunk { + #[prost(string, tag = "1")] + pub object_id: ::prost::alloc::string::String, + #[prost(int64, tag = "2")] + pub chunk_idx: i64, + #[prost(bytes = "vec", tag = "3")] + pub data: ::prost::alloc::vec::Vec, + #[prost(string, tag = "4")] + pub checksum: ::prost::alloc::string::String, } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct RequestReplicationResponse { - #[prost(oneof = "request_replication_response::Response", tags = "1, 2")] - pub response: ::core::option::Option, +pub struct PullReplicationResponse { + #[prost(oneof = "pull_replication_response::Message", tags = "1, 2, 3")] + pub message: ::core::option::Option, } -/// Nested message and enum types in `RequestReplicationResponse`. -pub mod request_replication_response { +/// Nested message and enum types in `PullReplicationResponse`. +pub mod pull_replication_response { #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum Response { + pub enum Message { #[prost(message, tag = "1")] - DataInfos(super::DataInfos), - #[prost(bool, tag = "2")] - Ack(bool), + ObjectInfo(super::ObjectInfo), + /// If no ack is received, the chunk will be resent + #[prost(message, tag = "2")] + Chunk(super::Chunk), + #[prost(message, tag = "3")] + FinishMessage(super::Empty), } } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct InitReplicationRequest { +pub struct DataInfo { + #[prost(string, tag = "1")] + pub object_id: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub download_url: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub encryption_key: ::prost::alloc::string::String, + #[prost(bool, tag = "4")] + pub is_compressed: bool, +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DataInfos { + #[prost(message, repeated, tag = "1")] + pub data_info: ::prost::alloc::vec::Vec, +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PushReplicationRequest { #[prost(message, optional, tag = "1")] pub data_infos: ::core::option::Option, } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct InitReplicationResponse { +pub struct PushReplicationResponse { #[prost(bool, tag = "1")] pub ack: bool, } @@ -529,7 +629,7 @@ pub struct S3Path { #[derive(Clone, PartialEq, ::prost::Message)] pub struct PushReplicaRequest { #[prost(string, tag = "3")] - pub target_location: ::prost::alloc::string::String, + pub target_endpoint_id: ::prost::alloc::string::String, #[prost(oneof = "push_replica_request::Resource", tags = "1, 2")] pub resource: ::core::option::Option, } @@ -575,6 +675,7 @@ pub mod pull_replica_request { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PullReplicaResponse { + /// why ? #[prost(string, tag = "1")] pub replication_id: ::prost::alloc::string::String, } @@ -582,6 +683,7 @@ pub struct PullReplicaResponse { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ReplicationStatusRequest { + /// why ? #[prost(string, tag = "1")] pub replication_id: ::prost::alloc::string::String, } @@ -792,7 +894,7 @@ impl ReplicationStatus { } } /// Generated client implementations. -pub mod dataproxy_service_client { +pub mod dataproxy_replication_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; use tonic::codegen::http::Uri; @@ -802,10 +904,10 @@ pub mod dataproxy_service_client { /// /// Service for data replication between data-proxies #[derive(Debug, Clone)] - pub struct DataproxyServiceClient { + pub struct DataproxyReplicationServiceClient { inner: tonic::client::Grpc, } - impl DataproxyServiceClient + impl DataproxyReplicationServiceClient where T: tonic::client::GrpcService, T::Error: Into, @@ -823,7 +925,7 @@ pub mod dataproxy_service_client { pub fn with_interceptor( inner: T, interceptor: F, - ) -> DataproxyServiceClient> + ) -> DataproxyReplicationServiceClient> where F: tonic::service::Interceptor, T::ResponseBody: Default, @@ -837,7 +939,9 @@ pub mod dataproxy_service_client { http::Request, >>::Error: Into + Send + Sync, { - DataproxyServiceClient::new(InterceptedService::new(inner, interceptor)) + DataproxyReplicationServiceClient::new( + InterceptedService::new(inner, interceptor), + ) } /// Compress requests with the given encoding. /// @@ -874,12 +978,14 @@ pub mod dataproxy_service_client { /// /// Status: ALPHA /// - /// Creates a replication request - pub async fn request_replication( + /// Creates a replication stream + pub async fn pull_replication( &mut self, - request: impl tonic::IntoRequest, + request: impl tonic::IntoStreamingRequest< + Message = super::PullReplicationRequest, + >, ) -> std::result::Result< - tonic::Response, + tonic::Response>, tonic::Status, > { self.inner @@ -893,28 +999,28 @@ pub mod dataproxy_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/aruna.api.dataproxy.services.v2.DataproxyService/RequestReplication", + "/aruna.api.dataproxy.services.v2.DataproxyReplicationService/PullReplication", ); - let mut req = request.into_request(); + let mut req = request.into_streaming_request(); req.extensions_mut() .insert( GrpcMethod::new( - "aruna.api.dataproxy.services.v2.DataproxyService", - "RequestReplication", + "aruna.api.dataproxy.services.v2.DataproxyReplicationService", + "PullReplication", ), ); - self.inner.unary(req, path, codec).await + self.inner.streaming(req, path, codec).await } /// InitReplication /// /// Status: ALPHA /// /// Provides the necessary url to init replication - pub async fn init_replication( + pub async fn push_replication( &mut self, - request: impl tonic::IntoRequest, + request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, > { self.inner @@ -928,14 +1034,14 @@ pub mod dataproxy_service_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/aruna.api.dataproxy.services.v2.DataproxyService/InitReplication", + "/aruna.api.dataproxy.services.v2.DataproxyReplicationService/PushReplication", ); let mut req = request.into_request(); req.extensions_mut() .insert( GrpcMethod::new( - "aruna.api.dataproxy.services.v2.DataproxyService", - "InitReplication", + "aruna.api.dataproxy.services.v2.DataproxyReplicationService", + "PushReplication", ), ); self.inner.unary(req, path, codec).await @@ -1538,22 +1644,28 @@ pub mod dataproxy_user_service_client { } } /// Generated server implementations. -pub mod dataproxy_service_server { +pub mod dataproxy_replication_service_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - /// Generated trait containing gRPC methods that should be implemented for use with DataproxyServiceServer. + /// Generated trait containing gRPC methods that should be implemented for use with DataproxyReplicationServiceServer. #[async_trait] - pub trait DataproxyService: Send + Sync + 'static { + pub trait DataproxyReplicationService: Send + Sync + 'static { + /// Server streaming response type for the PullReplication method. + type PullReplicationStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + Send + + 'static; /// RequestReplication /// /// Status: ALPHA /// - /// Creates a replication request - async fn request_replication( + /// Creates a replication stream + async fn pull_replication( &self, - request: tonic::Request, + request: tonic::Request>, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; /// InitReplication @@ -1561,11 +1673,11 @@ pub mod dataproxy_service_server { /// Status: ALPHA /// /// Provides the necessary url to init replication - async fn init_replication( + async fn push_replication( &self, - request: tonic::Request, + request: tonic::Request, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; } @@ -1575,7 +1687,7 @@ pub mod dataproxy_service_server { /// /// Service for data replication between data-proxies #[derive(Debug)] - pub struct DataproxyServiceServer { + pub struct DataproxyReplicationServiceServer { inner: _Inner, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, @@ -1583,7 +1695,7 @@ pub mod dataproxy_service_server { max_encoding_message_size: Option, } struct _Inner(Arc); - impl DataproxyServiceServer { + impl DataproxyReplicationServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } @@ -1635,9 +1747,10 @@ pub mod dataproxy_service_server { self } } - impl tonic::codegen::Service> for DataproxyServiceServer + impl tonic::codegen::Service> + for DataproxyReplicationServiceServer where - T: DataproxyService, + T: DataproxyReplicationService, B: Body + Send + 'static, B::Error: Into + Send + 'static, { @@ -1653,25 +1766,30 @@ pub mod dataproxy_service_server { fn call(&mut self, req: http::Request) -> Self::Future { let inner = self.inner.clone(); match req.uri().path() { - "/aruna.api.dataproxy.services.v2.DataproxyService/RequestReplication" => { + "/aruna.api.dataproxy.services.v2.DataproxyReplicationService/PullReplication" => { #[allow(non_camel_case_types)] - struct RequestReplicationSvc(pub Arc); + struct PullReplicationSvc( + pub Arc, + ); impl< - T: DataproxyService, - > tonic::server::UnaryService - for RequestReplicationSvc { - type Response = super::RequestReplicationResponse; + T: DataproxyReplicationService, + > tonic::server::StreamingService + for PullReplicationSvc { + type Response = super::PullReplicationResponse; + type ResponseStream = T::PullReplicationStream; type Future = BoxFuture< - tonic::Response, + tonic::Response, tonic::Status, >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request< + tonic::Streaming, + >, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::request_replication( + ::pull_replication( &inner, request, ) @@ -1687,7 +1805,7 @@ pub mod dataproxy_service_server { let inner = self.inner.clone(); let fut = async move { let inner = inner.0; - let method = RequestReplicationSvc(inner); + let method = PullReplicationSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( @@ -1698,30 +1816,35 @@ pub mod dataproxy_service_server { max_decoding_message_size, max_encoding_message_size, ); - let res = grpc.unary(method, req).await; + let res = grpc.streaming(method, req).await; Ok(res) }; Box::pin(fut) } - "/aruna.api.dataproxy.services.v2.DataproxyService/InitReplication" => { + "/aruna.api.dataproxy.services.v2.DataproxyReplicationService/PushReplication" => { #[allow(non_camel_case_types)] - struct InitReplicationSvc(pub Arc); + struct PushReplicationSvc( + pub Arc, + ); impl< - T: DataproxyService, - > tonic::server::UnaryService - for InitReplicationSvc { - type Response = super::InitReplicationResponse; + T: DataproxyReplicationService, + > tonic::server::UnaryService + for PushReplicationSvc { + type Response = super::PushReplicationResponse; type Future = BoxFuture< tonic::Response, tonic::Status, >; fn call( &mut self, - request: tonic::Request, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::init_replication(&inner, request) + ::push_replication( + &inner, + request, + ) .await }; Box::pin(fut) @@ -1734,7 +1857,7 @@ pub mod dataproxy_service_server { let inner = self.inner.clone(); let fut = async move { let inner = inner.0; - let method = InitReplicationSvc(inner); + let method = PushReplicationSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( @@ -1765,7 +1888,7 @@ pub mod dataproxy_service_server { } } } - impl Clone for DataproxyServiceServer { + impl Clone for DataproxyReplicationServiceServer { fn clone(&self) -> Self { let inner = self.inner.clone(); Self { @@ -1777,7 +1900,7 @@ pub mod dataproxy_service_server { } } } - impl Clone for _Inner { + impl Clone for _Inner { fn clone(&self) -> Self { Self(Arc::clone(&self.0)) } @@ -1787,8 +1910,9 @@ pub mod dataproxy_service_server { write!(f, "{:?}", self.0) } } - impl tonic::server::NamedService for DataproxyServiceServer { - const NAME: &'static str = "aruna.api.dataproxy.services.v2.DataproxyService"; + impl tonic::server::NamedService + for DataproxyReplicationServiceServer { + const NAME: &'static str = "aruna.api.dataproxy.services.v2.DataproxyReplicationService"; } } /// Generated server implementations. diff --git a/src/aruna_no_transport/aruna.api.storage.models.v2.rs b/src/aruna_no_transport/aruna.api.storage.models.v2.rs index 5863285..702e9ff 100644 --- a/src/aruna_no_transport/aruna.api.storage.models.v2.rs +++ b/src/aruna_no_transport/aruna.api.storage.models.v2.rs @@ -237,10 +237,56 @@ pub struct Endpoint { pub struct DataEndpoint { #[prost(string, tag = "1")] pub id: ::prost::alloc::string::String, + #[prost(enumeration = "ReplicationStatus", optional, tag = "4")] + pub status: ::core::option::Option, /// Hint if the objects' project /// is fully synced to the endpoint - #[prost(bool, tag = "2")] - pub full_synced: bool, + #[prost(oneof = "data_endpoint::Variant", tags = "2, 3")] + pub variant: ::core::option::Option, +} +/// Nested message and enum types in `DataEndpoint`. +pub mod data_endpoint { + /// Hint if the objects' project + /// is fully synced to the endpoint + #[derive(serde::Deserialize, serde::Serialize)] + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Variant { + #[prost(message, tag = "2")] + FullSync(super::FullSync), + #[prost(message, tag = "3")] + PartialSync(super::PartialSync), + } +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct FullSync { + #[prost(string, tag = "1")] + pub project_id: ::prost::alloc::string::String, +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PartialSync { + #[prost(oneof = "partial_sync::Origin", tags = "1, 2, 3, 4")] + pub origin: ::core::option::Option, +} +/// Nested message and enum types in `PartialSync`. +pub mod partial_sync { + #[derive(serde::Deserialize, serde::Serialize)] + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Origin { + #[prost(string, tag = "1")] + ProjectId(::prost::alloc::string::String), + #[prost(string, tag = "2")] + CollectionId(::prost::alloc::string::String), + #[prost(string, tag = "3")] + DatasetId(::prost::alloc::string::String), + #[prost(string, tag = "4")] + ObjectId(::prost::alloc::string::String), + } } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -951,3 +997,39 @@ impl ResourceVariant { } } } +#[derive(serde::Deserialize, serde::Serialize)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ReplicationStatus { + Unspecified = 0, + Waiting = 1, + Running = 2, + Finished = 3, + Error = 4, +} +impl ReplicationStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + ReplicationStatus::Unspecified => "REPLICATION_STATUS_UNSPECIFIED", + ReplicationStatus::Waiting => "REPLICATION_STATUS_WAITING", + ReplicationStatus::Running => "REPLICATION_STATUS_RUNNING", + ReplicationStatus::Finished => "REPLICATION_STATUS_FINISHED", + ReplicationStatus::Error => "REPLICATION_STATUS_ERROR", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "REPLICATION_STATUS_UNSPECIFIED" => Some(Self::Unspecified), + "REPLICATION_STATUS_WAITING" => Some(Self::Waiting), + "REPLICATION_STATUS_RUNNING" => Some(Self::Running), + "REPLICATION_STATUS_FINISHED" => Some(Self::Finished), + "REPLICATION_STATUS_ERROR" => Some(Self::Error), + _ => None, + } + } +} diff --git a/src/aruna_no_transport/aruna.api.storage.services.v2.rs b/src/aruna_no_transport/aruna.api.storage.services.v2.rs index d73424d..81d565d 100644 --- a/src/aruna_no_transport/aruna.api.storage.services.v2.rs +++ b/src/aruna_no_transport/aruna.api.storage.services.v2.rs @@ -3571,23 +3571,39 @@ pub struct ReplicateProjectDataRequest { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ReplicateProjectDataResponse { - #[prost(enumeration = "ReplicationStatus", tag = "1")] + #[prost(enumeration = "super::super::models::v2::ReplicationStatus", tag = "1")] pub status: i32, } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PartialReplicateDataRequest { - #[prost(string, tag = "1")] - pub resource_id: ::prost::alloc::string::String, - #[prost(string, tag = "2")] + #[prost(string, tag = "4")] pub endpoint_id: ::prost::alloc::string::String, + #[prost(oneof = "partial_replicate_data_request::DataVariant", tags = "1, 2, 3")] + pub data_variant: ::core::option::Option< + partial_replicate_data_request::DataVariant, + >, +} +/// Nested message and enum types in `PartialReplicateDataRequest`. +pub mod partial_replicate_data_request { + #[derive(serde::Deserialize, serde::Serialize)] + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum DataVariant { + #[prost(string, tag = "1")] + CollectionId(::prost::alloc::string::String), + #[prost(string, tag = "2")] + DatasetId(::prost::alloc::string::String), + #[prost(string, tag = "3")] + ObjectId(::prost::alloc::string::String), + } } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PartialReplicateDataResponse { - #[prost(enumeration = "ReplicationStatus", tag = "1")] + #[prost(enumeration = "super::super::models::v2::ReplicationStatus", tag = "1")] pub status: i32, } #[derive(serde::Deserialize, serde::Serialize)] @@ -3595,10 +3611,10 @@ pub struct PartialReplicateDataResponse { #[derive(Clone, PartialEq, ::prost::Message)] pub struct UpdateReplicationStatusRequest { #[prost(string, tag = "1")] - pub resource_id: ::prost::alloc::string::String, + pub object_id: ::prost::alloc::string::String, #[prost(string, tag = "2")] pub endpoint_id: ::prost::alloc::string::String, - #[prost(enumeration = "ReplicationStatus", tag = "3")] + #[prost(enumeration = "super::super::models::v2::ReplicationStatus", tag = "3")] pub status: i32, } #[derive(serde::Deserialize, serde::Serialize)] @@ -3618,8 +3634,33 @@ pub struct GetReplicationStatusRequest { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetReplicationStatusResponse { - #[prost(enumeration = "ReplicationStatus", tag = "1")] - pub status: i32, + #[prost(message, repeated, tag = "1")] + pub infos: ::prost::alloc::vec::Vec, +} +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReplicationInfo { + #[prost(message, optional, tag = "5")] + pub endpoint_info: ::core::option::Option, + #[prost(oneof = "replication_info::Resource", tags = "1, 2, 3, 4")] + pub resource: ::core::option::Option, +} +/// Nested message and enum types in `ReplicationInfo`. +pub mod replication_info { + #[derive(serde::Deserialize, serde::Serialize)] + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Resource { + #[prost(string, tag = "1")] + ProjectId(::prost::alloc::string::String), + #[prost(string, tag = "2")] + CollectionId(::prost::alloc::string::String), + #[prost(string, tag = "3")] + DatasetId(::prost::alloc::string::String), + #[prost(string, tag = "4")] + ObjectId(::prost::alloc::string::String), + } } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -3634,49 +3675,12 @@ pub struct DeleteReplicationRequest { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DeleteReplicationResponse {} -#[derive(serde::Deserialize, serde::Serialize)] -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] -#[repr(i32)] -pub enum ReplicationStatus { - Unspecified = 0, - Waiting = 1, - Running = 2, - Finished = 3, - Error = 4, -} -impl ReplicationStatus { - /// String value of the enum field names used in the ProtoBuf definition. - /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. - pub fn as_str_name(&self) -> &'static str { - match self { - ReplicationStatus::Unspecified => "REPLICATION_STATUS_UNSPECIFIED", - ReplicationStatus::Waiting => "REPLICATION_STATUS_WAITING", - ReplicationStatus::Running => "REPLICATION_STATUS_RUNNING", - ReplicationStatus::Finished => "REPLICATION_STATUS_FINISHED", - ReplicationStatus::Error => "REPLICATION_STATUS_ERROR", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. - pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "REPLICATION_STATUS_UNSPECIFIED" => Some(Self::Unspecified), - "REPLICATION_STATUS_WAITING" => Some(Self::Waiting), - "REPLICATION_STATUS_RUNNING" => Some(Self::Running), - "REPLICATION_STATUS_FINISHED" => Some(Self::Finished), - "REPLICATION_STATUS_ERROR" => Some(Self::Error), - _ => None, - } - } -} /// Generated client implementations. pub mod data_replication_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; use tonic::codegen::http::Uri; /// DataReplicationService - /// /// Endpoint specific methods for syncing data #[derive(Debug, Clone)] pub struct DataReplicationServiceClient { @@ -3995,7 +3999,6 @@ pub mod data_replication_service_server { >; } /// DataReplicationService - /// /// Endpoint specific methods for syncing data #[derive(Debug)] pub struct DataReplicationServiceServer {