From 9b5aa30043db54737f6d7dc5dfa89682d5a9c397 Mon Sep 17 00:00:00 2001
From: Maxime Chassagneux <4163013@airfrance.fr>
Date: Fri, 25 Mar 2016 15:57:00 +0100
Subject: [PATCH] Change batch mode - read all the queue each second (drainTo)
---
pom.xml | 6 +++-
.../airfrance/diqmqs/logparser/Parser.java | 2 +-
.../diqmqs/logparser/ParserEngine.java | 31 +++++++++----------
3 files changed, 20 insertions(+), 19 deletions(-)
diff --git a/pom.xml b/pom.xml
index 94d41eb..46f88d3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,8 +4,12 @@
logParser
0.0.1-SNAPSHOT
logParser
- logParser - send data to influxdb
+ logParser - send data to influxdb
+
+ 1.7
+ 1.7
+
commons-cli
diff --git a/src/main/java/com/airfrance/diqmqs/logparser/Parser.java b/src/main/java/com/airfrance/diqmqs/logparser/Parser.java
index 596af83..23c0fea 100644
--- a/src/main/java/com/airfrance/diqmqs/logparser/Parser.java
+++ b/src/main/java/com/airfrance/diqmqs/logparser/Parser.java
@@ -108,7 +108,7 @@ public class Parser extends TailerListenerAdapter {
{
sb.append("value=1");
}
-
+ // Time
sb.append(" " + System.currentTimeMillis() + "000000");
// Add to the queue
diff --git a/src/main/java/com/airfrance/diqmqs/logparser/ParserEngine.java b/src/main/java/com/airfrance/diqmqs/logparser/ParserEngine.java
index 1acc091..382775d 100644
--- a/src/main/java/com/airfrance/diqmqs/logparser/ParserEngine.java
+++ b/src/main/java/com/airfrance/diqmqs/logparser/ParserEngine.java
@@ -18,14 +18,13 @@ import org.apache.commons.io.input.TailerListener;
public class ParserEngine implements Runnable{
- private static final long ONE_SECOND = 1L;
- private static final int BATCH_SIZE_MAX = 10;
final private String database = "qualif";
- final long delay = 500;
+ final long delay = 100;
private ArrayList threadsParser = null;
- static BlockingDeque queue = new LinkedBlockingDeque(1000);
+ static BlockingDeque queue = new LinkedBlockingDeque(2000);
private boolean debug = false;
private String patternPath = "";
+
private ScheduledExecutorService scheduler;
@SuppressWarnings("unused")
private ScheduledFuture> timerHandle;
@@ -36,11 +35,12 @@ public class ParserEngine implements Runnable{
this.patternPath = patternPath;
scheduler = Executors.newScheduledThreadPool(1);
// Don't change this as metrics are per second
- this.timerHandle = scheduler.scheduleAtFixedRate( this, ONE_SECOND, ONE_SECOND, TimeUnit.SECONDS);
+ this.timerHandle = scheduler.scheduleWithFixedDelay(this, 1, 1, TimeUnit.SECONDS);
+
if (debug) {
System.out.println("Create ParserEngine with pattern file : " + patternPath);
- System.out.println("Schedule sender is set to " + ONE_SECOND + " second(s)");
+ System.out.println("Schedule sender is set to " + 1 + " sec " );
}
}
@@ -73,17 +73,15 @@ public class ParserEngine implements Runnable{
public void run() {
StringBuilder postData = new StringBuilder();
- for (int i = 0 ; i < BATCH_SIZE_MAX; i++)
- {
- try {
- String message = queue.poll( 50, TimeUnit.MILLISECONDS);
- if (message != null) {
- postData.append( message + "\n" );
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
+ if (debug) {
+ System.out.println("Actual queue size " + queue.size() );
+ }
+ ArrayList res = new ArrayList(2000);
+ int nbElement = queue.drainTo(res, 2000);
+ if (nbElement > 0) {
+ for ( String s : res) {
+ postData.append( s + "\n" );
}
-
}
try {
if (postData.length() > 0) {
@@ -92,7 +90,6 @@ public class ParserEngine implements Runnable{
} catch (IOException e) {
e.printStackTrace();
}
-
}
public boolean sendMetricToInfluxdb(String postData) throws IOException {