
Unbreak datafusion #1482

Merged
merged 4 commits into from
Apr 30, 2024


github-actions bot commented Apr 29, 2024

Test Results

 99 files  +4   99 suites  +4   8m 39s ⏱️ +52s
 84 tests +2   82 ✅ +2  2 💤 ±0  0 ❌ ±0 
216 runs  +4  210 ✅ +4  6 💤 ±0  0 ❌ ±0 

Results for commit 348b18a. ± Comparison against base commit 05eb4ba.

♻️ This comment has been updated with latest results.

@AhmedSoliman AhmedSoliman force-pushed the pr1482 branch 2 times, most recently from 688194b to cbf3e90 Compare April 30, 2024 07:12
Contributor

@igalshilman igalshilman left a comment

Thanks @AhmedSoliman! The changes look good to me. I'll test the PR locally in a bit, but otherwise everything looks great!

for_each_state(schema, tx, rows);
Ok(())
};
stream_builder.spawn_blocking(background_task);
Contributor

@AhmedSoliman what are your thoughts about dropping the spawn_blocking for these scans?
Is it part of the plan you mentioned offline of eventually having a dedicated runtime for df/rocksdb?

I'd assume these background threads will take a while for large enough partitions.

Contributor Author

Yes, we can move to normal async tasks once we make the underlying db operations async. Whether this runs on its own runtime or shares the runtime with the rest of the system is a separate question, though.
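
To illustrate the pattern under discussion, here is a minimal std-only sketch, not the code from this PR: a blocking scan runs on a dedicated thread and streams rows through a bounded channel, so the consumer is not stalled by storage reads. `blocking_scan`, `scan_in_background`, and the `u64` row type are hypothetical stand-ins; the capacity of 16 mirrors the buffer size passed to `RecordBatchReceiverStream::builder` in the diff. A plain OS thread is used here as an analogy for a blocking thread pool.

```rust
use std::ops::RangeInclusive;
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical stand-in for a blocking storage scan; not the project's API.
fn blocking_scan(range: RangeInclusive<u64>) -> impl Iterator<Item = u64> {
    range
}

/// Runs the scan on its own thread and streams rows through a bounded channel.
fn scan_in_background(range: RangeInclusive<u64>, capacity: usize) -> Receiver<u64> {
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || {
        for row in blocking_scan(range) {
            // `send` blocks while the buffer is full, which gives back-pressure.
            // It returns an error once the receiver is dropped, so stop scanning.
            if tx.send(row).is_err() {
                break;
            }
        }
    });
    rx
}

fn main() {
    // Drain the channel; iteration ends when the producer thread drops `tx`.
    let rows: Vec<u64> = scan_in_background(1..=100, 16).iter().collect();
    assert_eq!(rows.len(), 100);
    println!("received {} rows", rows.len());
}
```

Once the underlying reads are themselves async, the dedicated thread is no longer needed and the producer can become an ordinary task, which is the direction described in the reply above.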

}

#[async_trait]
impl<T, S> TableProvider for PartitionedTableProvider<T, S>
Contributor

💯 very nice, it reads very well

Contributor

@tillrohrmann tillrohrmann left a comment

Great work @AhmedSoliman. It is impressive how quickly you restructured our RocksDB layout! Really happy about how you changed it.

This PR looks good to me. +1 for merging.

) -> SendableRecordBatchStream {
let range = PartitionKey::MIN..=PartitionKey::MAX;
let status = self.0.clone();
Contributor

nit and outside of this PR: It seems that we are using status and state interchangeably in this module. Maybe something to straighten out at some point to avoid confusion with the invocation_status table.

Comment on lines +57 to +58
let mut transaction = partition_store.transaction();
let rows = transaction.all_inboxes(range);
Contributor

Out of curiosity, why do we create a transaction here when the other table implementations do not?

Contributor Author

I don't really know the origin but possibly to get a stable snapshot of the entire state?
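
As a toy illustration of what a "stable snapshot" buys a scan, the sketch below takes a copy of an in-memory map before iterating, so writes that land afterwards are not visible to the scan. This is only an analogy under stated assumptions: `ToyStore`, `snapshot`, and `put` are hypothetical names, and nothing here reflects how the project's `PartitionStore` or RocksDB transactions are actually implemented.

```rust
use std::collections::BTreeMap;
use std::sync::RwLock;

/// Toy in-memory store; a hypothetical stand-in, not the project's PartitionStore.
struct ToyStore {
    rows: RwLock<BTreeMap<u64, String>>,
}

impl ToyStore {
    /// Copies the current contents so later writes cannot affect the scan.
    fn snapshot(&self) -> BTreeMap<u64, String> {
        self.rows.read().unwrap().clone()
    }

    /// Inserts or overwrites a row in the live store.
    fn put(&self, key: u64, value: &str) {
        self.rows.write().unwrap().insert(key, value.to_string());
    }
}

fn main() {
    let store = ToyStore { rows: RwLock::new(BTreeMap::new()) };
    store.put(1, "a");
    store.put(2, "b");

    let snap = store.snapshot();
    store.put(3, "c"); // written after the snapshot was taken

    // A full-range scan over the snapshot sees only the rows present at snapshot time.
    let seen: Vec<u64> = snap.range(u64::MIN..=u64::MAX).map(|(k, _)| *k).collect();
    assert_eq!(seen, vec![1, 2]);
    assert_eq!(store.snapshot().len(), 3);
}
```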

Comment on lines +49 to +56
async fn scan_partition_store(
partition_store: PartitionStore,
tx: Sender<Result<RecordBatch, datafusion::error::DataFusionError>>,
range: RangeInclusive<PartitionKey>,
projection: SchemaRef,
) -> SendableRecordBatchStream {
let db = self.0.clone();
let schema = projection.clone();
let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16);
let tx = stream_builder.tx();
let background_task = move || {
let rows = db.all_invocation_status(range);
for_each_status(schema, tx, rows);
Ok(())
};
stream_builder.spawn_blocking(background_task);
stream_builder.build()
) {
let rows = partition_store.all_invocation_status(range);
for_each_status(projection, tx, rows).await;
Contributor

Why is it ok to run this operation on the calling thread instead of spawning a task on the blocking thread pool as before? Maybe related question: Why are some tables spawning tasks on a blocking thread pool and others not?

Contributor

Ok, it seems that the previous implementations of the df tables were not consistent (this one used a blocking send while others used a non-blocking send; some implementations used transactions for reads, while others read directly from the storage).

Contributor Author

Because it's now async, the caller wraps it in a task.

}

impl<T, S> PartitionedTableProvider<T, S> {
pub(crate) fn new(processors_manager: S, schema: SchemaRef, partition_scanner: T) -> Self {
Contributor

maybe s/processors_manager/partition_selector/?


Ok(())
};
stream_builder.spawn(background_task);
Contributor

We are not using spawn_blocking because we assume that the background_task won't do too much blocking I/O, right?

**IMPORTANT:** This breaks queries through DataFusion until we work out how DataFusion will shard queries across partitions.
@AhmedSoliman AhmedSoliman merged commit 8ee4120 into main Apr 30, 2024
10 checks passed
@AhmedSoliman AhmedSoliman deleted the pr1482 branch April 30, 2024 13:02