This is an automated email from the git hooks/post-receive script.
shelikhoo pushed a commit to branch main in repository pluggable-transports/snowflake.
commit 211254fa9849a1ae705a482ba984d0d415730560
Author: Shelikhoo xiaokangwang@outlook.com
AuthorDate: Fri May 27 16:20:47 2022 +0100
Add distinct IP counter
---
 common/ipsetsink/sink.go                    | 52 ++++++++++++++++++++++
 common/ipsetsink/sink_test.go               | 47 ++++++++++++++++++++
 common/ipsetsink/sinkcluster/common.go      |  9 ++++
 common/ipsetsink/sinkcluster/reader.go      | 60 +++++++++++++++++++++++++
 common/ipsetsink/sinkcluster/writer.go      | 68 +++++++++++++++++++++++++++++
 common/ipsetsink/sinkcluster/writer_test.go | 33 ++++++++++++++
 distinctcounter/counter.go                  | 37 ++++++++++++++++
 go.mod                                      |  1 +
 8 files changed, 307 insertions(+)
diff --git a/common/ipsetsink/sink.go b/common/ipsetsink/sink.go new file mode 100644 index 0000000..b62f786 --- /dev/null +++ b/common/ipsetsink/sink.go @@ -0,0 +1,52 @@ +package ipsetsink + +import ( + "crypto/hmac" + "hash" + "hash/crc64" + + "github.com/clarkduvall/hyperloglog" + "golang.org/x/crypto/sha3" +) + +func NewIPSetSink(maskingKey string) *IPSetSink { + countDistinct, _ := hyperloglog.NewPlus(18) + return &IPSetSink{ + ipMaskingKey: maskingKey, + countDistinct: countDistinct, + } +} + +type IPSetSink struct { + ipMaskingKey string + countDistinct *hyperloglog.HyperLogLogPlus +} + +func (s *IPSetSink) maskIPAddress(ipAddress string) []byte { + hmacIPMasker := hmac.New(func() hash.Hash { + return sha3.New256() + }, []byte(s.ipMaskingKey)) + hmacIPMasker.Write([]byte(ipAddress)) + return hmacIPMasker.Sum(nil) +} + +func (s *IPSetSink) AddIPToSet(ipAddress string) { + s.countDistinct.Add(crc64FromBytes{hashValue(s.maskIPAddress(ipAddress))}) +} + +func (s *IPSetSink) Dump() ([]byte, error) { + return s.countDistinct.GobEncode() +} + +func (s *IPSetSink) Reset() { + s.countDistinct.Clear() +} + +type hashValue []byte +type crc64FromBytes struct { + hashValue +} + +func (c crc64FromBytes) Sum64() uint64 { + return crc64.Checksum(c.hashValue, crc64.MakeTable(crc64.ECMA)) +} diff --git a/common/ipsetsink/sink_test.go b/common/ipsetsink/sink_test.go new file mode 100644 index 0000000..00ae965 --- /dev/null +++ b/common/ipsetsink/sink_test.go @@ -0,0 +1,47 @@ +package ipsetsink + +import ( + "fmt" + "github.com/clarkduvall/hyperloglog" + "testing" +) +import . 
"github.com/smartystreets/goconvey/convey" + +func TestSinkInit(t *testing.T) { + Convey("Context", t, func() { + sink := NewIPSetSink("demo") + sink.AddIPToSet("test1") + sink.AddIPToSet("test2") + data, err := sink.Dump() + So(err, ShouldBeNil) + structure, err := hyperloglog.NewPlus(18) + So(err, ShouldBeNil) + err = structure.GobDecode(data) + So(err, ShouldBeNil) + count := structure.Count() + So(count, ShouldBeBetweenOrEqual, 1, 3) + }) +} + +func TestSinkCounting(t *testing.T) { + Convey("Context", t, func() { + for itemCount := 300; itemCount <= 10000; itemCount += 200 { + sink := NewIPSetSink("demo") + for i := 0; i <= itemCount; i++ { + sink.AddIPToSet(fmt.Sprintf("demo%v", i)) + } + for i := 0; i <= itemCount; i++ { + sink.AddIPToSet(fmt.Sprintf("demo%v", i)) + } + data, err := sink.Dump() + So(err, ShouldBeNil) + structure, err := hyperloglog.NewPlus(18) + So(err, ShouldBeNil) + err = structure.GobDecode(data) + So(err, ShouldBeNil) + count := structure.Count() + So((float64(count)/float64(itemCount))-1.0, ShouldAlmostEqual, 0, 0.01) + } + + }) +} diff --git a/common/ipsetsink/sinkcluster/common.go b/common/ipsetsink/sinkcluster/common.go new file mode 100644 index 0000000..501c753 --- /dev/null +++ b/common/ipsetsink/sinkcluster/common.go @@ -0,0 +1,9 @@ +package sinkcluster + +import "time" + +type SinkEntry struct { + RecordingStart time.Time `json:"recordingStart"` + RecordingEnd time.Time `json:"recordingEnd"` + Recorded []byte `json:"recorded"` +} diff --git a/common/ipsetsink/sinkcluster/reader.go b/common/ipsetsink/sinkcluster/reader.go new file mode 100644 index 0000000..3f7a08f --- /dev/null +++ b/common/ipsetsink/sinkcluster/reader.go @@ -0,0 +1,60 @@ +package sinkcluster + +import ( + "bufio" + "encoding/json" + "github.com/clarkduvall/hyperloglog" + "io" + "time" +) + +func NewClusterCounter(from time.Time, to time.Time) *ClusterCounter { + return &ClusterCounter{from: from, to: to} +} + +type ClusterCounter struct { + from time.Time + to 
time.Time +} + +type ClusterCountResult struct { + Sum uint64 + ChunkIncluded int64 +} + +func (c ClusterCounter) Count(reader io.Reader) (*ClusterCountResult, error) { + result := ClusterCountResult{} + counter, err := hyperloglog.NewPlus(18) + if err != nil { + return nil, err + } + inputScanner := bufio.NewScanner(reader) + for inputScanner.Scan() { + inputLine := inputScanner.Bytes() + sinkInfo := SinkEntry{} + if err := json.Unmarshal(inputLine, &sinkInfo); err != nil { + return nil, err + } + + if (sinkInfo.RecordingStart.Before(c.from) && !sinkInfo.RecordingStart.Equal(c.from)) || + sinkInfo.RecordingEnd.After(c.to) { + continue + } + + restoredCounter, err := hyperloglog.NewPlus(18) + if err != nil { + return nil, err + } + err = restoredCounter.GobDecode(sinkInfo.Recorded) + if err != nil { + return nil, err + } + result.ChunkIncluded++ + err = counter.Merge(restoredCounter) + if err != nil { + return nil, err + } + } + result.Sum = counter.Count() + return &result, nil +} diff --git a/common/ipsetsink/sinkcluster/writer.go b/common/ipsetsink/sinkcluster/writer.go new file mode 100644 index 0000000..1c409e8 --- /dev/null +++ b/common/ipsetsink/sinkcluster/writer.go @@ -0,0 +1,68 @@ +package sinkcluster + +import ( + "bytes" + "encoding/json" + "io" + "log" + "time" + + "git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink" +) + +func NewClusterWriter(writer WriteSyncer, writeInterval time.Duration, sink *ipsetsink.IPSetSink) *ClusterWriter { + c := &ClusterWriter{ + writer: writer, + lastWriteTime: time.Now(), + writeInterval: writeInterval, + current: sink, + } + return c +} + +type ClusterWriter struct { + writer WriteSyncer + lastWriteTime time.Time + writeInterval time.Duration + current *ipsetsink.IPSetSink +} + +type WriteSyncer interface { + Sync() error + io.Writer +} + +func (c *ClusterWriter) WriteIPSetToDisk() { + currentTime := time.Now() + data, err := c.current.Dump() + if err != nil { + log.Println("unable able to write 
ipset to file:", err) + return + } + entry := &SinkEntry{ + RecordingStart: c.lastWriteTime, + RecordingEnd: currentTime, + Recorded: data, + } + jsonData, err := json.Marshal(entry) + if err != nil { + log.Println("unable able to write ipset to file:", err) + return + } + jsonData = append(jsonData, byte('\n')) + _, err = io.Copy(c.writer, bytes.NewReader(jsonData)) + if err != nil { + log.Println("unable able to write ipset to file:", err) + return + } + c.writer.Sync() + c.lastWriteTime = currentTime + c.current.Reset() +} + +func (c *ClusterWriter) AddIPToSet(ipAddress string) { + if c.lastWriteTime.Add(c.writeInterval).Before(time.Now()) { + c.WriteIPSetToDisk() + } + c.current.AddIPToSet(ipAddress) +} diff --git a/common/ipsetsink/sinkcluster/writer_test.go b/common/ipsetsink/sinkcluster/writer_test.go new file mode 100644 index 0000000..5319cb2 --- /dev/null +++ b/common/ipsetsink/sinkcluster/writer_test.go @@ -0,0 +1,33 @@ +package sinkcluster + +import ( + "bytes" + "io" + "testing" + "time" + + "git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink" + + . 
"github.com/smartystreets/goconvey/convey" +) + +type writerStub struct { + io.Writer +} + +func (w writerStub) Sync() error { + return nil +} + +func TestSinkWriter(t *testing.T) { + + Convey("Context", t, func() { + buffer := bytes.NewBuffer(nil) + writerStubInst := &writerStub{buffer} + sink := ipsetsink.NewIPSetSink("demo") + clusterWriter := NewClusterWriter(writerStubInst, time.Minute, sink) + clusterWriter.AddIPToSet("1") + clusterWriter.WriteIPSetToDisk() + So(buffer.Bytes(), ShouldNotBeNil) + }) +} diff --git a/distinctcounter/counter.go b/distinctcounter/counter.go new file mode 100644 index 0000000..c128465 --- /dev/null +++ b/distinctcounter/counter.go @@ -0,0 +1,37 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "time" + + "git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink/sinkcluster" +) + +func main() { + inputFile := flag.String("in", "", "") + start := flag.String("start", "", "") + end := flag.String("end", "", "") + flag.Parse() + startTime, err := time.Parse(time.UnixDate, *start) + if err != nil { + log.Fatal("unable to parse start time:", err) + } + endTime, err := time.Parse(time.UnixDate, *end) + if err != nil { + log.Fatal("unable to parse end time:", err) + } + fd, err := os.Open(*inputFile) + if err != nil { + log.Fatal("unable to open input file:", err) + } + counter := sinkcluster.NewClusterCounter(startTime, endTime) + result, err := counter.Count(fd) + if err != nil { + log.Fatal("unable to count:", err) + } + fmt.Printf("sum = %v\n", result.Sum) + fmt.Printf("chunkIncluded = %v\n", result.ChunkIncluded) +} diff --git a/go.mod b/go.mod index 842648c..c782967 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13
require ( git.torproject.org/pluggable-transports/goptlib.git v1.1.0 + github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004 // indirect github.com/gorilla/websocket v1.4.1 github.com/pion/ice/v2 v2.2.6 github.com/pion/sdp/v3 v3.0.5