Change batch mode - read all the queue each second (drainTo)

This commit is contained in:
Maxime Chassagneux
2016-03-25 15:57:00 +01:00
parent e2b46c3e8c
commit 9b5aa30043
3 changed files with 20 additions and 19 deletions

View File

@@ -4,8 +4,12 @@
<artifactId>logParser</artifactId> <artifactId>logParser</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
<name>logParser</name> <name>logParser</name>
<description>logParser - send data to influxdb</description>
<description>logParser - send data to influxdb</description>
<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>commons-cli</groupId> <groupId>commons-cli</groupId>

View File

@@ -108,7 +108,7 @@ public class Parser extends TailerListenerAdapter {
{ {
sb.append("value=1"); sb.append("value=1");
} }
// Time
sb.append(" " + System.currentTimeMillis() + "000000"); sb.append(" " + System.currentTimeMillis() + "000000");
// Add to the queue // Add to the queue

View File

@@ -18,14 +18,13 @@ import org.apache.commons.io.input.TailerListener;
public class ParserEngine implements Runnable{ public class ParserEngine implements Runnable{
private static final long ONE_SECOND = 1L;
private static final int BATCH_SIZE_MAX = 10;
final private String database = "qualif"; final private String database = "qualif";
final long delay = 500; final long delay = 100;
private ArrayList<Thread> threadsParser = null; private ArrayList<Thread> threadsParser = null;
static BlockingDeque<String> queue = new LinkedBlockingDeque<String>(1000); static BlockingDeque<String> queue = new LinkedBlockingDeque<String>(2000);
private boolean debug = false; private boolean debug = false;
private String patternPath = ""; private String patternPath = "";
private ScheduledExecutorService scheduler; private ScheduledExecutorService scheduler;
@SuppressWarnings("unused") @SuppressWarnings("unused")
private ScheduledFuture<?> timerHandle; private ScheduledFuture<?> timerHandle;
@@ -36,11 +35,12 @@ public class ParserEngine implements Runnable{
this.patternPath = patternPath; this.patternPath = patternPath;
scheduler = Executors.newScheduledThreadPool(1); scheduler = Executors.newScheduledThreadPool(1);
// Don't change this as metrics are per second // Don't change this as metrics are per second
this.timerHandle = scheduler.scheduleAtFixedRate( this, ONE_SECOND, ONE_SECOND, TimeUnit.SECONDS); this.timerHandle = scheduler.scheduleWithFixedDelay(this, 1, 1, TimeUnit.SECONDS);
if (debug) { if (debug) {
System.out.println("Create ParserEngine with pattern file : " + patternPath); System.out.println("Create ParserEngine with pattern file : " + patternPath);
System.out.println("Schedule sender is set to " + ONE_SECOND + " second(s)"); System.out.println("Schedule sender is set to " + 1 + " sec " );
} }
} }
@@ -73,17 +73,15 @@ public class ParserEngine implements Runnable{
public void run() { public void run() {
StringBuilder postData = new StringBuilder(); StringBuilder postData = new StringBuilder();
for (int i = 0 ; i < BATCH_SIZE_MAX; i++) if (debug) {
{ System.out.println("Actual queue size " + queue.size() );
try { }
String message = queue.poll( 50, TimeUnit.MILLISECONDS); ArrayList<String> res = new ArrayList<String>(2000);
if (message != null) { int nbElement = queue.drainTo(res, 2000);
postData.append( message + "\n" ); if (nbElement > 0) {
} for ( String s : res) {
} catch (InterruptedException e) { postData.append( s + "\n" );
e.printStackTrace();
} }
} }
try { try {
if (postData.length() > 0) { if (postData.length() > 0) {
@@ -92,7 +90,6 @@ public class ParserEngine implements Runnable{
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
public boolean sendMetricToInfluxdb(String postData) throws IOException { public boolean sendMetricToInfluxdb(String postData) throws IOException {