[tor-commits] [ooni-probe/master] Implement disk quota management

art at torproject.org art at torproject.org
Mon Sep 19 12:14:24 UTC 2016


commit def4c929d162852ace8a016bb3352677eec5bcde
Author: Arturo Filastò <arturo at filasto.net>
Date:   Tue Aug 30 02:09:55 2016 +0200

    Implement disk quota management
    
    This is related to the feature described here: https://github.com/TheTorProject/lepidopter/issues/53
---
 ooni/agent/scheduler.py  | 67 ++++++++++++++++++++++++++++++++++++++++++++++++
 ooni/measurements.py     | 20 +++++++++++----
 ooni/settings.py         |  6 ++++-
 ooni/tests/test_utils.py | 34 ++++++++++++++++++++++++
 ooni/ui/web/server.py    | 12 ++++++++-
 ooni/utils/files.py      | 34 ++++++++++++++++++++++++
 6 files changed, 166 insertions(+), 7 deletions(-)

diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py
index 3389db1..54551f6 100644
--- a/ooni/agent/scheduler.py
+++ b/ooni/agent/scheduler.py
@@ -1,3 +1,6 @@
+import os
+import errno
+
 from datetime import datetime
 
 from twisted.application import service
@@ -8,6 +11,7 @@ from twisted.python.filepath import FilePath
 from ooni.scripts import oonireport
 from ooni import resources
 from ooni.utils import log, SHORT_DATE
+from ooni.utils.files import human_size_to_bytes, directory_usage
 from ooni.deck.store import input_store, deck_store
 from ooni.settings import config
 from ooni.contrib import croniter
@@ -146,6 +150,66 @@ class DeleteOldReports(ScheduledTask):
                 measurement_path.child(measurement['id']).remove()
 
 
+class CheckMeasurementQuota(ScheduledTask):
+    """
+    Hourly task that keeps the measurements directory within the disk quota
+    configured in config.basic.measurement_quota.
+
+    When disk usage reaches _warn_when (a fraction of the quota), a
+    'quota_warning' file containing "<used bytes> <maximum bytes>" is written
+    to config.running_path; the file is removed again once usage is back
+    below that threshold. When usage exceeds the quota, measurements are
+    deleted until the excess has been freed or nothing deletable is left.
+    """
+    identifier = 'check-measurement-quota'
+    schedule = '@hourly'
+    # Fraction of the quota at which the warning file is written.
+    _warn_when = 0.8
+
+    def task(self):
+        if config.basic.measurement_quota is None:
+            # No quota configured: nothing to enforce.
+            return
+        maximum_bytes = human_size_to_bytes(config.basic.measurement_quota)
+        used_bytes = directory_usage(config.measurements_directory)
+        warning_path = os.path.join(config.running_path, 'quota_warning')
+
+        # Multiply rather than divide so that a quota of 0 cannot raise
+        # ZeroDivisionError.
+        if used_bytes >= self._warn_when * maximum_bytes:
+            log.warn("You are about to reach the maximum allowed quota. Be careful")
+            with open(warning_path, "w") as out_file:
+                out_file.write("{0} {1}".format(used_bytes, maximum_bytes))
+        else:
+            try:
+                os.remove(warning_path)
+            except OSError as ose:
+                # A missing file only means no warning was pending.
+                if ose.errno != errno.ENOENT:
+                    raise
+
+        if used_bytes < maximum_bytes:
+            # We are within the allowed quota, nothing to delete.
+            return
+
+        # We must free at least as many bytes as we are over the quota.
+        amount_to_delete = used_bytes - maximum_bytes
+        amount_deleted = 0
+        measurement_path = FilePath(config.measurements_directory)
+
+        kept_measurements = []
+        stale_measurements = []
+        remaining_measurements = []
+        measurements_by_date = sorted(list_measurements(compute_size=True),
+                                      key=lambda k: k['test_start_time'])
+        for measurement in measurements_by_date:
+            if measurement['running'] is True:
+                # Never remove the directory of a measurement in progress.
+                continue
+            if measurement['keep'] is True:
+                kept_measurements.append(measurement)
+            elif measurement['stale'] is True:
+                stale_measurements.append(measurement)
+            else:
+                remaining_measurements.append(measurement)
+
+        # This is the order in which we delete measurements: stale ones
+        # first, then ordinary ones and, only as a last resort, the ones
+        # marked "keep". Each group is ordered by test_start_time.
+        ordered_measurements = (stale_measurements +
+                                remaining_measurements +
+                                kept_measurements)
+        # Iterating (instead of popping until the target is met) ends
+        # cleanly when there are not enough measurements to free the excess.
+        for measurement in ordered_measurements:
+            if amount_deleted >= amount_to_delete:
+                break
+            log.warn("Deleting report {0}".format(measurement["id"]))
+            measurement_path.child(measurement['id']).remove()
+            amount_deleted += measurement['size']
+
+
 class RunDeck(ScheduledTask):
     """
     This will run the decks that have been configured on the system as the
@@ -196,6 +260,7 @@ SYSTEM_TASKS = [
     UpdateInputsAndResources
 ]
 
+
 @defer.inlineCallbacks
 def run_system_tasks(no_input_store=False):
     task_classes = SYSTEM_TASKS[:]
@@ -215,6 +280,7 @@ def run_system_tasks(no_input_store=False):
             log.err("Failed to run task {0}".format(task.identifier))
             log.exception(exc)
 
+
 class SchedulerService(service.MultiService):
     """
     This service is responsible for running the periodic tasks.
