[tor-commits] [pluggable-transports/snowflake] 01/07: Add distinct IP counter

gitolite role git at cupani.torproject.org
Thu Jun 16 17:23:14 UTC 2022


This is an automated email from the git hooks/post-receive script.

shelikhoo pushed a commit to branch main
in repository pluggable-transports/snowflake.

commit 211254fa9849a1ae705a482ba984d0d415730560
Author: Shelikhoo <xiaokangwang at outlook.com>
AuthorDate: Fri May 27 16:20:47 2022 +0100

    Add distinct IP counter
---
 common/ipsetsink/sink.go                    | 52 ++++++++++++++++++++++
 common/ipsetsink/sink_test.go               | 47 ++++++++++++++++++++
 common/ipsetsink/sinkcluster/common.go      |  9 ++++
 common/ipsetsink/sinkcluster/reader.go      | 60 +++++++++++++++++++++++++
 common/ipsetsink/sinkcluster/writer.go      | 68 +++++++++++++++++++++++++++++
 common/ipsetsink/sinkcluster/writer_test.go | 33 ++++++++++++++
 distinctcounter/counter.go                  | 37 ++++++++++++++++
 go.mod                                      |  1 +
 8 files changed, 307 insertions(+)

diff --git a/common/ipsetsink/sink.go b/common/ipsetsink/sink.go
new file mode 100644
index 0000000..b62f786
--- /dev/null
+++ b/common/ipsetsink/sink.go
@@ -0,0 +1,52 @@
+package ipsetsink
+
+import (
+	"crypto/hmac"
+	"hash"
+	"hash/crc64"
+
+	"github.com/clarkduvall/hyperloglog"
+	"golang.org/x/crypto/sha3"
+)
+
+func NewIPSetSink(maskingKey string) *IPSetSink {
+	countDistinct, _ := hyperloglog.NewPlus(18)
+	return &IPSetSink{
+		ipMaskingKey:  maskingKey,
+		countDistinct: countDistinct,
+	}
+}
+
+type IPSetSink struct {
+	ipMaskingKey  string
+	countDistinct *hyperloglog.HyperLogLogPlus
+}
+
+func (s *IPSetSink) maskIPAddress(ipAddress string) []byte {
+	hmacIPMasker := hmac.New(func() hash.Hash {
+		return sha3.New256()
+	}, []byte(s.ipMaskingKey))
+	hmacIPMasker.Write([]byte(ipAddress))
+	return hmacIPMasker.Sum(nil)
+}
+
+func (s *IPSetSink) AddIPToSet(ipAddress string) {
+	s.countDistinct.Add(crc64FromBytes{hashValue(s.maskIPAddress(ipAddress))})
+}
+
+func (s *IPSetSink) Dump() ([]byte, error) {
+	return s.countDistinct.GobEncode()
+}
+
+func (s *IPSetSink) Reset() {
+	s.countDistinct.Clear()
+}
+
// hashValue holds a keyed digest of an IP address (see maskIPAddress).
type hashValue []byte

// crc64FromBytes wraps a hashValue so it can be handed to
// HyperLogLogPlus.Add in AddIPToSet, which presumably consumes it through the
// Sum64 method below.
type crc64FromBytes struct {
	hashValue
}

// crc64ECMATable is the CRC-64 lookup table used by Sum64, obtained once.
var crc64ECMATable = crc64.MakeTable(crc64.ECMA)

