-
Notifications
You must be signed in to change notification settings - Fork 45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unbreak datafusion #1482
Unbreak datafusion #1482
Conversation
688194b
to
cbf3e90
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @AhmedSoliman ! The changes look good to me, I'll test the PR locally in a bit, but otherwsie everything looks great!
for_each_state(schema, tx, rows); | ||
Ok(()) | ||
}; | ||
stream_builder.spawn_blocking(background_task); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AhmedSoliman what are your thoughts about drooping the spawn_blocking
for these scans?
Is it a part of your plan mentioned offline of having eventually a dedicated runtime for df/rocksdb?
I'd assume these background threads will take awhile for large enough partitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we can move to normal async tasks once we move make the underlying db operations async. Whether this runs on its own runtime or sharing the runtime with the rest of the system is adjacent though.
} | ||
|
||
#[async_trait] | ||
impl<T, S> TableProvider for PartitionedTableProvider<T, S> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💯 very nice, it reads very well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work @AhmedSoliman. It is impressive how quickly you restructured our RocksDB layout! Really happy about how you changed it.
This PR looks good to me. +1 for merging.
) -> SendableRecordBatchStream { | ||
let range = PartitionKey::MIN..=PartitionKey::MAX; | ||
let status = self.0.clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit and outside of this PR: It seems that were are using status and state interchangeably in this module. Maybe something to pull straight at some point to avoid confusion with the invocation_status table.
let mut transaction = partition_store.transaction(); | ||
let rows = transaction.all_inboxes(range); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, why do create a transaction here where for other table implementations this is not done?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really know the origin but possibly to get a stable snapshot of the entire state?
async fn scan_partition_store( | ||
partition_store: PartitionStore, | ||
tx: Sender<Result<RecordBatch, datafusion::error::DataFusionError>>, | ||
range: RangeInclusive<PartitionKey>, | ||
projection: SchemaRef, | ||
) -> SendableRecordBatchStream { | ||
let db = self.0.clone(); | ||
let schema = projection.clone(); | ||
let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16); | ||
let tx = stream_builder.tx(); | ||
let background_task = move || { | ||
let rows = db.all_invocation_status(range); | ||
for_each_status(schema, tx, rows); | ||
Ok(()) | ||
}; | ||
stream_builder.spawn_blocking(background_task); | ||
stream_builder.build() | ||
) { | ||
let rows = partition_store.all_invocation_status(range); | ||
for_each_status(projection, tx, rows).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it ok to run this operation on the calling thread instead of spawning a task on the blocking thread pool as before? Maybe related question: Why are some tables spawning tasks on a blocking thread pool and others not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, it seems that the previous implementation of the df tables were not consistent (this one using a blocking send while others used non-blocking send, some implementations using transactions for reads, others read directly from the storage).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because it's now async, the caller wraps it in a task.
} | ||
|
||
impl<T, S> PartitionedTableProvider<T, S> { | ||
pub(crate) fn new(processors_manager: S, schema: SchemaRef, partition_scanner: T) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe s/processors_manager/partition_selector/?
|
||
Ok(()) | ||
}; | ||
stream_builder.spawn(background_task); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are not spawn blocking because we assume that the background_task
won't do too much blocking I/O, right?
**IMPORTANT:** This breaks queries through datafusion until we workout how data fusion will shard queries across partitions.
Unbreak datafusion
Stack created with Sapling. Best reviewed with ReviewStack.