This is an automated email from the git hooks/post-receive script.
shelikhoo pushed a change to branch main in repository pluggable-transports/snowflake.
    from 97dea53  Update Relay Pattern format to include dollar sign
     new 211254f  Add distinct IP counter
     new fa7d1e2  Add distinct IP counter to metrics
     new 2541b13  Add distinct IP counter to broker
     new be40b62  Add go sum for hyperloglog
     new af11343  Update distinct counter interface
     new b18e6fc  Add document for Distinct IP file
     new 35e9ab8  Use truncated hash instead crc64 for counted hash
The 7 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
Summary of changes:
 broker/broker.go                            | 17 ++++++++
 broker/ipc.go                               |  1 +
 broker/metrics.go                           | 13 ++++++
 common/ipsetsink/sink.go                    | 55 +++++++++++++++++++++++
 common/ipsetsink/sink_test.go               | 47 ++++++++++++++++++++
 common/ipsetsink/sinkcluster/common.go      | 24 ++++++++++
 common/ipsetsink/sinkcluster/reader.go      | 60 +++++++++++++++++++++++++
 common/ipsetsink/sinkcluster/writer.go      | 68 +++++++++++++++++++++++++++++
 common/ipsetsink/sinkcluster/writer_test.go | 33 ++++++++++++++
 distinctcounter/counter.go                  | 37 ++++++++++++++++
 go.mod                                      |  1 +
 go.sum                                      |  2 +
 12 files changed, 358 insertions(+)
 create mode 100644 common/ipsetsink/sink.go
 create mode 100644 common/ipsetsink/sink_test.go
 create mode 100644 common/ipsetsink/sinkcluster/common.go
 create mode 100644 common/ipsetsink/sinkcluster/reader.go
 create mode 100644 common/ipsetsink/sinkcluster/writer.go
 create mode 100644 common/ipsetsink/sinkcluster/writer_test.go
 create mode 100644 distinctcounter/counter.go