// Sum64 returns the CRC-64 (ECMA polynomial) checksum of the wrapped digest.
func (c crc64FromBytes) Sum64() uint64 {
	return crc64.Checksum(c.hashValue, crc64ECMATable)
}
diff --git a/common/ipsetsink/sink_test.go b/common/ipsetsink/sink_test.go
new file mode 100644
index 0000000..00ae965
--- /dev/null
+++ b/common/ipsetsink/sink_test.go
@@ -0,0 +1,47 @@
+package ipsetsink
+
+import (
+	"fmt"
+	"github.com/clarkduvall/hyperloglog"
+	"testing"
+)
+import . "github.com/smartystreets/goconvey/convey"
+
+func TestSinkInit(t *testing.T) {
+	Convey("Context", t, func() {
+		sink := NewIPSetSink("demo")
+		sink.AddIPToSet("test1")
+		sink.AddIPToSet("test2")
+		data, err := sink.Dump()
+		So(err, ShouldBeNil)
+		structure, err := hyperloglog.NewPlus(18)
+		So(err, ShouldBeNil)
+		err = structure.GobDecode(data)
+		So(err, ShouldBeNil)
+		count := structure.Count()
+		So(count, ShouldBeBetweenOrEqual, 1, 3)
+	})
+}
+
+func TestSinkCounting(t *testing.T) {
+	Convey("Context", t, func() {
+		for itemCount := 300; itemCount <= 10000; itemCount += 200 {
+			sink := NewIPSetSink("demo")
+			for i := 0; i <= itemCount; i++ {
+				sink.AddIPToSet(fmt.Sprintf("demo%v", i))
+			}
+			for i := 0; i <= itemCount; i++ {
+				sink.AddIPToSet(fmt.Sprintf("demo%v", i))
+			}
+			data, err := sink.Dump()
+			So(err, ShouldBeNil)
+			structure, err := hyperloglog.NewPlus(18)
+			So(err, ShouldBeNil)
+			err = structure.GobDecode(data)
+			So(err, ShouldBeNil)
+			count := structure.Count()
+			So((float64(count)/float64(itemCount))-1.0, ShouldAlmostEqual, 0, 0.01)
+		}
+
+	})
+}
diff --git a/common/ipsetsink/sinkcluster/common.go b/common/ipsetsink/sinkcluster/common.go
new file mode 100644
index 0000000..501c753
--- /dev/null
+++ b/common/ipsetsink/sinkcluster/common.go
@@ -0,0 +1,9 @@
+package sinkcluster
+
+import "time"
+
// SinkEntry is one record of a sink cluster file: a serialized counter
// covering the interval from RecordingStart to RecordingEnd. ClusterWriter
// writes these as one JSON object per line, and ClusterCounter reads them back
// line by line.
type SinkEntry struct {
	// RecordingStart is when collection for this record began.
	RecordingStart time.Time `json:"recordingStart"`

	// RecordingEnd is when the counter was dumped.
	RecordingEnd time.Time `json:"recordingEnd"`

	// Recorded is the counter as returned by ipsetsink.IPSetSink.Dump; as a
	// []byte it is base64-encoded by encoding/json.
	Recorded []byte `json:"recorded"`
}
diff --git a/common/ipsetsink/sinkcluster/reader.go b/common/ipsetsink/sinkcluster/reader.go
new file mode 100644
index 0000000..3f7a08f
--- /dev/null
+++ b/common/ipsetsink/sinkcluster/reader.go
@@ -0,0 +1,60 @@
+package sinkcluster
+
+import (
+	"bufio"
+	"encoding/json"
+	"github.com/clarkduvall/hyperloglog"
+	"io"
+	"time"
+)
+
// NewClusterCounter returns a ClusterCounter that merges only entries whose
// recording window lies within [from, to].
func NewClusterCounter(from time.Time, to time.Time) *ClusterCounter {
	counter := new(ClusterCounter)
	counter.from, counter.to = from, to
	return counter
}

// ClusterCounter merges the SinkEntry records of a sink cluster file that
// fall inside a fixed time window.
type ClusterCounter struct {
	from time.Time
	to   time.Time
}

