commit def4c929d162852ace8a016bb3352677eec5bcde Author: Arturo Filastò arturo@filasto.net Date: Tue Aug 30 02:09:55 2016 +0200
Implement disk quota management
This is related to the feature described in here: https://github.com/TheTorProject/lepidopter/issues/53 --- ooni/agent/scheduler.py | 67 ++++++++++++++++++++++++++++++++++++++++++++++++ ooni/measurements.py | 20 +++++++++++---- ooni/settings.py | 6 ++++- ooni/tests/test_utils.py | 34 ++++++++++++++++++++++++ ooni/ui/web/server.py | 12 ++++++++- ooni/utils/files.py | 34 ++++++++++++++++++++++++ 6 files changed, 166 insertions(+), 7 deletions(-)
diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py index 3389db1..54551f6 100644 --- a/ooni/agent/scheduler.py +++ b/ooni/agent/scheduler.py @@ -1,3 +1,6 @@ +import os +import errno + from datetime import datetime
from twisted.application import service @@ -8,6 +11,7 @@ from twisted.python.filepath import FilePath from ooni.scripts import oonireport from ooni import resources from ooni.utils import log, SHORT_DATE +from ooni.utils.files import human_size_to_bytes, directory_usage from ooni.deck.store import input_store, deck_store from ooni.settings import config from ooni.contrib import croniter @@ -146,6 +150,66 @@ class DeleteOldReports(ScheduledTask): measurement_path.child(measurement['id']).remove()
class CheckMeasurementQuota(ScheduledTask):
    """
    Keeps the measurements directory within the configured disk quota.

    The size of ``config.measurements_directory`` is compared against
    ``config.basic.measurement_quota``. When usage reaches ``_warn_when`` of
    the quota a ``quota_warning`` file is written inside
    ``config.running_path`` (and removed again once usage drops back below
    that threshold). When usage reaches the quota itself, measurements are
    deleted until the excess has been reclaimed.
    """
    identifier = 'check-measurement-quota'
    schedule = '@hourly'
    # Fraction of the quota at which the warning file gets written.
    _warn_when = 0.8

    def task(self):
        """
        Check the current disk usage and, if needed, warn and delete reports.

        Deletion order is: stale measurements first, then regular ones and
        only as a last resort the ones marked as "keep"; inside every group
        the oldest measurement (by ``test_start_time``) goes first.
        Measurements that are still running are never deleted.
        """
        if config.basic.measurement_quota is None:
            return

        maximum_bytes = human_size_to_bytes(config.basic.measurement_quota)
        # This is the space currently *used* by the measurements directory.
        used_bytes = directory_usage(config.measurements_directory)
        warning_path = os.path.join(config.running_path, 'quota_warning')

        if (float(used_bytes) / float(maximum_bytes)) >= self._warn_when:
            log.warn("You are about to reach the maximum allowed quota. Be careful")
            with open(warning_path, "w") as out_file:
                # Content is "<used bytes> <maximum bytes>".
                out_file.write("{0} {1}".format(used_bytes, maximum_bytes))
        else:
            try:
                os.remove(warning_path)
            except OSError as ose:
                # A missing warning file just means there was nothing to
                # clear; anything else is a real error.
                if ose.errno != errno.ENOENT:
                    raise

        if float(used_bytes) < float(maximum_bytes):
            # We are within the allowed quota, nothing to delete.
            return

        # We should begin to delete old reports. The amount to reclaim is
        # how far usage exceeds the quota.
        amount_to_delete = float(used_bytes) - float(maximum_bytes)
        amount_deleted = 0
        measurement_path = FilePath(config.measurements_directory)

        kept_measurements = []
        stale_measurements = []
        remaining_measurements = []
        measurements_by_date = sorted(list_measurements(compute_size=True),
                                      key=lambda k: k['test_start_time'])
        for measurement in measurements_by_date:
            if measurement.get('running') is True:
                # Never pull the directory from under a live measurement.
                continue
            if measurement['keep'] is True:
                kept_measurements.append(measurement)
            elif measurement['stale'] is True:
                stale_measurements.append(measurement)
            else:
                remaining_measurements.append(measurement)

        # This is the order in which we should begin deleting measurements.
        ordered_measurements = (stale_measurements +
                                remaining_measurements +
                                kept_measurements)
        for measurement in ordered_measurements:
            # Stop as soon as enough space was reclaimed; running out of
            # candidates simply ends the loop instead of raising.
            if amount_deleted >= amount_to_delete:
                break
            log.warn("Deleting report {0}".format(measurement["id"]))
            measurement_path.child(measurement['id']).remove()
            amount_deleted += measurement['size']