@@ -271,6 +337,7 @@ class SchedulerService(service.MultiService):
         self.schedule(UpdateInputsAndResources())
         self.schedule(UploadReports())
         self.schedule(DeleteOldReports())
+        self.schedule(CheckMeasurementQuota())
         self.schedule(RefreshDeckList(self))
 
         self._looping_call.start(self.interval)
diff --git a/ooni/measurements.py b/ooni/measurements.py
index 6d90c1b..fd722ee 100644
--- a/ooni/measurements.py
+++ b/ooni/measurements.py
@@ -4,6 +4,7 @@ import signal
 
 from twisted.python.filepath import FilePath
 from ooni.utils import log
+from ooni.utils.files import directory_usage
 from ooni.settings import config
 
 class MeasurementInProgress(Exception):
@@ -61,7 +62,8 @@ def generate_summary(input_file, output_file):
 class MeasurementNotFound(Exception):
     pass
 
-def get_measurement(measurement_id):
+def get_measurement(measurement_id, compute_size=False):
+    size = -1
     measurement_path = FilePath(config.measurements_directory)
     measurement = measurement_path.child(measurement_id)
     if not measurement.exists():
@@ -70,6 +72,7 @@ def get_measurement(measurement_id):
     running = False
     completed = True
     keep = False
+    stale = False
     if measurement.child("measurements.njson.progress").exists():
         completed = False
         # XXX this is done quite often around the code, probably should
@@ -80,10 +83,14 @@ def get_measurement(measurement_id):
             os.kill(pid, signal.SIG_DFL)
             running = True
         except OSError:
-            pass
+            stale = True
 
     if measurement.child("keep").exists():
         keep = True
+
+    if compute_size is True:
+        size = directory_usage(measurement.path)
+
     test_start_time, country_code, asn, test_name = \
         measurement_id.split("-")[:4]
     return {
@@ -94,7 +101,9 @@ def get_measurement(measurement_id):
         "id": measurement_id,
         "completed": completed,
         "keep": keep,
-        "running": running
+        "running": running,
+        "stale": stale,
+        "size": size
     }
 
 
@@ -115,12 +124,13 @@ def get_summary(measurement_id):
     with summary.open("r") as f:
         return json.load(f)
 
