[metrics-web/master] Make R object generation thread-safe.

commit b0b2f22e160750f9c7acc9de509d2467d3b5c572 Author: Karsten Loesing <karsten.loesing@gmx.net> Date: Wed Mar 21 10:53:55 2012 +0100 Make R object generation thread-safe. Problems with concurrent R object generation were highly unlikely before this patch. But that makes them even harder to track down. Now, we're generating all R objects in a separate thread that we join. Before starting a new thread we check if there's already a thread running to generate the same object, and if so, we join that one. --- src/org/torproject/ernie/web/RObjectGenerator.java | 200 +++++++++++--------- 1 files changed, 108 insertions(+), 92 deletions(-) diff --git a/src/org/torproject/ernie/web/RObjectGenerator.java b/src/org/torproject/ernie/web/RObjectGenerator.java index 011eccd..3482dce 100644 --- a/src/org/torproject/ernie/web/RObjectGenerator.java +++ b/src/org/torproject/ernie/web/RObjectGenerator.java @@ -4,11 +4,13 @@ package org.torproject.ernie.web; import java.io.BufferedInputStream; import java.io.BufferedReader; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileReader; import java.io.IOException; +import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -181,48 +183,11 @@ public class RObjectGenerator implements ServletContextListener { /* See if we need to generate this graph. */ File imageFile = new File(this.cachedGraphsDirectory + "/" + imageFilename); - long now = System.currentTimeMillis(); - if (!checkCache || !imageFile.exists() || - imageFile.lastModified() < now - this.maxCacheAge * 1000L) { - - /* We do. Update the R query to contain the absolute path to the - * file to be generated, create a connection to Rserve, run the R - * query, and close the connection. The generated graph will be on - * disk. 
*/ - rQuery = String.format(rQuery, imageFile.getAbsolutePath()); - try { - RConnection rc = new RConnection(rserveHost, rservePort); - rc.eval(rQuery); - rc.close(); - } catch (RserveException e) { - return null; - } - - /* Check that we really just generated the file */ - if (!imageFile.exists() || imageFile.lastModified() < now - - this.maxCacheAge * 1000L) { - return null; - } - } - - /* Read the image from disk and write it to a byte array. */ - byte[] result = null; - try { - BufferedInputStream bis = new BufferedInputStream( - new FileInputStream(imageFile), 1024); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - byte[] buffer = new byte[1024]; - int length; - while ((length = bis.read(buffer)) > 0) { - baos.write(buffer, 0, length); - } - result = baos.toByteArray(); - } catch (IOException e) { - return null; - } + byte[] imageBytes = this.generateRObject(rQuery, imageFile, + checkCache); /* Return the graph bytes. */ - return result; + return imageBytes; } public SortedSet<String> getAvailableCsvFiles() { @@ -245,35 +210,14 @@ public class RObjectGenerator implements ServletContextListener { /* See if we need to generate this .csv file. */ File csvFile = new File(this.cachedGraphsDirectory + "/" + csvFilename); - long now = System.currentTimeMillis(); - if (!checkCache || !csvFile.exists() || - csvFile.lastModified() < now - this.maxCacheAge * 1000L) { - - /* We do. Update the R query to contain the absolute path to the - * file to be generated, create a connection to Rserve, run the R - * query, and close the connection. The generated csv file will be - * on disk in the same directory as the generated graphs. 
*/ - rQuery = String.format(rQuery, csvFile.getAbsolutePath()); - try { - RConnection rc = new RConnection(rserveHost, rservePort); - rc.eval(rQuery); - rc.close(); - } catch (RserveException e) { - return null; - } - - /* Check that we really just generated the file */ - if (!csvFile.exists() || csvFile.lastModified() < now - - this.maxCacheAge * 1000L) { - return null; - } - } + byte[] csvBytes = this.generateRObject(rQuery, csvFile, checkCache); /* Read the text file from disk and write it to a string. */ String result = null; try { StringBuilder sb = new StringBuilder(); - BufferedReader br = new BufferedReader(new FileReader(csvFile)); + BufferedReader br = new BufferedReader(new InputStreamReader( + new ByteArrayInputStream(csvBytes))); String line = null; while ((line = br.readLine()) != null) { sb.append(line + "\n"); @@ -283,7 +227,7 @@ public class RObjectGenerator implements ServletContextListener { return null; } - /* Return the csv file. */ + /* Return the csv file content. */ return result; } @@ -343,36 +287,15 @@ public class RObjectGenerator implements ServletContextListener { /* See if we need to generate this table. */ File tableFile = new File(this.cachedGraphsDirectory + "/" + tableFilename); - long now = System.currentTimeMillis(); - if (!checkCache || !tableFile.exists() || - tableFile.lastModified() < now - this.maxCacheAge * 1000L) { - - /* We do. Update the R query to contain the absolute path to the - * file to be generated, create a connection to Rserve, run the R - * query, and close the connection. The generated csv file will be - * on disk in the same directory as the generated graphs. 
*/ - rQuery = String.format(rQuery, tableFile.getAbsolutePath()); - try { - RConnection rc = new RConnection(rserveHost, rservePort); - rc.eval(rQuery); - rc.close(); - } catch (RserveException e) { - return null; - } + byte[] tableBytes = this.generateRObject(rQuery, tableFile, + checkCache); - /* Check that we really just generated the file */ - if (!tableFile.exists() || tableFile.lastModified() < now - - this.maxCacheAge * 1000L) { - return null; - } - } - - /* Read the text file from disk and write the table content to a - * map. */ + /* Write the table content to a map. */ List<Map<String, String>> result = null; try { result = new ArrayList<Map<String, String>>(); - BufferedReader br = new BufferedReader(new FileReader(tableFile)); + BufferedReader br = new BufferedReader(new InputStreamReader( + new ByteArrayInputStream(tableBytes))); String line = br.readLine(); if (line != null) { List<String> headers = new ArrayList<String>(Arrays.asList( @@ -396,5 +319,98 @@ public class RObjectGenerator implements ServletContextListener { /* Return table values. */ return result; } -} + /* Generate an R object in a separate worker thread, or wait for an + * already running worker thread to finish and get its result. 
*/ + private byte[] generateRObject(String rQuery, File rObjectFile, + boolean checkCache) { + RObjectGeneratorWorker worker = null; + synchronized (this.rObjectGeneratorThreads) { + if (this.rObjectGeneratorThreads.containsKey(rQuery)) { + worker = this.rObjectGeneratorThreads.get(rQuery); + } else { + worker = new RObjectGeneratorWorker(rQuery, rObjectFile, + checkCache); + this.rObjectGeneratorThreads.put(rQuery, worker); + worker.start(); + } + } + try { + worker.join(); + } catch (InterruptedException e) { + } + synchronized (this.rObjectGeneratorThreads) { + if (this.rObjectGeneratorThreads.containsKey(rQuery) && + this.rObjectGeneratorThreads.get(rQuery) == worker) { + this.rObjectGeneratorThreads.remove(rQuery); + } + } + return worker.getRObjectBytes(); + } + + private Map<String, RObjectGeneratorWorker> rObjectGeneratorThreads = + new HashMap<String, RObjectGeneratorWorker>(); + + private class RObjectGeneratorWorker extends Thread { + + private String rQuery; + private File rObjectFile; + private boolean checkCache; + private byte[] result = null; + + public RObjectGeneratorWorker(String rQuery, File rObjectFile, + boolean checkCache) { + this.rQuery = rQuery; + this.rObjectFile = rObjectFile; + this.checkCache = checkCache; + } + + public void run() { + + /* See if we need to generate this R object. */ + long now = System.currentTimeMillis(); + if (!this.checkCache || !this.rObjectFile.exists() || + this.rObjectFile.lastModified() < now - maxCacheAge * 1000L) { + + /* We do. Update the R query to contain the absolute path to the + * file to be generated, create a connection to Rserve, run the R + * query, and close the connection. The generated object will be + * on disk. 
*/ + this.rQuery = String.format(this.rQuery, + this.rObjectFile.getAbsolutePath()); + try { + RConnection rc = new RConnection(rserveHost, rservePort); + rc.eval(this.rQuery); + rc.close(); + } catch (RserveException e) { + return; + } + + /* Check that we really just generated the R object. */ + if (!this.rObjectFile.exists() || this.rObjectFile.lastModified() + < now - maxCacheAge * 1000L) { + return; + } + } + + /* Read the R object from disk and write it to a byte array. */ + try { + BufferedInputStream bis = new BufferedInputStream( + new FileInputStream(this.rObjectFile), 1024); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buffer = new byte[1024]; + int length; + while ((length = bis.read(buffer)) > 0) { + baos.write(buffer, 0, length); + } + this.result = baos.toByteArray(); + } catch (IOException e) { + return; + } + } + + public byte[] getRObjectBytes() { + return result; + } + } +}
participants (1)
- karsten@torproject.org