// ClusterCountResult is the outcome of ClusterCounter.Count.
type ClusterCountResult struct {
	// Sum is the estimated number of distinct elements across all merged
	// entries.
	Sum uint64

	// ChunkIncluded is the number of entries that fell inside the window and
	// were merged.
	ChunkIncluded int64
}
+
+func (c ClusterCounter) Count(reader io.Reader) (*ClusterCountResult, error) {
+	result := ClusterCountResult{}
+	counter, err := hyperloglog.NewPlus(18)
+	if err != nil {
+		return nil, err
+	}
+	inputScanner := bufio.NewScanner(reader)
+	for inputScanner.Scan() {
+		inputLine := inputScanner.Bytes()
+		sinkInfo := SinkEntry{}
+		if err := json.Unmarshal(inputLine, &sinkInfo); err != nil {
+			return nil, err
+		}
+
+		if (sinkInfo.RecordingStart.Before(c.from) && !sinkInfo.RecordingStart.Equal(c.from)) ||
+			sinkInfo.RecordingEnd.After(c.to) {
+			continue
+		}
+
+		restoredCounter, err := hyperloglog.NewPlus(18)
+		if err != nil {
+			return nil, err
+		}
+		err = restoredCounter.GobDecode(sinkInfo.Recorded)
+		if err != nil {
+			return nil, err
+		}
+		result.ChunkIncluded++
+		err = counter.Merge(restoredCounter)
+		if err != nil {
+			return nil, err
+		}
+	}
+	result.Sum = counter.Count()
+	return &result, nil
+}
diff --git a/common/ipsetsink/sinkcluster/writer.go b/common/ipsetsink/sinkcluster/writer.go
new file mode 100644
index 0000000..1c409e8
--- /dev/null
+++ b/common/ipsetsink/sinkcluster/writer.go
@@ -0,0 +1,68 @@
+package sinkcluster
+
+import (
+	"bytes"
+	"encoding/json"
+	"io"
+	"log"
+	"time"
+
+	"git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink"
+)
+
+func NewClusterWriter(writer WriteSyncer, writeInterval time.Duration, sink *ipsetsink.IPSetSink) *ClusterWriter {
+	c := &ClusterWriter{
+		writer:        writer,
+		lastWriteTime: time.Now(),
+		writeInterval: writeInterval,
+		current:       sink,
+	}
+	return c
+}
+
+type ClusterWriter struct {
+	writer        WriteSyncer
+	lastWriteTime time.Time
+	writeInterval time.Duration
+	current       *ipsetsink.IPSetSink
+}
+
// WriteSyncer is the destination of a ClusterWriter. It is an io.Writer that
// can also be asked to Sync, as *os.File can.
type WriteSyncer interface {
	io.Writer
	Sync() error
}
+
+func (c *ClusterWriter) WriteIPSetToDisk() {
+	currentTime := time.Now()
+	data, err := c.current.Dump()
+	if err != nil {
+		log.Println("unable able to write ipset to file:", err)
+		return
+	}
+	entry := &SinkEntry{
+		RecordingStart: c.lastWriteTime,
+		RecordingEnd:   currentTime,
+		Recorded:       data,
+	}
+	jsonData, err := json.Marshal(entry)
+	if err != nil {
+		log.Println("unable able to write ipset to file:", err)
+		return
+	}
+	jsonData = append(jsonData, byte('\n'))
+	_, err = io.Copy(c.writer, bytes.NewReader(jsonData))
+	if err != nil {
+		log.Println("unable able to write ipset to file:", err)
+		return
+	}
+	c.writer.Sync()
+	c.lastWriteTime = currentTime
+	c.current.Reset()
+}
+
+func (c *ClusterWriter) AddIPToSet(ipAddress string) {
+	if c.lastWriteTime.Add(c.writeInterval).Before(time.Now()) {
+		c.WriteIPSetToDisk()
+	}
+	c.current.AddIPToSet(ipAddress)
+}
diff --git a/common/ipsetsink/sinkcluster/writer_test.go b/common/ipsetsink/sinkcluster/writer_test.go
new file mode 100644
index 0000000..5319cb2
--- /dev/null
+++ b/common/ipsetsink/sinkcluster/writer_test.go
@@ -0,0 +1,33 @@
+package sinkcluster
+
+import (
+	"bytes"
+	"io"
+	"testing"
+	"time"
+
+	"git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink"
+
+	. "github.com/smartystreets/goconvey/convey"
+)
+
// writerStub adapts any io.Writer to WriteSyncer for tests. Its Sync has
// nothing to flush and always succeeds.
type writerStub struct {
	io.Writer
}

// Sync implements WriteSyncer as a no-op.
func (writerStub) Sync() error { return nil }
+
+func TestSinkWriter(t *testing.T) {
+
+	Convey("Context", t, func() {
+		buffer := bytes.NewBuffer(nil)
+		writerStubInst := &writerStub{buffer}
+		sink := ipsetsink.NewIPSetSink("demo")
+		clusterWriter := NewClusterWriter(writerStubInst, time.Minute, sink)
+		clusterWriter.AddIPToSet("1")
+		clusterWriter.WriteIPSetToDisk()
+		So(buffer.Bytes(), ShouldNotBeNil)
+	})
+}
diff --git a/distinctcounter/counter.go b/distinctcounter/counter.go
new file mode 100644
index 0000000..c128465
--- /dev/null
+++ b/distinctcounter/counter.go
@@ -0,0 +1,37 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"time"
+
+	"git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink/sinkcluster"
+)
+
+func main() {
+	inputFile := flag.String("in", "", "")
+	start := flag.String("start", "", "")
+	end := flag.String("end", "", "")
+	flag.Parse()
+	startTime, err := time.Parse(time.UnixDate, *start)
+	if err != nil {
+		log.Fatal("unable to parse start time:", err)
+	}
+	endTime, err := time.Parse(time.UnixDate, *end)
+	if err != nil {
+		log.Fatal("unable to parse end time:", err)
+	}
+	fd, err := os.Open(*inputFile)
+	if err != nil {
+		log.Fatal("unable to open input file:", err)
+	}
+	counter := sinkcluster.NewClusterCounter(startTime, endTime)
+	result, err := counter.Count(fd)
+	if err != nil {
+		log.Fatal("unable to count:", err)
+	}
+	fmt.Printf("sum = %v\n", result.Sum)
+	fmt.Printf("chunkIncluded = %v\n", result.ChunkIncluded)
+}
diff --git a/go.mod b/go.mod
index 842648c..c782967 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.13
 
 require (
 	git.torproject.org/pluggable-transports/goptlib.git v1.1.0
+	github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004
 	github.com/gorilla/websocket v1.4.1
 	github.com/pion/ice/v2 v2.2.6
 	github.com/pion/sdp/v3 v3.0.5

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.


More information about the tor-commits mailing list