From 18131f88bfd48ca215284452e9d01f537a796233 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Thu, 11 Sep 2014 20:43:12 +0200
Subject: [PATCH] Add code for content defined chunking (CDC)

---
 chunker/chunker.go      | 255 ++++++++++++++++++++++++++++++++++++++++
 chunker/chunker_test.go | 133 +++++++++++++++++++++
 chunker/doc.go          |  26 ++++
 cmd/splits/main.go      |  59 ++++++++++
 4 files changed, 473 insertions(+)
 create mode 100644 chunker/chunker.go
 create mode 100644 chunker/chunker_test.go
 create mode 100644 chunker/doc.go
 create mode 100644 cmd/splits/main.go

diff --git a/chunker/chunker.go b/chunker/chunker.go
new file mode 100644
index 000000000..6e051d8c4
--- /dev/null
+++ b/chunker/chunker.go
@@ -0,0 +1,255 @@
+package chunker
+
+import (
+	"io"
+	"sync"
+)
+
+const (
+	KiB = 1024
+	MiB = 1024 * KiB
+
+	// Polynomial is a randomly generated irreducible polynomial of degree 53 in Z_2[X].
+	Polynomial = 0x3DA3358B4DC173
+
+	// WindowSize is the size of the sliding window in bytes.
+	WindowSize = 64
+
+	// AverageBits is the number of low fingerprint bits that are zero at a cut, aiming at chunks of about 1MiB on average.
+	AverageBits = 20
+
+	// MinSize and MaxSize keep chunks in the range of 512KiB to 8MiB.
+	MinSize = 512 * KiB
+	MaxSize = 8 * MiB
+
+	splitmask = (1 << AverageBits) - 1 // selects the low AverageBits bits of a fingerprint
+)
+
+var (
+	pol_shift = deg(Polynomial) - 8 // shift that moves the top 8 bits of a digest down to a table index
+	once      sync.Once             // runs fill_tables a single time
+	mod_table [256]uint64           // lookup table for the reduction modulo Polynomial
+	out_table [256]uint64           // lookup table for removing the byte that leaves the window
+)
+
+// Chunk is one content-dependent chunk of bytes whose end was cut when the
+// Rabin Fingerprint had the value stored in Cut.
+type Chunk struct {
+	Start  int    // offset of the first byte of the chunk in the stream
+	Length int    // number of bytes counted for the chunk
+	Cut    uint64 // fingerprint at the point where the chunk was cut
+	Data   []byte // bytes collected for the chunk
+}
+
+// Chunker takes a stream of bytes and emits average size chunks.
+type Chunker interface {
+	Next() (*Chunk, error)
+}
+
+// chunker internally holds everything needed to split content.
+type chunker struct {
+	rd io.Reader
+
+	window []byte // the last WindowSize bytes, used as a ring buffer
+	wpos   int    // index in window of the byte that is replaced next
+
+	buf  []byte // read buffer filled from rd
+	bpos int    // index of the next unprocessed byte in buf
+	bmax int    // number of valid bytes in buf
+
+	data  []byte // bytes of the chunk that is being assembled
+	start int    // stream offset at which the current chunk started
+	count int    // number of bytes in the current chunk so far
+	pos   int    // stream offset of the next byte to process
+
+	digest uint64 // current fingerprint
+}
+
+// New returns a new Chunker that reads data from rd.
+func New(rd io.Reader) Chunker {
+	// data is allocated by reset, so it is not allocated a second time here.
+	c := &chunker{
+		rd:     rd,
+		window: make([]byte, WindowSize),
+		buf:    make([]byte, MaxSize),
+	}
+
+	once.Do(c.fill_tables)
+	c.reset()
+
+	return c
+}
+
+// reset clears the rolling hash state and the chunk counters and allocates a
+// new data buffer. Callers that need to keep pos must restore it afterwards.
+func (c *chunker) reset() {
+	for i := 0; i < WindowSize; i++ {
+		c.window[i] = 0
+	}
+	c.digest = 0
+	c.wpos = 0
+	c.pos = 0
+	c.count = 0
+	c.slide(1)
+	// The old slice was handed out as Chunk.Data, so it must not be reused.
+	c.data = make([]byte, 0, MaxSize)
+}
+
+// fill_tables calculates out_table and mod_table. It must be called only once.
+func (c *chunker) fill_tables() {
+	// calculate table for sliding out bytes. The byte to slide out is used as
+	// the index for the table, the value contains the following:
+	// out_table[b] = Hash(b || 0 ||        ...        || 0)
+	//                          \ windowsize-1 zero bytes /
+	// To slide out byte b_0 for window size w with known hash
+	// H := H(b_0 || ... || b_w), it is sufficient to add out_table[b_0]:
+	//    H(b_0 || ... || b_w) + H(b_0 || 0 || ... || 0)
+	//  = H(b_0 + b_0 || b_1 + 0 || ... || b_w + 0)
+	//  = H(    0     || b_1 || ...     || b_w)
+	//
+	// Afterwards a new byte can be shifted in.
+	for b := 0; b < 256; b++ {
+		var hash uint64
+
+		hash = append_byte(hash, byte(b), Polynomial)
+		for i := 0; i < WindowSize-1; i++ {
+			hash = append_byte(hash, 0, Polynomial)
+		}
+		out_table[b] = hash
+	}
+
+	// calculate table for reduction mod Polynomial
+	k := deg(Polynomial)
+	for b := 0; b < 256; b++ {
+		// mod_table[b] = A | B, where A = (b(x) * x^k mod pol) and B = b(x) * x^k
+		//
+		// The 8 bits above deg(Polynomial) determine what happens next and so
+		// these bits are used as a lookup to this table. The value is split in
+		// two parts: Part A contains the result of the modulus operation, part
+		// B is used to cancel out the 8 top bits so that one XOR operation is
+		// enough to reduce modulo Polynomial
+		mod_table[b] = mod(uint64(b)<<uint(k), Polynomial) | (uint64(b) << uint(k))
+	}
+}
+
+// Next returns the next chunk. A chunk ends after at least MinSize bytes where
+// the splitmask bits of the fingerprint are all zero, or after MaxSize bytes.
+// The last (possibly empty) chunk is returned together with io.EOF.
+func (c *chunker) Next() (*Chunk, error) {
+	for {
+		if c.bpos >= c.bmax {
+			n, err := io.ReadFull(c.rd, c.buf)
+
+			if err == io.ErrUnexpectedEOF {
+				err = nil
+			}
+
+			if err != nil {
+				return &Chunk{
+					Start:  c.start,
+					Length: c.count,
+					Cut:    c.digest,
+					Data:   c.data,
+				}, err
+			}
+
+			c.bpos = 0
+			c.bmax = n
+		}
+
+		for i, b := range c.buf[c.bpos:c.bmax] {
+			// inline c.slide(b) and append(b) to increase performance
+			out := c.window[c.wpos]
+			c.window[c.wpos] = b
+			c.digest ^= out_table[out]
+			c.wpos = (c.wpos + 1) % WindowSize
+
+			// c.append(b)
+			index := c.digest >> uint(pol_shift)
+			c.digest <<= 8
+			c.digest |= uint64(b)
+			c.digest ^= mod_table[index]
+
+			if (c.count+i+1 >= MinSize && (c.digest&splitmask) == 0) || c.count+i+1 >= MaxSize {
+				// The chunk ends with byte i, so the slice has to include it.
+				c.data = append(c.data, c.buf[c.bpos:c.bpos+i+1]...)
+				c.count += i + 1
+				c.pos += i + 1
+				c.bpos += i + 1
+
+				chunk := &Chunk{
+					Start:  c.start,
+					Length: c.count,
+					Cut:    c.digest,
+					Data:   c.data,
+				}
+
+				// keep position
+				pos := c.pos
+				c.reset()
+				c.pos = pos
+				c.start = pos
+
+				return chunk, nil
+			}
+		}
+
+		// No cut in the rest of the buffer: keep the bytes and read more.
+		steps := c.bmax - c.bpos
+		c.data = append(c.data, c.buf[c.bpos:c.bmax]...)
+		c.count += steps
+		c.pos += steps
+		c.bpos = c.bmax
+	}
+}
+
+// append shifts b into the digest and reduces it modulo Polynomial via mod_table.
+func (c *chunker) append(b byte) {
+	index := c.digest >> uint(pol_shift)
+	c.digest <<= 8
+	c.digest |= uint64(b)
+	c.digest ^= mod_table[index]
+}
+
+// slide replaces the oldest byte of the window with b and updates the digest.
+func (c *chunker) slide(b byte) {
+	out := c.window[c.wpos]
+	c.window[c.wpos] = b
+	c.digest ^= out_table[out]
+	c.wpos = (c.wpos + 1) % WindowSize
+	c.append(b)
+}
+
+// append_byte shifts b into hash and returns the result reduced modulo pol.
+func append_byte(hash uint64, b byte, pol uint64) uint64 {
+	hash <<= 8
+	hash |= uint64(b)
+	return mod(hash, pol)
+}
+
+// mod calculates the remainder of x divided by p.
+func mod(x, p uint64) uint64 {
+	// p must not be zero: the loop would never terminate.
+	dp := deg(p)
+	for d := deg(x); d >= dp; d = deg(x) {
+		x ^= p << uint(d-dp)
+	}
+
+	return x
+}
+
+// deg returns the degree of the polynomial p, this is equivalent to the number
+// of the highest bit set in p. The degree of the zero polynomial is -1.
+func deg(p uint64) int {
+	var mask uint64 = 0x8000000000000000
+
+	for i := 0; i < 64; i++ {
+		if mask&p > 0 {
+			return 63 - i
+		}
+
+		mask >>= 1
+	}
+
+	return -1
+}
diff --git a/chunker/chunker_test.go b/chunker/chunker_test.go
new file mode 100644
index 000000000..b1e39711c
--- /dev/null
+++ b/chunker/chunker_test.go
@@ -0,0 +1,133 @@
+package chunker_test
+
+import (
+	"bytes"
+	"io"
+	"math/rand"
+	"testing"
+
+	"github.com/fd0/khepri/chunker"
+)
+
+type chunk struct {
+	Length int
+	CutFP  uint64
+}
+
+// created for 32MB of random data out of math/rand's Uint32() seeded by
+// constant 23
+//
+// chunking configuration:
+// window size 64, avg chunksize 1<<20, min chunksize 1<<19, max chunksize 1<<23
+// polynom 0x3DA3358B4DC173
+var chunks1 = []chunk{
+	{2163460, 0x000b98d4cdf00000},
+	{643703, 0x000d4e8364d00000},
+	{1528956, 0x0015a25c2ef00000},
+	{1955808, 0x00102a8242e00000},
+	{2222372, 0x00045da878000000},
+	{2538687, 0x00198a8179900000},
+	{609606, 0x001d4e8d17100000},
+	{1205738, 0x000a7204dd600000},
+	{959742, 0x00183e71e1400000},
+	{4036109, 0x001fec043c700000},
+	{1525894, 0x000b1574b1500000},
+	{1352720, 0x00018965f2e00000},
+	{811884, 0x00155628aa100000},
+	{1282314, 0x001909a0a1400000},
+	{1318021, 0x001cceb980000000},
+	{948640, 0x0011f7a470a00000},
+	{645464, 0x00030ce2d9400000},
+	{533758, 0x0004435c53c00000},
+	{1128303, 0x0000c48517800000},
+	{800374, 0x000968473f900000},
+	{2453512, 0x001e197c92600000},
+	{2651975, 0x000ae6c868000000},
+	{237392, 0x00184c5825e18636},
+}
+
+// test_with_data checks that ch yields the expected chunks and that the last
+// of them is returned together with io.EOF.
+func test_with_data(t *testing.T, ch chunker.Chunker, chunks []chunk) {
+	for i, chunk := range chunks {
+		c, err := ch.Next()
+
+		if i < len(chunks)-1 {
+			if err != nil {
+				t.Fatalf("Error returned with chunk %d: %v", i, err)
+			}
+		} else {
+			if err != io.EOF {
+				t.Fatalf("EOF not returned with chunk %d", i)
+			}
+		}
+
+		if c == nil {
+			t.Fatalf("Nil chunk returned")
+		}
+
+		if c.Length != chunk.Length {
+			t.Fatalf("Length for chunk %d does not match: expected %d, got %d",
+				i, chunk.Length, c.Length)
+		}
+
+		if c.Cut != chunk.CutFP {
+			t.Fatalf("Cut fingerprint for chunk %d does not match: expected %016x, got %016x",
+				i, chunk.CutFP, c.Cut)
+		}
+	}
+}
+
+// get_random returns count pseudo-random bytes generated from seed.
+func get_random(seed, count int) []byte {
+	buf := make([]byte, count)
+	rnd := rand.New(rand.NewSource(int64(seed)))
+	for i := 0; i < count; i += 4 {
+		r := rnd.Uint32()
+		// the bound on i+j keeps a count that is no multiple of 4 in range
+		for j := 0; j < 4 && i+j < count; j++ {
+			buf[i+j] = byte(r >> uint(8*j))
+		}
+	}
+
+	return buf
+}
+
+func TestChunker(t *testing.T) {
+	// setup data source
+	buf := get_random(23, 32*1024*1024)
+
+	ch := chunker.New(bytes.NewReader(buf))
+
+	test_with_data(t, ch, chunks1)
+}
+
+func BenchmarkChunker(b *testing.B) {
+	size := 10 * 1024 * 1024
+	buf := get_random(23, size)
+
+	b.ResetTimer()
+	b.SetBytes(int64(size))
+	var chunks int
+	for i := 0; i < b.N; i++ {
+		chunks = 0
+
+		ch := chunker.New(bytes.NewReader(buf))
+
+		for {
+			_, err := ch.Next()
+
+			chunks++
+
+			if err != nil && err != io.EOF {
+				b.Fatalf("Unexpected error occurred: %v", err)
+			}
+
+			if err == io.EOF {
+				break
+			}
+		}
+	}
+
+	b.Logf("%d chunks, average chunk size: %d bytes", chunks, size/chunks)
+}
diff --git a/chunker/doc.go b/chunker/doc.go
new file mode 100644
index 000000000..428ee29f2
--- /dev/null
+++ b/chunker/doc.go
@@ -0,0 +1,26 @@
+// Copyright 2014 Alexander Neumann. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+/*
+Package chunker implements Content Defined Chunking (CDC) based on a rolling
+Rabin Checksum.
+
+Background Literature
+
+An introduction to Rabin Fingerprints/Checksums can be found in the following articles:
+
+Michael O. Rabin (1981): "Fingerprinting by Random Polynomials"
+http://www.xmailserver.org/rabin.pdf
+
+Ross N. Williams (1993): "A Painless Guide to CRC Error Detection Algorithms"
+http://www.zlib.net/crc_v3.txt
+
+Andrei Z. Broder (1993): "Some Applications of Rabin's Fingerprinting Method"
+http://www.xmailserver.org/rabin_apps.pdf
+
+Andrew Kadatch, Bob Jenkins (2007): "Everything we know about CRC but afraid to forget"
+http://crcutil.googlecode.com/files/crc-doc.1.0.pdf
+
+*/
+package chunker
diff --git a/cmd/splits/main.go b/cmd/splits/main.go
new file mode 100644
index 000000000..06c46d0e8
--- /dev/null
+++ b/cmd/splits/main.go
@@ -0,0 +1,59 @@
+package main
+
+import (
+	"crypto/sha256"
+	"fmt"
+	"io"
+	"os"
+
+	"github.com/fd0/khepri/chunker"
+)
+
+// main splits the file named by the first argument (or stdin) into chunks,
+// prints length, cut fingerprint and SHA-256 of each chunk to stdout and a
+// summary to stderr.
+func main() {
+	file := os.Stdin
+	if len(os.Args) > 1 {
+		f, err := os.Open(os.Args[1])
+		if err != nil {
+			fmt.Fprintln(os.Stderr, err)
+			os.Exit(1)
+		}
+		file = f
+	}
+
+	var count, total, minSized, maxSized int
+	ch := chunker.New(file)
+	for {
+		chunk, err := ch.Next()
+		if err != nil && err != io.EOF {
+			// Without this, a persistent read error would make the loop spin forever.
+			fmt.Fprintln(os.Stderr, err)
+			os.Exit(1)
+		}
+
+		if chunk != nil {
+			fmt.Printf("%d %016x %02x\n", chunk.Length, chunk.Cut, sha256.Sum256(chunk.Data))
+			count++
+			total += chunk.Length
+			if chunk.Length == chunker.MaxSize {
+				maxSized++
+			} else if chunk.Length == chunker.MinSize {
+				minSized++
+			}
+		}
+
+		if err == io.EOF {
+			break
+		}
+	}
+
+	var avg int
+	if count > 0 {
+		avg = total / count
+	}
+
+	fmt.Fprintf(os.Stderr, "%d chunks, average size %d (%d min size, %d max size chunks)\n",
+		count, avg, minSized, maxSized)
+}