-def list_measurements():
+
+def list_measurements(compute_size=False):
+    """
+    Return the metadata of every measurement in the measurements directory.
+
+    :param compute_size: passed on to get_measurement(); when True each
+        entry's "size" is the disk usage of the measurement directory,
+        otherwise it is -1.
+    :returns: list of the dicts produced by get_measurement(). Measurements
+        whose metadata cannot be read are logged and left out.
+    """
     measurements = []
     measurement_path = FilePath(config.measurements_directory)
     for measurement_id in measurement_path.listdir():
         try:
-            measurements.append(get_measurement(measurement_id))
+            measurements.append(get_measurement(measurement_id, compute_size))
-        except:
+        # Exception rather than a bare except, so that KeyboardInterrupt and
+        # SystemExit are not swallowed.
+        except Exception:
             log.err("Failed to get metadata for measurement {0}".format(measurement_id))
     return measurements
diff --git a/ooni/settings.py b/ooni/settings.py
index 2bacd57..c36f3f7 100644
--- a/ooni/settings.py
+++ b/ooni/settings.py
@@ -20,6 +20,9 @@ basic:
     # Where OONIProbe should be writing it's log file
     logfile: {logfile}
     loglevel: WARNING
+    # The maximum amount of data to store on disk. Once the quota is reached,
+    # we will start deleting older reports.
+    # measurement_quota: 1G
 privacy:
     # Should we include the IP address of the probe in the report?
     includeip: {include_ip}
