Minimize copies #111

Closed
@frankmcsherry

Description

I've been trying to do a bit of review of the copies that go on in the timely and timely communication path. I think several of them can be removed, but first I thought I would try and explain what each of them is.

Let's go in order from a message received in timely communication, in BinaryReceiver::recv_loop().

  1. There is almost certainly a copy in

    let read = self.reader.read(&mut self.buffer[self.length..]).unwrap_or(0);

    where we collect whatever data the kernel has for us. In the absence of some zero-copy networking interface, I think this is probably going to stick around. Though, we may have to think a bit harder about where we copy into.

  2. Just a bit below, we have

    target.send(slice[..h_len].to_vec()).unwrap();

    This is where we peel out the bytes destined for a specific (worker, dataflow, channel) tuple and send the bytes along to that destination. Because this is a different thread with no lifetime relationship, we invoke .to_vec() to get an owned allocation.

  3. The byte allocation arrives at communication's binary::Puller, where it is not copied. This is a clever moment where we deserialize into the Message type, whose from_bytes method takes ownership of the Vec<u8> buffer and invokes Abomonation's decode method to get references.

  4. This Message gets handed to operator code, and if the operator only needs a &[Record] then no copy needs to happen. However, if the operator needs a &mut Vec<Record> then the DerefMut implementation will invoke a clone() on the &Vec<Record>, which will surely do some allocations. The byte buffer is dropped at this point.

  5. Operators can supply outputs either record-by-record, or as a ready-to-send batch of records. In either case, if they hit a data exchange channel they will need to be moved. This is essentially a copy, but it seems largely unavoidable if we want to put the records destined for remote workers into contiguous memory. This is where the "shuffle" actually needs to happen, and it seems legit.

  6. If serialization is required, then <Message as Serialize>::into_bytes() is invoked, and it will do an allocation of a Vec<u8> and a copy into it. The only way we know how to turn general Vec<Record> types into bytes is using Abomonation's encode, and this copies. In principle, we could "steal" the allocation of the vector itself, and only serialize subsequent owned data.

  7. The header (a fixed-size struct) and bytes are sent to BinarySender::send_loop(), in which we write both to a W: Writer. This happens to be a BufWriter wrapped around a network stream, which means a copy into the buffer, and probably a copy out of the buffer when it eventually gets around to writing to the network stream in bulk (the second of which is intrinsic in the TcpStream API).
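
Steps 3 and 4 above can be sketched roughly as follows. This is a minimal illustration, not the actual timely code: the `Message` enum, `as_slice`, and `as_mut_vec` are hypothetical names, and records are plain `u8` so that "decoding" is just a borrow, whereas the real code decodes arbitrary record types with Abomonation and exposes mutation through `DerefMut`.

```rust
/// Assumption for this sketch: a record is a single byte, so a byte buffer
/// can be viewed as a slice of records without any decoding work.
type Record = u8;

/// Hypothetical stand-in for a received message.
enum Message {
    /// Still backed by the byte buffer it arrived in; readable in place.
    Bytes(Vec<u8>),
    /// An owned, typed vector that the operator may mutate.
    Typed(Vec<Record>),
}

impl Message {
    /// Takes ownership of the received buffer; no bytes are copied (step 3).
    fn from_bytes(bytes: Vec<u8>) -> Self {
        Message::Bytes(bytes)
    }

    /// Read-only access never copies, whichever variant we hold.
    fn as_slice(&self) -> &[Record] {
        match self {
            Message::Bytes(bytes) => bytes.as_slice(),
            Message::Typed(records) => records.as_slice(),
        }
    }

    /// Mutable access needs an owned, typed vector. If we still hold raw
    /// bytes, this is where the clone happens and where the byte buffer is
    /// dropped (step 4).
    fn as_mut_vec(&mut self) -> &mut Vec<Record> {
        if let Message::Bytes(bytes) = self {
            // Explicit copy, modelling the clone() of the decoded data.
            let owned: Vec<Record> = bytes.as_slice().to_vec();
            *self = Message::Typed(owned);
        }
        match self {
            Message::Typed(records) => records,
            Message::Bytes(_) => unreachable!("converted to Typed above"),
        }
    }
}
```

An operator that only calls `as_slice` leaves the message in its `Bytes` form and pays nothing; the first call to `as_mut_vec` pays for one allocation and one copy, after which the message stays `Typed`.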

I think three of these are somewhat non-negotiable at the moment: i. the copy from kernel buffers when we read from the network stream (in 1.), ii. the copy as we do the data shuffle (in 5.), and iii. the copy back into kernel buffers (in 7.).

This leaves us with four potential copies that could be superfluous.

  2. This copy could be avoided using something like the bytes crate, where one hands out multiple references to a common allocation, and the API ensures that the references are to disjoint ranges.

    This could also be avoided by doing smaller reads into independently owned allocations; each read pulls down the next payload and the subsequent header, which tells us how much to read for the next allocation (and could tell us a size and alignment). This has the potential risk that if there are many small messages we do many small reads, possibly doing lots of kernel crossings. In that case, it seems like copies are an unavoidable cost of moving many messages using few kernel crossings.

  3. This wasn't actually a copy, but it has a number so we want to put it here.

  4. This copy is self-inflicted, in that one could write operator code that doesn't even need a mutable reference to the source data. It isn't always natural to do this, but if your code insists on owned data with owned allocations then this is non-negotiable, as we don't control the Rust codegen that needs to work correctly with the data it is handed.

    One candidate bit of cuteness is: if we are handed an owned Vec<u8> (in conflict with the optimization for 2.), we could arrange that the data are laid out so that the same allocation can be used as the spine of the Vec<Record>. This could still mean copying if these types have allocations behind them, and it is important that the allocation behind the Vec<u8> was made as a Vec<Record>, because the deallocation logic is allowed to explode if we dealloc with a different size or alignment, but it could be possible for something like this to work.

  5. We decided that the shuffle move was non-optional, but I have to put it here to make markdown numbers work out.

  6. When we go from Vec<Record> to Vec<u8> we have relatively few options. We could grab the spine of the vector and only serialize auxiliary data (perhaps to the same allocation, if there is enough space). This would mean no copies here for data without further owned memory. For records that do own further memory (say each Record contains a bunch of String fields and such), I think we would have no choice but to copy, in the absence of any further information.

    One alternative is something like the CapnProto builder patterns, where instead of allocating a Rust object for output you directly serialize the result into a byte buffer. This is possible, though I don't know how ergonomic it ends up being (that is, you could write the code by hand, but you probably wouldn't want to).

  7. One of these copies seems inescapable (the kernel buffer copy), but the BufWriter copy seems optional. It does some very good things for us, in terms of minimizing kernel crossings. This could be somewhat avoided if each worker produced a consolidated Vec<u8> to send to each remote process, rather than separate allocations for each channel and each remote worker. This seems possible, though again awkward. The shuffle is meant to colocate data for each worker, and we don't know how large each of these will be before sending them. We could commit to certain spacing (e.g. 1024 records for each worker) and start writing each worker at an offset of a common buffer for each process, with some inefficiency if there is skew of any sort. In any case, operator code currently produces output records one at a time, and we need to do something with each of these records.
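
The idea of handing out multiple references to disjoint ranges of one common allocation, in place of a `.to_vec()` per frame, can be sketched with only the standard library. This is a stand-in for what a crate like bytes provides, not its API: `Frame`, `split_frames`, and the 4-byte little-endian length header are all assumptions made up for the example.

```rust
use std::ops::Range;
use std::sync::Arc;

/// Hypothetical frame handle: a shared, immutable allocation plus the byte
/// range this handle is allowed to read.
#[derive(Clone)]
struct Frame {
    buffer: Arc<Vec<u8>>,
    range: Range<usize>,
}

impl Frame {
    /// Borrows this frame's payload from the shared allocation.
    fn as_bytes(&self) -> &[u8] {
        &self.buffer[self.range.clone()]
    }
}

/// Splits a received buffer into frames without copying any payload bytes.
/// Assumed wire format (invented for this sketch): each frame is a 4-byte
/// little-endian payload length followed by the payload itself.
/// A trailing incomplete frame is ignored.
fn split_frames(received: Vec<u8>) -> Vec<Frame> {
    // Moving the Vec into the Arc does not copy its contents.
    let buffer = Arc::new(received);
    let mut frames = Vec::new();
    let mut offset = 0;
    while offset + 4 <= buffer.len() {
        let mut len_bytes = [0u8; 4];
        len_bytes.copy_from_slice(&buffer[offset..offset + 4]);
        let len = u32::from_le_bytes(len_bytes) as usize;
        let start = offset + 4;
        let end = start + len;
        if end > buffer.len() {
            break; // the payload has not fully arrived yet
        }
        // Each frame bumps a reference count instead of copying bytes.
        frames.push(Frame { buffer: Arc::clone(&buffer), range: start..end });
        offset = end;
    }
    frames
}
```

Each `Frame` is `Send`, so it can be passed to another worker thread just like the owned `Vec<u8>` is today. The trade-off is that the whole receive buffer stays alive until the last frame referencing it is dropped.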

One meta-point, which I don't really know that I understand, is that we may be able to absolve ourselves of copies that do not leave low level caches. Each copy that remains within the L1 cache could be viewed as pretty cheap, relative to all the other operations going on. So, deserialization code for small buffers might be relatively cheap, as compared to copying each frame into a new allocation (2.). I'm slightly making this up, but understanding "zero copy" and which costs it is really avoiding seems like a good thing to do along the way.
