Skip to content

Commit

Permalink
first pass at a tar importer
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
  • Loading branch information
whyrusleeping committed Sep 10, 2015
1 parent 2107930 commit 0de8947
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 5 deletions.
1 change: 1 addition & 0 deletions core/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ var rootSubcommands = map[string]*cmds.Command{
"resolve": ResolveCmd,
"stats": StatsCmd,
"swarm": SwarmCmd,
"tar": TarCmd,
"tour": tourCmd,
"file": unixfs.UnixFSCmd,
"update": UpdateCmd,
Expand Down
113 changes: 113 additions & 0 deletions core/commands/tar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package commands

import (
"io"
"strings"

cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
path "github.com/ipfs/go-ipfs/path"
tar "github.com/ipfs/go-ipfs/tar"
)

// TarCmd is the 'ipfs tar' command group, grouping utilities for moving
// tar archives in and out of the ipfs merkledag.
var TarCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "utility functions for tar files in ipfs",
	},

	Subcommands: map[string]*cmds.Command{
		"add": tarAddCmd, // import a tar stream into the dag
		"cat": tarCatCmd, // export a previously imported archive
	},
}

// tarAddCmd implements 'ipfs tar add': it parses a tar stream (from a
// file argument or stdin) into a merkledag structure via tar.ImportTar
// and reports the resulting root object's name and base58 hash.
var tarAddCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "import a tar file into ipfs",
		ShortDescription: `
'ipfs tar add' will parse a tar file and create a merkledag structure to represent it.
`,
	},

	Arguments: []cmds.Argument{
		cmds.FileArg("file", true, false, "tar file to add").EnableStdin(),
	},
	Run: func(req cmds.Request, res cmds.Response) {
		nd, err := req.InvocContext().GetNode()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		fi, err := req.Files().NextFile()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		node, err := tar.ImportTar(fi, nd.DAG)
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		k, err := node.Key()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		// Fixed: removed a stray no-op `fi.FileName()` statement whose
		// result was discarded before this SetOutput call.
		res.SetOutput(&AddedObject{
			Name: fi.FileName(),
			Hash: k.B58String(),
		})
	},
	Type: AddedObject{},
	Marshalers: cmds.MarshalerMap{
		// Text output prints only the root hash.
		cmds.Text: func(res cmds.Response) (io.Reader, error) {
			o := res.Output().(*AddedObject)
			return strings.NewReader(o.Hash), nil
		},
	},
}

