package ever.pipeline; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; public abstract class Pipeline { private int currentAnalyzerIndex = 0; // Pipeline with the analyzers private ArrayList aList = new ArrayList(); // Cases to be processed. Map must be synchronized to allow multihreading private Map caseMap = Collections .synchronizedMap(new HashMap()); // this method is used to setup a pipeline public abstract void buildPipeline(); /** * This methods just adds an analyzer to the pipeline list * @param ana */ public void addAnalyzer(Analyzer ana) { aList.add(ana); } /** * Set the map with cases * @param input */ public void putCases(HashMap input) { synchronized (caseMap) { caseMap = input; } // Serilize to get testdate disabled for public version // FileOutputStream fos = null; // ObjectOutputStream out = null; // for (int c : input.keySet()) { // Case cas = input.get(c); // try { // fos = new FileOutputStream("objects/" + c + ".ser"); // out = new ObjectOutputStream(fos); // out.writeObject(cas); // out.close(); // } catch (IOException ex) { // ex.printStackTrace(); // } // } } /** * Start the pipeline */ public void run() { while (aList.size() > 0) { ArrayList currentGroup = null; try { currentGroup = getNextGroup(); } catch (EmptyGroupException e) { break; } aList.removeAll(currentGroup); iterateOverGroup(currentGroup); } } /** * Summarizes longest possible list of all analyzers of the same type. * (Local or Global) * * @return * @throws EmptyGroupException */ private ArrayList getNextGroup() throws EmptyGroupException { int pointer = currentAnalyzerIndex; ArrayList caList = new ArrayList(); for (int i = pointer; i < aList.size(); i++) { Analyzer a = aList.get(i); if (caList.size() > 0) { if (caList.get(0).isLocal() == a.isLocal()) caList.add(a); else break; } else { caList.add(a); } } // empty groups are not allowed if (caList.size() == 0) { throw new EmptyGroupException(); } return caList; } /** * Iterates over a group of analyzer and pushes the cases from one analyzer to the next one * @param group */ public void iterateOverGroup(ArrayList group) { boolean isLocal = group.get(0).isLocal(); ScheduledThreadPoolExecutor executer = new ScheduledThreadPoolExecutor( 1); HashSet callableSet = new HashSet(); ArrayList> results = new ArrayList>(); if (isLocal) { for (int i : caseMap.keySet()) { Case c = null; synchronized (caseMap) { c = caseMap.get(i); callableSet.add(new LocalGroupHandler(group, c)); } } for (LocalGroupHandler lgh : callableSet) { results.add(executer.submit(lgh)); } for (Future fc : results) { Case c = null; try { c = fc.get(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } synchronized (caseMap) { caseMap.put(c.getId(), c); } } executer.shutdown(); } else { for (Analyzer a : group) { HashMap tmp = new HashMap(); File f = new File("cases.ser"); if (a.getClass().toString().equals("class ever.pipeline.CaseLoader")&f.exists()) { try { FileInputStream fileIn = new FileInputStream("cases.ser"); ObjectInputStream in = new ObjectInputStream(fileIn); tmp = (HashMap) in.readObject(); in.close(); fileIn.close(); synchronized (caseMap) { caseMap = tmp; } }catch(IOException i) { i.printStackTrace(); return; }catch(ClassNotFoundException c) { c.printStackTrace(); return; } } else { synchronized (caseMap) { tmp = (HashMap) caseMap; } tmp = a.analyze(tmp); synchronized (caseMap) { caseMap = tmp; } } } } } public Map getCaseMap() { return caseMap; } }