diff --git a/pom.xml b/pom.xml index 94d41eb..46f88d3 100644 --- a/pom.xml +++ b/pom.xml @@ -4,8 +4,12 @@ logParser 0.0.1-SNAPSHOT logParser - logParser - send data to influxdb + logParser - send data to influxdb + + 1.7 + 1.7 + commons-cli diff --git a/src/main/java/com/airfrance/diqmqs/logparser/Parser.java b/src/main/java/com/airfrance/diqmqs/logparser/Parser.java index 596af83..23c0fea 100644 --- a/src/main/java/com/airfrance/diqmqs/logparser/Parser.java +++ b/src/main/java/com/airfrance/diqmqs/logparser/Parser.java @@ -108,7 +108,7 @@ public class Parser extends TailerListenerAdapter { { sb.append("value=1"); } - + // Time sb.append(" " + System.currentTimeMillis() + "000000"); // Add to the queue diff --git a/src/main/java/com/airfrance/diqmqs/logparser/ParserEngine.java b/src/main/java/com/airfrance/diqmqs/logparser/ParserEngine.java index 1acc091..382775d 100644 --- a/src/main/java/com/airfrance/diqmqs/logparser/ParserEngine.java +++ b/src/main/java/com/airfrance/diqmqs/logparser/ParserEngine.java @@ -18,14 +18,13 @@ import org.apache.commons.io.input.TailerListener; public class ParserEngine implements Runnable{ - private static final long ONE_SECOND = 1L; - private static final int BATCH_SIZE_MAX = 10; final private String database = "qualif"; - final long delay = 500; + final long delay = 100; private ArrayList threadsParser = null; - static BlockingDeque queue = new LinkedBlockingDeque(1000); + static BlockingDeque queue = new LinkedBlockingDeque(2000); private boolean debug = false; private String patternPath = ""; + private ScheduledExecutorService scheduler; @SuppressWarnings("unused") private ScheduledFuture timerHandle; @@ -36,11 +35,12 @@ public class ParserEngine implements Runnable{ this.patternPath = patternPath; scheduler = Executors.newScheduledThreadPool(1); // Don't change this as metrics are per second - this.timerHandle = scheduler.scheduleAtFixedRate( this, ONE_SECOND, ONE_SECOND, TimeUnit.SECONDS); + this.timerHandle = scheduler.scheduleWithFixedDelay(this, 1, 1, TimeUnit.SECONDS); + if (debug) { System.out.println("Create ParserEngine with pattern file : " + patternPath); - System.out.println("Schedule sender is set to " + ONE_SECOND + " second(s)"); + System.out.println("Schedule sender is set to " + 1 + " sec " ); } } @@ -73,17 +73,15 @@ public class ParserEngine implements Runnable{ public void run() { StringBuilder postData = new StringBuilder(); - for (int i = 0 ; i < BATCH_SIZE_MAX; i++) - { - try { - String message = queue.poll( 50, TimeUnit.MILLISECONDS); - if (message != null) { - postData.append( message + "\n" ); - } - } catch (InterruptedException e) { - e.printStackTrace(); + if (debug) { + System.out.println("Actual queue size " + queue.size() ); + } + ArrayList res = new ArrayList(2000); + int nbElement = queue.drainTo(res, 2000); + if (nbElement > 0) { + for ( String s : res) { + postData.append( s + "\n" ); } - } try { if (postData.length() > 0) { @@ -92,7 +90,6 @@ public class ParserEngine implements Runnable{ } catch (IOException e) { e.printStackTrace(); } - } public boolean sendMetricToInfluxdb(String postData) throws IOException {