+ @defer.inlineCallbacks def run_system_tasks(no_input_store=False): task_classes = SYSTEM_TASKS[:] @@ -215,6 +280,7 @@ def run_system_tasks(no_input_store=False): log.err("Failed to run task {0}".format(task.identifier)) log.exception(exc)
+ class SchedulerService(service.MultiService): """ This service is responsible for running the periodic tasks. @@ -271,6 +337,7 @@ class SchedulerService(service.MultiService): self.schedule(UpdateInputsAndResources()) self.schedule(UploadReports()) self.schedule(DeleteOldReports()) + self.schedule(CheckMeasurementQuota()) self.schedule(RefreshDeckList(self))
self._looping_call.start(self.interval) diff --git a/ooni/measurements.py b/ooni/measurements.py index 6d90c1b..fd722ee 100644 --- a/ooni/measurements.py +++ b/ooni/measurements.py @@ -4,6 +4,7 @@ import signal
from twisted.python.filepath import FilePath from ooni.utils import log +from ooni.utils.files import directory_usage from ooni.settings import config
class MeasurementInProgress(Exception): @@ -61,7 +62,8 @@ def generate_summary(input_file, output_file): class MeasurementNotFound(Exception): pass
-def get_measurement(measurement_id): +def get_measurement(measurement_id, compute_size=False): + size = -1 measurement_path = FilePath(config.measurements_directory) measurement = measurement_path.child(measurement_id) if not measurement.exists(): @@ -70,6 +72,7 @@ def get_measurement(measurement_id): running = False completed = True keep = False + stale = False if measurement.child("measurements.njson.progress").exists(): completed = False # XXX this is done quite often around the code, probably should @@ -80,10 +83,14 @@ def get_measurement(measurement_id): os.kill(pid, signal.SIG_DFL) running = True except OSError: - pass + stale = True
if measurement.child("keep").exists(): keep = True + + if compute_size is True: + size = directory_usage(measurement.path) + test_start_time, country_code, asn, test_name = \ measurement_id.split("-")[:4] return { @@ -94,7 +101,9 @@ def get_measurement(measurement_id): "id": measurement_id, "completed": completed, "keep": keep, - "running": running + "running": running, + "stale": stale, + "size": size }
@@ -115,12 +124,13 @@ def get_summary(measurement_id): with summary.open("r") as f: return json.load(f)
-def list_measurements(): + +def list_measurements(compute_size=False): measurements = [] measurement_path = FilePath(config.measurements_directory) for measurement_id in measurement_path.listdir(): try: - measurements.append(get_measurement(measurement_id)) + measurements.append(get_measurement(measurement_id, compute_size)) except: log.err("Failed to get metadata for measurement {0}".format(measurement_id)) return measurements diff --git a/ooni/settings.py b/ooni/settings.py index 2bacd57..c36f3f7 100644 --- a/ooni/settings.py +++ b/ooni/settings.py @@ -20,6 +20,9 @@ basic: # Where OONIProbe should be writing it's log file logfile: {logfile} loglevel: WARNING + # The maximum amount of data to store on disk. Once the quota is reached, + # we will start deleting older reports. + # measurement_quota: 1G privacy: # Should we include the IP address of the probe in the report? includeip: {include_ip} @@ -99,7 +102,8 @@ tor: defaults = { "basic": { "loglevel": "WARNING", - "logfile": "ooniprobe.log" + "logfile": "ooniprobe.log", + "measurement_quota": "1G" }, "privacy": { "includeip": False, diff --git a/ooni/tests/test_utils.py b/ooni/tests/test_utils.py index ea1d858..4ef2926 100644 --- a/ooni/tests/test_utils.py +++ b/ooni/tests/test_utils.py @@ -1,7 +1,10 @@ import os +import tempfile + from twisted.trial import unittest
from ooni.utils import log, generate_filename, net +from ooni.utils.files import human_size_to_bytes, directory_usage
class TestUtils(unittest.TestCase): @@ -43,3 +46,34 @@ class TestUtils(unittest.TestCase): def test_get_addresses(self): addresses = net.getAddresses() assert isinstance(addresses, list) + + def test_human_size(self): + self.assertEqual( + human_size_to_bytes("1G"), + 1024**3 + ) + self.assertEqual( + human_size_to_bytes("1.3M"), + 1.3 * 1024**2 + ) + self.assertEqual( + human_size_to_bytes("1.2K"), + 1.2 * 1024 + ) + self.assertEqual( + human_size_to_bytes("1"), + 1.0 + ) + self.assertEqual( + human_size_to_bytes("100.2"), + 100.2 + ) + + def test_directory_usage(self): + tmp_dir = tempfile.mkdtemp() + with open(os.path.join(tmp_dir, "something.txt"), "w") as out_file: + out_file.write("A"*1000) + os.mkdir(os.path.join(tmp_dir, "subdir")) + with open(os.path.join(tmp_dir, "subdir", "something.txt"), "w") as out_file: + out_file.write("A"*1000) + self.assertEqual(directory_usage(tmp_dir), 1000*2) diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py index ee33fa2..6f9ef69 100644 --- a/ooni/ui/web/server.py +++ b/ooni/ui/web/server.py @@ -1,6 +1,8 @@ from __future__ import print_function
+import os import json +import errno import string from functools import wraps from random import SystemRandom @@ -186,13 +188,21 @@ class WebUIAPI(object):
@property
def status(self):
    """
    Return a dict describing the current state of the probe.

    ``quota_warning`` holds the content of the ``quota_warning`` file found
    in ``config.running_path`` or None when that file does not exist.

    Raises IOError if the warning file exists but cannot be read.
    """
    quota_warning = None
    try:
        # NOTE(review): the scheduler task writes this file under
        # config.running_path, so the same attribute is used here - confirm
        # it is the one defined by the settings object.
        with open(os.path.join(config.running_path, "quota_warning")) as in_file:
            quota_warning = in_file.read()
    except IOError as ioe:
        # No warning file means the quota is not close to being reached.
        if ioe.errno != errno.ENOENT:
            raise
    return {
        "software_version": ooniprobe_version,
        "software_name": "ooniprobe",
        "asn": probe_ip.geodata['asn'],
        "country_code": probe_ip.geodata['countrycode'],
        "director_started": self._director_started,
        "initialized": self._is_initialized,
        "quota_warning": quota_warning
    }
import os
import re

# A number (integer or decimal) followed by an optional unit (K, M, G or T)
# and an optional "B". The pattern is anchored on both ends so that trailing
# garbage or an unknown unit is rejected instead of being silently ignored.
HUMAN_SIZE = re.compile(r"^\s*(\d+(?:\.\d*)?)\s*([KMGT]?)B?\s*$",
                        re.IGNORECASE)

# Multiplier for every supported unit; the empty unit means plain bytes.
_UNIT_MULTIPLIERS = {
    "": 1,
    "K": 1024,
    "M": 1024 ** 2,
    "G": 1024 ** 3,
    "T": 1024 ** 4,
}


class InvalidFormat(Exception):
    """Raised when a human readable size cannot be parsed."""


def human_size_to_bytes(human_size):
    """
    Converts a size specified in a human friendly way (for example 1G, 10M,
    30K) into bytes.

    Units are powers of 1024, are case insensitive and may be followed by an
    optional "B" (so "1G", "1g" and "1GB" are all the same). A string with no
    unit is interpreted as a number of bytes.

    :param human_size: the string to parse.
    :returns: the number of bytes as a float.
    :raises InvalidFormat: when the string is not a valid size.
    """
    match = HUMAN_SIZE.match(human_size)
    if match is None:
        raise InvalidFormat("Invalid size: {0!r}".format(human_size))
    number, unit = match.groups()
    return float(number) * _UNIT_MULTIPLIERS[unit.upper()]


def directory_usage(path):
    """
    Returns the total size in bytes of all the files found inside of path
    and its subdirectories.

    Entries that cannot be stat'ed (for example files removed while we are
    walking the tree or dangling symlinks) are skipped.
    """
    total_usage = 0
    for root, _dirs, filenames in os.walk(path):
        for filename in filenames:
            fp = os.path.join(root, filename)
            try:
                total_usage += os.path.getsize(fp)
            except OSError:
                # The file vanished or is a broken symlink: it does not
                # take up any space we could account for.
                continue
    return total_usage
tor-commits@lists.torproject.org