This is an automated email from the git hooks/post-receive script.
shelikhoo pushed a commit to branch main in repository pluggable-transports/snowflake.
commit 211254fa9849a1ae705a482ba984d0d415730560 Author: Shelikhoo xiaokangwang@outlook.com AuthorDate: Fri May 27 16:20:47 2022 +0100
Add distinct IP counter --- common/ipsetsink/sink.go | 52 ++++++++++++++++++++++ common/ipsetsink/sink_test.go | 47 ++++++++++++++++++++ common/ipsetsink/sinkcluster/common.go | 9 ++++ common/ipsetsink/sinkcluster/reader.go | 60 +++++++++++++++++++++++++ common/ipsetsink/sinkcluster/writer.go | 68 +++++++++++++++++++++++++++++ common/ipsetsink/sinkcluster/writer_test.go | 33 ++++++++++++++ distinctcounter/counter.go | 37 ++++++++++++++++ go.mod | 1 + 8 files changed, 307 insertions(+)
diff --git a/common/ipsetsink/sink.go b/common/ipsetsink/sink.go new file mode 100644 index 0000000..b62f786 --- /dev/null +++ b/common/ipsetsink/sink.go @@ -0,0 +1,52 @@ +package ipsetsink + +import ( + "crypto/hmac" + "hash" + "hash/crc64" + + "github.com/clarkduvall/hyperloglog" + "golang.org/x/crypto/sha3" +) + +func NewIPSetSink(maskingKey string) *IPSetSink { + countDistinct, _ := hyperloglog.NewPlus(18) + return &IPSetSink{ + ipMaskingKey: maskingKey, + countDistinct: countDistinct, + } +} + +type IPSetSink struct { + ipMaskingKey string + countDistinct *hyperloglog.HyperLogLogPlus +} + +func (s *IPSetSink) maskIPAddress(ipAddress string) []byte { + hmacIPMasker := hmac.New(func() hash.Hash { + return sha3.New256() + }, []byte(s.ipMaskingKey)) + hmacIPMasker.Write([]byte(ipAddress)) + return hmacIPMasker.Sum(nil) +} + +func (s *IPSetSink) AddIPToSet(ipAddress string) { + s.countDistinct.Add(crc64FromBytes{hashValue(s.maskIPAddress(ipAddress))}) +} + +func (s *IPSetSink) Dump() ([]byte, error) { + return s.countDistinct.GobEncode() +} + +func (s *IPSetSink) Reset() { + s.countDistinct.Clear() +} + +type hashValue []byte +type crc64FromBytes struct { + hashValue +} + +func (c crc64FromBytes) Sum64() uint64 { + return crc64.Checksum(c.hashValue, crc64.MakeTable(crc64.ECMA)) +} diff --git a/common/ipsetsink/sink_test.go b/common/ipsetsink/sink_test.go new file mode 100644 index 0000000..00ae965 --- /dev/null +++ b/common/ipsetsink/sink_test.go @@ -0,0 +1,47 @@ +package ipsetsink + +import ( + "fmt" + "github.com/clarkduvall/hyperloglog" + "testing" +) +import . 
"github.com/smartystreets/goconvey/convey" + +func TestSinkInit(t *testing.T) { + Convey("Context", t, func() { + sink := NewIPSetSink("demo") + sink.AddIPToSet("test1") + sink.AddIPToSet("test2") + data, err := sink.Dump() + So(err, ShouldBeNil) + structure, err := hyperloglog.NewPlus(18) + So(err, ShouldBeNil) + err = structure.GobDecode(data) + So(err, ShouldBeNil) + count := structure.Count() + So(count, ShouldBeBetweenOrEqual, 1, 3) + }) +} + +func TestSinkCounting(t *testing.T) { + Convey("Context", t, func() { + for itemCount := 300; itemCount <= 10000; itemCount += 200 { + sink := NewIPSetSink("demo") + for i := 0; i <= itemCount; i++ { + sink.AddIPToSet(fmt.Sprintf("demo%v", i)) + } + for i := 0; i <= itemCount; i++ { + sink.AddIPToSet(fmt.Sprintf("demo%v", i)) + } + data, err := sink.Dump() + So(err, ShouldBeNil) + structure, err := hyperloglog.NewPlus(18) + So(err, ShouldBeNil) + err = structure.GobDecode(data) + So(err, ShouldBeNil) + count := structure.Count() + So((float64(count)/float64(itemCount))-1.0, ShouldAlmostEqual, 0, 0.01) + } + + }) +} diff --git a/common/ipsetsink/sinkcluster/common.go b/common/ipsetsink/sinkcluster/common.go new file mode 100644 index 0000000..501c753 --- /dev/null +++ b/common/ipsetsink/sinkcluster/common.go @@ -0,0 +1,9 @@ +package sinkcluster + +import "time" + +type SinkEntry struct { + RecordingStart time.Time `json:"recordingStart"` + RecordingEnd time.Time `json:"recordingEnd"` + Recorded []byte `json:"recorded"` +} diff --git a/common/ipsetsink/sinkcluster/reader.go b/common/ipsetsink/sinkcluster/reader.go new file mode 100644 index 0000000..3f7a08f --- /dev/null +++ b/common/ipsetsink/sinkcluster/reader.go @@ -0,0 +1,60 @@ +package sinkcluster + +import ( + "bufio" + "encoding/json" + "github.com/clarkduvall/hyperloglog" + "io" + "time" +) + +func NewClusterCounter(from time.Time, to time.Time) *ClusterCounter { + return &ClusterCounter{from: from, to: to} +} + +type ClusterCounter struct { + from time.Time + to 
time.Time +} + +type ClusterCountResult struct { + Sum uint64 + ChunkIncluded int64 +} + +func (c ClusterCounter) Count(reader io.Reader) (*ClusterCountResult, error) { + result := ClusterCountResult{} + counter, err := hyperloglog.NewPlus(18) + if err != nil { + return nil, err + } + inputScanner := bufio.NewScanner(reader) + for inputScanner.Scan() { + inputLine := inputScanner.Bytes() + sinkInfo := SinkEntry{} + if err := json.Unmarshal(inputLine, &sinkInfo); err != nil { + return nil, err + } + + if (sinkInfo.RecordingStart.Before(c.from) && !sinkInfo.RecordingStart.Equal(c.from)) || + sinkInfo.RecordingEnd.After(c.to) { + continue + } + + restoredCounter, err := hyperloglog.NewPlus(18) + if err != nil { + return nil, err + } + err = restoredCounter.GobDecode(sinkInfo.Recorded) + if err != nil { + return nil, err + } + result.ChunkIncluded++ + err = counter.Merge(restoredCounter) + if err != nil { + return nil, err + } + } + result.Sum = counter.Count() + return &result, nil +} diff --git a/common/ipsetsink/sinkcluster/writer.go b/common/ipsetsink/sinkcluster/writer.go new file mode 100644 index 0000000..1c409e8 --- /dev/null +++ b/common/ipsetsink/sinkcluster/writer.go @@ -0,0 +1,68 @@ +package sinkcluster + +import ( + "bytes" + "encoding/json" + "io" + "log" + "time" + + "git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink" +) + +func NewClusterWriter(writer WriteSyncer, writeInterval time.Duration, sink *ipsetsink.IPSetSink) *ClusterWriter { + c := &ClusterWriter{ + writer: writer, + lastWriteTime: time.Now(), + writeInterval: writeInterval, + current: sink, + } + return c +} + +type ClusterWriter struct { + writer WriteSyncer + lastWriteTime time.Time + writeInterval time.Duration + current *ipsetsink.IPSetSink +} + +type WriteSyncer interface { + Sync() error + io.Writer +} + +func (c *ClusterWriter) WriteIPSetToDisk() { + currentTime := time.Now() + data, err := c.current.Dump() + if err != nil { + log.Println("unable able to write 
ipset to file:", err) + return + } + entry := &SinkEntry{ + RecordingStart: c.lastWriteTime, + RecordingEnd: currentTime, + Recorded: data, + } + jsonData, err := json.Marshal(entry) + if err != nil { + log.Println("unable able to write ipset to file:", err) + return + } + jsonData = append(jsonData, byte('\n')) + _, err = io.Copy(c.writer, bytes.NewReader(jsonData)) + if err != nil { + log.Println("unable able to write ipset to file:", err) + return + } + c.writer.Sync() + c.lastWriteTime = currentTime + c.current.Reset() +} + +func (c *ClusterWriter) AddIPToSet(ipAddress string) { + if c.lastWriteTime.Add(c.writeInterval).Before(time.Now()) { + c.WriteIPSetToDisk() + } + c.current.AddIPToSet(ipAddress) +} diff --git a/common/ipsetsink/sinkcluster/writer_test.go b/common/ipsetsink/sinkcluster/writer_test.go new file mode 100644 index 0000000..5319cb2 --- /dev/null +++ b/common/ipsetsink/sinkcluster/writer_test.go @@ -0,0 +1,33 @@ +package sinkcluster + +import ( + "bytes" + "io" + "testing" + "time" + + "git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink" + + . 
"github.com/smartystreets/goconvey/convey" +) + +type writerStub struct { + io.Writer +} + +func (w writerStub) Sync() error { + return nil +} + +func TestSinkWriter(t *testing.T) { + + Convey("Context", t, func() { + buffer := bytes.NewBuffer(nil) + writerStubInst := &writerStub{buffer} + sink := ipsetsink.NewIPSetSink("demo") + clusterWriter := NewClusterWriter(writerStubInst, time.Minute, sink) + clusterWriter.AddIPToSet("1") + clusterWriter.WriteIPSetToDisk() + So(buffer.Bytes(), ShouldNotBeNil) + }) +} diff --git a/distinctcounter/counter.go b/distinctcounter/counter.go new file mode 100644 index 0000000..c128465 --- /dev/null +++ b/distinctcounter/counter.go @@ -0,0 +1,37 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "time" + + "git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink/sinkcluster" +) + +func main() { + inputFile := flag.String("in", "", "") + start := flag.String("start", "", "") + end := flag.String("end", "", "") + flag.Parse() + startTime, err := time.Parse(time.UnixDate, *start) + if err != nil { + log.Fatal("unable to parse start time:", err) + } + endTime, err := time.Parse(time.UnixDate, *end) + if err != nil { + log.Fatal("unable to parse end time:", err) + } + fd, err := os.Open(*inputFile) + if err != nil { + log.Fatal("unable to open input file:", err) + } + counter := sinkcluster.NewClusterCounter(startTime, endTime) + result, err := counter.Count(fd) + if err != nil { + log.Fatal("unable to count:", err) + } + fmt.Printf("sum = %v\n", result.Sum) + fmt.Printf("chunkIncluded = %v\n", result.ChunkIncluded) +} diff --git a/go.mod b/go.mod index 842648c..c782967 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13
require ( git.torproject.org/pluggable-transports/goptlib.git v1.1.0 + github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004 // indirect github.com/gorilla/websocket v1.4.1 github.com/pion/ice/v2 v2.2.6 github.com/pion/sdp/v3 v3.0.5
This is an automated email from the git hooks/post-receive script.
shelikhoo pushed a commit to branch main in repository pluggable-transports/snowflake.
commit fa7d1e2bb77b92452785ce3d03c81a90efc9891e Author: Shelikhoo xiaokangwang@outlook.com AuthorDate: Fri May 27 16:21:28 2022 +0100
Add distinct IP counter to metrics --- broker/metrics.go | 13 +++++++++++++ 1 file changed, 13 insertions(+)
diff --git a/broker/metrics.go b/broker/metrics.go index 639d505..cd1ca37 100644 --- a/broker/metrics.go +++ b/broker/metrics.go @@ -14,6 +14,7 @@ import ( "sync" "time"
+ "git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink/sinkcluster" "git.torproject.org/pluggable-transports/snowflake.git/v2/common/messages" "github.com/prometheus/client_golang/prometheus" "gitlab.torproject.org/tpo/anti-censorship/geoip" @@ -41,6 +42,8 @@ type Metrics struct { logger *log.Logger geoipdb *geoip.Geoip
+ distinctIPWriter *sinkcluster.ClusterWriter + countryStats CountryStats clientRoundtripEstimate time.Duration proxyIdleCount uint @@ -324,3 +327,13 @@ func initPrometheus() *PromMetrics {
return promMetrics } + +func (m *Metrics) RecordIPAddress(ip string) { + if m.distinctIPWriter != nil { + m.distinctIPWriter.AddIPToSet(ip) + } +} + +func (m *Metrics) SetIPAddressRecorder(recorder *sinkcluster.ClusterWriter) { + m.distinctIPWriter = recorder +}
This is an automated email from the git hooks/post-receive script.
shelikhoo pushed a commit to branch main in repository pluggable-transports/snowflake.
commit 2541b13166b9e502c5314536c3c2777fd84db45b Author: Shelikhoo xiaokangwang@outlook.com AuthorDate: Fri May 27 17:37:23 2022 +0100
Add distinct IP counter to broker --- broker/broker.go | 17 +++++++++++++++++ broker/ipc.go | 1 + 2 files changed, 18 insertions(+)
diff --git a/broker/broker.go b/broker/broker.go index 9162370..2bf4614 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -11,6 +11,8 @@ import ( "crypto/tls" "flag" "git.torproject.org/pluggable-transports/snowflake.git/v2/common/bridgefingerprint" + "git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink" + "git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink/sinkcluster" "io" "log" "net/http" @@ -194,6 +196,8 @@ func main() { var certFilename, keyFilename string var disableGeoip bool var metricsFilename string + var ipCountFilename, ipCountMaskingKey string + var ipCountInterval time.Duration var unsafeLogging bool
flag.StringVar(&acmeEmail, "acme-email", "", "optional contact email for Let's Encrypt notifications") @@ -210,6 +214,9 @@ func main() { flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS") flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection") flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output") + flag.StringVar(&ipCountFilename, "ip-count-log", "", "path to ip count logging output") + flag.StringVar(&ipCountMaskingKey, "ip-count-mask", "", "masking key for ip count logging") + flag.DurationVar(&ipCountInterval, "ip-count-interval", time.Hour, "time interval between each chunk") flag.BoolVar(&unsafeLogging, "unsafe-logging", false, "prevent logs from being scrubbed") flag.Parse()
@@ -257,6 +264,16 @@ func main() { } }
+ if ipCountFilename != "" { + ipCountFile, err := os.OpenFile(ipCountFilename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + + if err != nil { + log.Fatal(err.Error()) + } + ipSetSink := ipsetsink.NewIPSetSink(ipCountMaskingKey) + ctx.metrics.distinctIPWriter = sinkcluster.NewClusterWriter(ipCountFile, ipCountInterval, ipSetSink) + } + go ctx.Broker()
i := &IPC{ctx} diff --git a/broker/ipc.go b/broker/ipc.go index f5d4747..30de180 100644 --- a/broker/ipc.go +++ b/broker/ipc.go @@ -106,6 +106,7 @@ func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error { } else { i.ctx.metrics.lock.Lock() i.ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType) + i.ctx.metrics.RecordIPAddress(remoteIP) i.ctx.metrics.lock.Unlock() }
This is an automated email from the git hooks/post-receive script.
shelikhoo pushed a commit to branch main in repository pluggable-transports/snowflake.
commit be40b623a40d388450722250cc21343f03ea5b46 Author: Shelikhoo xiaokangwang@outlook.com AuthorDate: Mon May 30 14:43:04 2022 +0100
Add go sum for hyperloglog --- go.sum | 2 ++ 1 file changed, 2 insertions(+)
diff --git a/go.sum b/go.sum index 2c6f232..e610825 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004 h1:mK6JroY6bLiPS3s6QCYOSjRyErFc2iHNkhhmRfF0nHo= +github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004/go.mod h1:drodPoQNro6QBO6TJ/MpMZbz8Bn2eSDtRN6jpG4VGw8= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
This is an automated email from the git hooks/post-receive script.
shelikhoo pushed a commit to branch main in repository pluggable-transports/snowflake.
commit af1134362aff7ddbd2103b1b2fd284abe9a03782 Author: Shelikhoo xiaokangwang@outlook.com AuthorDate: Mon May 30 16:34:15 2022 +0100
Update distinct counter interface --- distinctcounter/counter.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/distinctcounter/counter.go b/distinctcounter/counter.go index c128465..67110b4 100644 --- a/distinctcounter/counter.go +++ b/distinctcounter/counter.go @@ -12,14 +12,14 @@ import (
func main() { inputFile := flag.String("in", "", "") - start := flag.String("start", "", "") - end := flag.String("end", "", "") + start := flag.String("from", "", "") + end := flag.String("to", "", "") flag.Parse() - startTime, err := time.Parse(time.UnixDate, *start) + startTime, err := time.Parse(time.RFC3339, *start) if err != nil { log.Fatal("unable to parse start time:", err) } - endTime, err := time.Parse(time.UnixDate, *end) + endTime, err := time.Parse(time.RFC3339, *end) if err != nil { log.Fatal("unable to parse end time:", err) }
This is an automated email from the git hooks/post-receive script.
shelikhoo pushed a commit to branch main in repository pluggable-transports/snowflake.
commit b18e6fcfe41e15b213e6793b035106492b311a42 Author: Shelikhoo xiaokangwang@outlook.com AuthorDate: Tue May 31 14:02:04 2022 +0100
Add document for Distinct IP file --- common/ipsetsink/sinkcluster/common.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+)
diff --git a/common/ipsetsink/sinkcluster/common.go b/common/ipsetsink/sinkcluster/common.go index 501c753..4360f70 100644 --- a/common/ipsetsink/sinkcluster/common.go +++ b/common/ipsetsink/sinkcluster/common.go @@ -1,5 +1,20 @@ package sinkcluster
+/* ClusterWriter and (ClusterCounter).Count respectively write and read a streamed IP set journal file that records distinct IP addresses. + + Its format is as follows: + + This file should be in newline-delimited JSON format (https://jsonlines.org/). + Each line should contain a JSON object of the form: + {"recordingStart":"2022-05-30T14:38:44.678610091Z","recordingEnd":"2022-05-30T14:39:48.157630926Z","recorded":""} + + recordingStart:datetime is the time at which this chunk of recording started. + + recordingEnd:datetime is the time at which this chunk of recording ended. + + recorded is the checkpoint data generated by hyperloglog (the gob-encoded HyperLogLog++ sketch, base64-encoded in the JSON). +*/ + + import "time"
type SinkEntry struct {
This is an automated email from the git hooks/post-receive script.
shelikhoo pushed a commit to branch main in repository pluggable-transports/snowflake.
commit 35e9ab8c0b3168b5eaa4f6538b8e9208eb38c508 Author: Shelikhoo xiaokangwang@outlook.com AuthorDate: Wed Jun 15 15:32:58 2022 +0100
Use truncated hash instead crc64 for counted hash --- common/ipsetsink/sink.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-)
diff --git a/common/ipsetsink/sink.go b/common/ipsetsink/sink.go index b62f786..168d061 100644 --- a/common/ipsetsink/sink.go +++ b/common/ipsetsink/sink.go @@ -1,9 +1,10 @@ package ipsetsink
import ( + "bytes" "crypto/hmac" + "encoding/binary" "hash" - "hash/crc64"
"github.com/clarkduvall/hyperloglog" "golang.org/x/crypto/sha3" @@ -31,7 +32,7 @@ func (s *IPSetSink) maskIPAddress(ipAddress string) []byte { }
func (s *IPSetSink) AddIPToSet(ipAddress string) { - s.countDistinct.Add(crc64FromBytes{hashValue(s.maskIPAddress(ipAddress))}) + s.countDistinct.Add(truncatedHash64FromBytes{hashValue(s.maskIPAddress(ipAddress))}) }
func (s *IPSetSink) Dump() ([]byte, error) { @@ -43,10 +44,12 @@ func (s *IPSetSink) Reset() { }
type hashValue []byte -type crc64FromBytes struct { +type truncatedHash64FromBytes struct { hashValue }
-func (c crc64FromBytes) Sum64() uint64 { - return crc64.Checksum(c.hashValue, crc64.MakeTable(crc64.ECMA)) +func (c truncatedHash64FromBytes) Sum64() uint64 { + var value uint64 + binary.Read(bytes.NewReader(c.hashValue), binary.BigEndian, &value) + return value }
tor-commits@lists.torproject.org