@@ -99,7 +102,8 @@ tor:
 defaults = {
     "basic": {
         "loglevel": "WARNING",
-        "logfile": "ooniprobe.log"
+        "logfile": "ooniprobe.log",
+        "measurement_quota": "1G"
     },
     "privacy": {
         "includeip": False,
diff --git a/ooni/tests/test_utils.py b/ooni/tests/test_utils.py
index ea1d858..4ef2926 100644
--- a/ooni/tests/test_utils.py
+++ b/ooni/tests/test_utils.py
@@ -1,7 +1,10 @@
 import os
+import tempfile
+
 from twisted.trial import unittest
 
 from ooni.utils import log, generate_filename, net
+from ooni.utils.files import human_size_to_bytes, directory_usage
 
 
 class TestUtils(unittest.TestCase):
@@ -43,3 +46,34 @@ class TestUtils(unittest.TestCase):
     def test_get_addresses(self):
         addresses = net.getAddresses()
         assert isinstance(addresses, list)
+
+    def test_human_size(self):
+        # Each human readable size paired with the byte count it must yield.
+        expectations = [
+            ("1G", 1024**3),
+            ("1.3M", 1.3 * 1024**2),
+            ("1.2K", 1.2 * 1024),
+            ("1", 1.0),
+            ("100.2", 100.2),
+        ]
+        for human_size, expected_bytes in expectations:
+            self.assertEqual(
+                human_size_to_bytes(human_size),
+                expected_bytes
+            )
+
+    def test_directory_usage(self):
+        # Imported locally: shutil is only needed for the cleanup below.
+        import shutil
+
+        tmp_dir = tempfile.mkdtemp()
+        # Remove the temporary tree once the test is over, pass or fail, so
+        # test runs do not leave directories behind.
+        self.addCleanup(shutil.rmtree, tmp_dir)
+
+        # One file at the top level and one in a sub-directory, to check
+        # that the usage is summed recursively.
+        with open(os.path.join(tmp_dir, "something.txt"), "w") as out_file:
+            out_file.write("A"*1000)
+        os.mkdir(os.path.join(tmp_dir, "subdir"))
+        with open(os.path.join(tmp_dir, "subdir", "something.txt"), "w") as out_file:
+            out_file.write("A"*1000)
+        self.assertEqual(directory_usage(tmp_dir), 1000*2)
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py
index ee33fa2..6f9ef69 100644
--- a/ooni/ui/web/server.py
+++ b/ooni/ui/web/server.py
@@ -1,6 +1,8 @@
 from __future__ import print_function
 
+import os
 import json
+import errno
 import string
 from functools import wraps
 from random import SystemRandom
@@ -186,13 +188,21 @@ class WebUIAPI(object):
 
     @property
     def status(self):
+        """
+        Return a dict describing the current state of the probe.
+
+        The "quota_warning" entry is the content of the quota_warning file
+        when that file exists and None otherwise.
+        """
+        quota_warning = None
+        try:
+            # The warning file is written under config.running_path by the
+            # CheckMeasurementQuota task, so it has to be read from there.
+            # NOTE(review): this used config.running_dir before; confirm that
+            # running_path is the attribute the config object defines.
+            with open(os.path.join(config.running_path, "quota_warning")) as in_file:
+                quota_warning = in_file.read()
+        except IOError as ioe:
+            # Only a missing file means "no warning"; anything else is a
+            # real error and is propagated.
+            if ioe.errno != errno.ENOENT:
+                raise
         return {
             "software_version": ooniprobe_version,
             "software_name": "ooniprobe",
             "asn": probe_ip.geodata['asn'],
             "country_code": probe_ip.geodata['countrycode'],
             "director_started": self._director_started,
-            "initialized": self._is_initialized
+            "initialized": self._is_initialized,
+            "quota_warning": quota_warning
         }
 
     def handle_director_event(self, event):
diff --git a/ooni/utils/files.py b/ooni/utils/files.py
new file mode 100644
index 0000000..aefb831
--- /dev/null
+++ b/ooni/utils/files.py
@@ -0,0 +1,34 @@
+import os
+import re
+
+# One capture group per unit, in the order G, M, K, plain bytes. A raw string
+# keeps the regex escapes from being read as string escapes.
+HUMAN_SIZE = re.compile(r"(\d+\.?\d*G)|(\d+\.?\d*M)|(\d+\.?\d*K)|(\d+\.?\d*)")
+
+class InvalidFormat(Exception):
+    """Raised by human_size_to_bytes() for a size it cannot parse."""
+
+def human_size_to_bytes(human_size):
+    """
+    Converts a size specified in a human friendly way (for example 1G, 10M,
+    30K) into bytes.
+
+    Units are powers of 1024; a number without a unit is taken as bytes.
+    Only the leading part of the value is parsed: whatever follows the
+    number and its unit is ignored.
+
+    :param human_size: the size to convert, e.g. "1G", "1.3M" or "100".
+    :returns: the size in bytes as a float.
+    :raises InvalidFormat: if human_size does not start with a number.
+    """
+    if isinstance(human_size, (int, float)):
+        # presumably a configuration parser can hand over a bare number as
+        # int/float instead of str; accept it as a byte count -- TODO confirm
+        human_size = str(human_size)
+    match = HUMAN_SIZE.match(human_size)
+    if match is None:
+        # Without this check the failure would surface as an AttributeError
+        # on None rather than as InvalidFormat.
+        raise InvalidFormat(human_size)
+    gb, mb, kb, b = match.groups()
+    if gb is not None:
+        return float(gb[:-1]) * (1024 ** 3)
+    if mb is not None:
+        return float(mb[:-1]) * (1024 ** 2)
+    if kb is not None:
+        return float(kb[:-1]) * 1024
+    # Exactly one alternative of the pattern matches, so at this point the
+    # value is a plain number of bytes.
+    return float(b)
+
+
+def directory_usage(path):
+    """
+    Return the combined size in bytes of all files below path.
+
+    Sub-directories are walked recursively and only the sizes of the files
+    found are added up. Files whose size cannot be read (for example because
+    they are removed while the tree is being walked, or because they are
+    dangling symlinks) are skipped.
+
+    :param path: the directory to measure.
+    :returns: total size in bytes; 0 when no files are found.
+    """
+    total_usage = 0
+    for root, _dirs, filenames in os.walk(path):
+        for filename in filenames:
+            try:
+                total_usage += os.path.getsize(os.path.join(root, filename))
+            except OSError:
+                # The usage is a best-effort figure: one unreadable entry
+                # must not abort the whole computation.
+                continue
+    return total_usage





More information about the tor-commits mailing list