// tarCatCmd implements 'ipfs tar cat': it resolves an ipfs path to the
// root of a previously imported archive and streams the reconstructed
// tar bytes back to the caller.
var tarCatCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "export a tar file from ipfs",
		ShortDescription: `
'ipfs tar cat' will export a tar file from a previously imported one in ipfs
`,
	},

	Arguments: []cmds.Argument{
		cmds.StringArg("path", true, false, "ipfs path of archive to export").EnableStdin(),
	},
	Run: func(req cmds.Request, res cmds.Response) {
		node, err := req.InvocContext().GetNode()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		// Parse the user-supplied argument into an ipfs path.
		pth, err := path.ParsePath(req.Arguments()[0])
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		// Resolve the path down to the archive's root dag node.
		rootnd, err := core.Resolve(req.Context(), node, pth)
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		// Stream the reconstructed tar file as the command output.
		out, err := tar.ExportTar(req.Context(), rootnd, node.DAG)
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		res.SetOutput(out)
	},
}
2 changes: 1 addition & 1 deletion importer/chunk/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func FromString(r io.Reader, chunker string) (Splitter, error) {
if err != nil {
return nil, err
}
return NewSizeSplitter(r, int64(size)), nil
return NewSizeSplitter(r, uint64(size)), nil

case strings.HasPrefix(chunker, "rabin"):
return parseRabinString(r, chunker)
Expand Down
8 changes: 4 additions & 4 deletions importer/chunk/splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

var log = util.Logger("chunk")

var DefaultBlockSize int64 = 1024 * 256
var DefaultBlockSize uint64 = 1024 * 256

type Splitter interface {
NextBytes() ([]byte, error)
Expand All @@ -21,7 +21,7 @@ func DefaultSplitter(r io.Reader) Splitter {
return NewSizeSplitter(r, DefaultBlockSize)
}

func SizeSplitterGen(size int64) SplitterGen {
func SizeSplitterGen(size uint64) SplitterGen {
return func(r io.Reader) Splitter {
return NewSizeSplitter(r, size)
}
Expand Down Expand Up @@ -50,11 +50,11 @@ func Chan(s Splitter) (<-chan []byte, <-chan error) {

type sizeSplitterv2 struct {
r io.Reader
size int64
size uint64
err error
}

func NewSizeSplitter(r io.Reader, size int64) Splitter {
func NewSizeSplitter(r io.Reader, size uint64) Splitter {
return &sizeSplitterv2{
r: r,
size: size,
Expand Down
185 changes: 185 additions & 0 deletions tar/format.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package tarfmt

import (
"archive/tar"
"bytes"
"errors"
"io"
"io/ioutil"

importer "github.com/ipfs/go-ipfs/importer"
chunk "github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
uio "github.com/ipfs/go-ipfs/unixfs/io"

context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)

// blockSize is the tar record size: every header and padded file body
// is a multiple of 512 bytes.
var blockSize = 512

// zeroBlock is a reusable run of zero bytes used to pad file sections
// out to the next blockSize boundary.
var zeroBlock = make([]byte, blockSize)

func marshalHeader(h *tar.Header) ([]byte, error) {
buf := new(bytes.Buffer)
w := tar.NewWriter(buf)
err := w.WriteHeader(h)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

// ImportTar reads a tar stream from r and builds a merkledag
// representation of it: a root node whose Data is "ipfs/tar", with one
// link per archive entry (keyed by the entry name). Each entry link
// points at a node holding the raw marshalled tar header; entries with
// a non-empty body additionally carry a "data" link to a chunked dag of
// the file contents. The root node is added to ds and returned.
func ImportTar(r io.Reader, ds dag.DAGService) (*dag.Node, error) {
	// Buffer the entire stream up front before parsing.
	// NOTE(review): this holds the whole archive in memory; streaming
	// straight into tar.NewReader looks equivalent — confirm before
	// changing.
	all, err := ioutil.ReadAll(r)
	if err != nil {
		return nil, err
	}

	tr := tar.NewReader(bytes.NewReader(all))

	root := new(dag.Node)
	root.Data = []byte("ipfs/tar")

	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}

		hdrBytes, err := marshalHeader(hdr)
		if err != nil {
			return nil, err
		}

		// One dag node per entry, carrying the raw header bytes.
		hdrNode := new(dag.Node)
		hdrNode.Data = hdrBytes

		if hdr.Size > 0 {
			// Chunk the entry body and hang the file dag off the
			// header node under the "data" link.
			spl := chunk.NewRabin(tr, chunk.DefaultBlockSize)
			filend, err := importer.BuildDagFromReader(ds, spl, nil)
			if err != nil {
				return nil, err
			}
			if err := hdrNode.AddNodeLinkClean("data", filend); err != nil {
				return nil, err
			}
		}

		if _, err := ds.Add(hdrNode); err != nil {
			return nil, err
		}

		if err := root.AddNodeLinkClean(hdr.Name, hdrNode); err != nil {
			return nil, err
		}
	}

	if _, err := ds.Add(root); err != nil {
		return nil, err
	}

	return root, nil
}

// tarReader reconstructs a tar byte stream from a merkledag produced by
// ImportTar, implementing io.Reader. It walks the root's links in
// order, emitting for each entry the stored header block, then the file
// body (if present), then zero padding up to the next 512-byte block.
type tarReader struct {
	links []*dag.Link // remaining entry links still to be emitted
	ds    dag.DAGService

	hdrBuf   *bytes.Reader // current header block being emitted; nil when drained
	fileRead *countReader  // current file body being emitted; nil when drained
	pad      int           // zero bytes still owed to reach the block boundary

	ctx context.Context
}

// Read implements io.Reader by draining, in order: the pending header
// block, then the pending file body, then block padding, and finally
// advancing to the next link. Each call services at most one stage;
// when a stage empties it recurses to start emitting the next one.
func (tr *tarReader) Read(b []byte) (int, error) {
	// Stage 1: drain the current header block, if any.
	if tr.hdrBuf != nil {
		n, err := tr.hdrBuf.Read(b)
		if err == io.EOF {
			// Header fully emitted; swallow EOF so the caller keeps reading.
			tr.hdrBuf = nil
			return n, nil
		}
		return n, err
	}
	// Stage 2: drain the current file body, if any.
	if tr.fileRead != nil {
		n, err := tr.fileRead.Read(b)
		if err == io.EOF {
			// Body done: compute the zero padding owed to round the
			// entry up to the next 512-byte tar block.
			nr := tr.fileRead.n
			tr.pad = (blockSize - (nr % blockSize)) % blockSize
			tr.fileRead.Close()
			tr.fileRead = nil
			return n, nil
		}
		return n, err
	}
	// Stage 3: emit any remaining zero padding.
	if tr.pad > 0 {
		n := copy(b, zeroBlock[:tr.pad])
		tr.pad -= n
		return n, nil
	}

	// No links left: end of stream.
	// NOTE(review): a conforming tar stream ends with two zero blocks;
	// this returns EOF without emitting them — confirm consumers accept it.
	if len(tr.links) == 0 {
		return 0, io.EOF
	}

	// Advance to the next archive entry.
	next := tr.links[0]
	tr.links = tr.links[1:]

	headerNd, err := next.GetNode(tr.ctx, tr.ds)
	if err != nil {
		return 0, err
	}

	// Stage the header bytes; if the entry has a body ("data" link),
	// open a counting dag reader so padding can be computed at its EOF.
	tr.hdrBuf = bytes.NewReader(headerNd.Data)
	if len(headerNd.Links) > 0 {
		data, err := headerNd.Links[0].GetNode(tr.ctx, tr.ds)
		if err != nil {
			return 0, err
		}

		dr, err := uio.NewDagReader(tr.ctx, data, tr.ds)
		if err != nil {
			return 0, err
		}

		tr.fileRead = &countReader{r: dr}
	}

	// Recurse to begin emitting the newly staged header.
	return tr.Read(b)
}

// ExportTar returns an io.Reader that reconstructs the tar byte stream
// for an archive previously imported with ImportTar. It rejects nodes
// whose Data is not the "ipfs/tar" marker.
func ExportTar(ctx context.Context, root *dag.Node, ds dag.DAGService) (io.Reader, error) {
	if !bytes.Equal(root.Data, []byte("ipfs/tar")) {
		return nil, errors.New("not an ipfs tarchive")
	}

	rdr := &tarReader{
		ctx:   ctx,
		ds:    ds,
		links: root.Links,
	}
	return rdr, nil
}

type countReader struct {
r io.ReadCloser
n int
}

func (r *countReader) Read(b []byte) (int, error) {
n, err := r.r.Read(b)
r.n += n
return n, err
}

func (r *countReader) Close() error {
return r.r.Close()
}

0 comments on commit 0de8947

Please sign in to comment.