Add directory watchService with regex file name

This commit is contained in:
Maxime Chassagneux
2017-02-01 17:07:18 +01:00
parent 5e5fce2bb5
commit d57b9554f1
4 changed files with 419 additions and 215 deletions

View File

@@ -0,0 +1,100 @@
package com.airfrance.diqmqs.logparser;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class Application extends Thread {
private ParserEngine engine;
private boolean onlyOneTime;
private Pattern pattern;
private Matcher matcher;
private Log log;
private File paramFile;
private Watchdog watcher;
public Application(String patternPath, boolean onlyOneTime) throws IOException {
engine = new ParserEngine(patternPath);
this.onlyOneTime = onlyOneTime;
log = Log.getLogger(Application.class.getName());
watcher = new Watchdog(this);
}
void startParserFromCLI(String[] filesName, String[] regexName, String applicationName, boolean readFromStart)
throws IOException {
for (int i = 0; i < filesName.length; i++) {
File f = new File(filesName[i]);
engine.addNewParser(f, regexName[i], applicationName, readFromStart, onlyOneTime);
}
}
void stopParser(File file) {
engine.stopOneParser(file.getAbsolutePath());
}
void startParser(File file, String regexName, String applicationName) {
engine.addNewParser(file, regexName, applicationName, true, false);
}
void startFromParamFile(File paramFile) throws IOException {
this.paramFile = paramFile;
startParserFromFile(paramFile);
log.info("All parsers started");
watcher.register(paramFile.getParentFile().toPath());
watcher.start();
}
public void startParserFromFile(File file) throws IOException {
try {
Scanner scanner = new Scanner(file);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
boolean readFromStart = false;
if (!line.startsWith("#")) {
String[] arguments = line.split("\\t");
if (arguments.length >= 4) {
String applicationName = arguments[0].toLowerCase();
Paths pathDirectories = new Paths("/", arguments[1]);
String regexFile = arguments[2];
String regexGrokName = arguments[3];
if (arguments.length == 5) {
if (arguments[4].equalsIgnoreCase("true")) {
readFromStart = true;
}
}
for (File dir : pathDirectories.getFiles()) {
pattern = Pattern.compile(regexFile);
for (File f : dir.listFiles()) {
matcher = pattern.matcher(f.getName());
if (matcher.find()) {
engine.addNewParser(f, regexGrokName, applicationName, readFromStart, onlyOneTime);
}
}
watcher.register(dir.toPath(), regexFile, regexGrokName, applicationName);
}
}
else
{
log.error("Missing field on line : " + line );
}
}
}
scanner.close();
} catch (FileNotFoundException fnfe) {
log.error(fnfe.getMessage());
}
}
public File getParamFile() {
return paramFile;
}
}

View File

@@ -1,8 +1,7 @@
package com.airfrance.diqmqs.logparser; package com.airfrance.diqmqs.logparser;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.IOException;
import java.util.Scanner;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.CommandLineParser;
@@ -13,189 +12,124 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException; import org.apache.commons.cli.ParseException;
public class Cli { public class Cli {
public final static String VER = "1.1"; public final static String VER = "2.0";
public static Options options = new Options(); public static Options options = new Options();
public static Option help = new Option( "help", "Print this message" ); public static Option help = new Option("help", "Print this message");
public static Option version = new Option( "version", "Print the version information and exit" ); public static Option version = new Option("version", "Print the version information and exit");
public static Option application = new Option( "application", true , "Input application name" ); public static Option application = new Option("application", true, "Input application name");
public static Option patternFilePath = new Option( "pattern", true , "Input pattern path file" ); public static Option patternFilePath = new Option("pattern", true, "Input pattern path file");
public static Option logFile = new Option( "logfile", true , "Input log path files" ); public static Option logFile = new Option("logfile", true, "Input log path files");
public static Option regex = new Option( "regex", true , "Name of the regex to apply" ); public static Option regex = new Option("regex", true, "Name of the regex to apply");
public static Option fromStart = new Option( "fromStart", "Read the file from start" ); public static Option fromStart = new Option("fromStart", "Read the file from start");
public static Option fileParam = new Option( "paramfile", true , "Input a param file" ); public static Option fileParam = new Option("paramfile", true, "Input a param file");
public static Option debugOption = new Option( "debug", "Active debug output message" ); public static Option debugOption = new Option("debug", "Active debug output message");
public static Option infoOption = new Option( "info", "Active info output message" ); public static Option infoOption = new Option("info", "Active info output message");
public static Option oneTime = new Option( "oneTime", "Read the file once time" ); public static Option oneTime = new Option("oneTime", "Read the file once time");
private static long lastModifiedTime = 0L;
public Cli() { public Cli() {
// TODO Auto-generated constructor stub // TODO Auto-generated constructor stub
} }
public static void main(String[] args) { public static void main(String[] args) {
application.setArgName("application name"); application.setArgName("application name");
patternFilePath.setArgName("path to the pattern file"); patternFilePath.setArgName("path to the pattern file");
logFile.setArgName("path to the logs file"); logFile.setArgName("path to the logs file");
fileParam.setArgName("path to the param file"); fileParam.setArgName("path to the param file");
options.addOption(help); options.addOption(help);
options.addOption(version); options.addOption(version);
options.addOption(application); options.addOption(application);
options.addOption(patternFilePath); options.addOption(patternFilePath);
options.addOption(logFile); options.addOption(logFile);
options.addOption(regex); options.addOption(regex);
options.addOption(debugOption); options.addOption(debugOption);
options.addOption(infoOption); options.addOption(infoOption);
options.addOption(fileParam); options.addOption(fileParam);
options.addOption(fromStart); options.addOption(fromStart);
HelpFormatter formatter = new HelpFormatter(); HelpFormatter formatter = new HelpFormatter();
boolean readFromStart = false; boolean readFromStart = false;
boolean onlyOneTime = false; boolean onlyOneTime = false;
CommandLineParser parser = new DefaultParser(); CommandLineParser parser = new DefaultParser();
CommandLine cmd = null; CommandLine cmd = null;
try { try {
cmd = parser.parse( options, args); cmd = parser.parse(options, args);
} } catch (ParseException e) {
catch (ParseException e) { System.err.println("Command parse error : " + e.getMessage());
System.err.println("Command parse error : " + e.getMessage() );
System.exit(0); System.exit(0);
} }
if(cmd.hasOption("help")) { if (cmd.hasOption("help")) {
formatter.printHelp( "logParser" , options ); formatter.printHelp("logParser", options);
System.exit(0); System.exit(0);
} }
if(cmd.hasOption("version")) { if (cmd.hasOption("version")) {
System.out.println("logParser " + Cli.VER ); System.out.println("logParser " + Cli.VER);
System.exit(0); System.exit(0);
} }
if(cmd.hasOption("debug")) { if (cmd.hasOption("debug")) {
Log.setLevel(Log.DEBUG); Log.setLevel(Log.DEBUG);
} }
if(cmd.hasOption("info")) { if (cmd.hasOption("info")) {
Log.setLevel(Log.INFO); Log.setLevel(Log.INFO);
} }
String patternPath = ""; String patternPath = "";
if(cmd.hasOption("pattern")) { if (cmd.hasOption("pattern")) {
patternPath = cmd.getOptionValue("pattern"); patternPath = cmd.getOptionValue("pattern");
} } else {
else
{
System.err.println("patternFilePath application missing"); System.err.println("patternFilePath application missing");
System.exit(0); System.exit(0);
} }
if(cmd.hasOption("oneTime")) { if (cmd.hasOption("oneTime")) {
onlyOneTime = true; onlyOneTime = true;
} }
if(cmd.hasOption("paramfile")) {
String fileParamName = cmd.getOptionValue("paramfile");
File param = new File(fileParamName);
ParserEngine engine = new ParserEngine(patternPath);
Log log = Log.getLogger(Cli.class.getName());
try {
startParserFromFile(param, engine , onlyOneTime);
lastModifiedTime = param.lastModified();
log.info("All parsers started");
// Check for reload the param file
while (true) {
try {
Thread.sleep(5000);
if (param.exists())
{
if ( param.lastModified() != lastModifiedTime )
{
engine.stopAllParser();
startParserFromFile(param, engine , onlyOneTime);
lastModifiedTime = param.lastModified();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (FileNotFoundException e) {
System.err.println("Execption : " + e.getMessage());
System.exit(0);
}
}
else
{
String applicationName = "";
if(cmd.hasOption("application")) {
applicationName = cmd.getOptionValue("application");
}
else
{
System.err.println("Option application missing");
System.exit(0);
}
String[] regexName = null;
if(cmd.hasOption("regex")) {
regexName = cmd.getOptionValues("regex");
}
else
{
System.err.println("regex application missing");
System.exit(0);
}
if(cmd.hasOption("fromStart")) {
readFromStart = true;
}
if(cmd.hasOption("logfile")) { Application app;
ParserEngine engine = new ParserEngine(patternPath); try {
String[] filesName = cmd.getOptionValues("logfile"); app = new Application(patternPath, onlyOneTime);
for (int i = 0; i < filesName.length ; i++) {
File f = new File (filesName[i]); if (cmd.hasOption("paramfile")) {
engine.addNewParser(f, regexName[i], applicationName, readFromStart, onlyOneTime); File f = new File(cmd.getOptionValue("paramfile"));
app.startFromParamFile(f);
} else {
String applicationName = "";
if (cmd.hasOption("application")) {
applicationName = cmd.getOptionValue("application");
} else {
System.err.println("Option application missing");
System.exit(0);
}
String[] regexName = null;
if (cmd.hasOption("regex")) {
regexName = cmd.getOptionValues("regex");
} else {
System.err.println("regex application missing");
System.exit(0);
}
if (cmd.hasOption("fromStart")) {
readFromStart = true;
}
if (cmd.hasOption("logfile")) {
String[] filesName = cmd.getOptionValues("logfile");
app.startParserFromCLI(filesName, regexName, applicationName, readFromStart);
} }
} }
} catch (IOException e) {
System.err.println(e.getMessage());
System.exit(0);
} }
} }
static void startParserFromFile(File file, ParserEngine engine, boolean onceTime) throws FileNotFoundException {
Scanner scanner = new Scanner(file);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
boolean readFromStart = false;
if (!line.startsWith("#"))
{
String[] arguments = line.split("\\t");
String applicationName = arguments[0].toLowerCase();
Paths paths = new Paths("/", arguments[1]);
String regexName = arguments[2];
if (arguments.length == 4) {
if ( arguments[3].equalsIgnoreCase("true") ){
readFromStart = true;
}
}
for (File f : paths.getFiles()) {
engine.addNewParser(f, regexName, applicationName, readFromStart, onceTime);
}
}
}
scanner.close();
}
} }

View File

@@ -8,6 +8,7 @@ import java.net.InetAddress;
import java.net.URL; import java.net.URL;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
@@ -18,139 +19,142 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.input.Tailer; import org.apache.commons.io.input.Tailer;
import org.apache.commons.io.input.TailerListener; import org.apache.commons.io.input.TailerListener;
public class ParserEngine implements Runnable{ public class ParserEngine implements Runnable {
final private String database = "qualif"; final private String database = "qualif";
final private String urlInfluxdb = "http://diqmqs.airfrance.fr/influxdb_query/write?rp=one_week&db=" + database; final private String urlInfluxdb = "http://diqmqs.airfrance.fr/influxdb_query/write?rp=one_week&db=" + database;
final long delay = 200; final long delay = 200;
private boolean onceTime = false; private boolean onceTime = false;
private ArrayList<Thread> threadsParser = null; private ArrayList<Thread> threadsParser = null;
static BlockingDeque<String> queue = new LinkedBlockingDeque<String>(2000); static BlockingDeque<String> queue = new LinkedBlockingDeque<String>(2000);
private String patternPath = ""; private String patternPath = "";
ArrayList<String> res = new ArrayList<String>(2000); ArrayList<String> res = new ArrayList<String>(2000);
Log log = Log.getLogger(ParserEngine.class.getName()); Log log = Log.getLogger(ParserEngine.class.getName());
private ScheduledExecutorService scheduler; private ScheduledExecutorService scheduler;
private String hostname; private String hostname;
@SuppressWarnings("unused") @SuppressWarnings("unused")
private ScheduledFuture<?> timerHandle; private ScheduledFuture<?> timerHandle;
public ParserEngine(String patternPath) { public ParserEngine(String patternPath) {
threadsParser = new ArrayList<Thread>(); threadsParser = new ArrayList<Thread>();
this.patternPath = patternPath; this.patternPath = patternPath;
scheduler = Executors.newScheduledThreadPool(1); scheduler = Executors.newScheduledThreadPool(1);
try { try {
hostname = InetAddress.getLocalHost().getHostName().split("\\.")[0]; hostname = InetAddress.getLocalHost().getHostName().split("\\.")[0];
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
e.printStackTrace(); log.error(e.getMessage());
} }
this.timerHandle = scheduler.scheduleWithFixedDelay(this, 1, 1, TimeUnit.SECONDS); this.timerHandle = scheduler.scheduleWithFixedDelay(this, 1, 1, TimeUnit.SECONDS);
log.info("Create ParserEngine with pattern file : " + patternPath); log.info("Create ParserEngine with pattern file : " + patternPath);
log.info("Schedule sender is set to " + 1 + " secs" ); log.info("Schedule sender is set to " + 1 + " secs");
} }
void addNewParser(File f , String regexName, String application , boolean readFromStart, boolean onceTime) { void addNewParser(File f, String regexName, String application, boolean readFromStart, boolean onceTime) {
TailerListener listener = new Parser(application, regexName, this); TailerListener listener = new Parser(application, regexName, this);
Tailer tailer = new Tailer(f, listener, delay, !readFromStart, true, 8192); Tailer tailer = new Tailer(f, listener, delay, !readFromStart, true, 8192);
Thread thread = new Thread(tailer); Thread thread = new Thread(tailer);
thread.setDaemon(true); thread.setDaemon(true);
thread.setName("Parser - " + threadsParser.size() ); thread.setName(f.getAbsolutePath());
thread.start(); thread.start();
threadsParser.add(thread); threadsParser.add(thread);
this.onceTime = onceTime; this.onceTime = onceTime;
log.info("Thread Parser - " + threadsParser.size() + " started on file : " + f.getName()); log.info("Thread Parser - " + threadsParser.size() + " started on file : " + f.getName());
} }
void stopAllParser() { void stopAllParser() {
for (Thread t : threadsParser) { for (Thread t : threadsParser) {
t.interrupt(); t.interrupt();
log.info("Stop Thread " + t.getName() ); log.info("Stop Thread " + t.getName());
} }
threadsParser.clear(); threadsParser.clear();
} }
// Read data from the Queue and send it to influxdb void stopOneParser(String name) {
public void run() { for (Iterator<Thread> ite = threadsParser.iterator(); ite.hasNext();) {
Thread thread = ite.next();
StringBuilder postData = new StringBuilder(); if (thread.getName().equals(name)) {
log.info("Actual queue size " + queue.size() ); thread.interrupt();
int nbElement = queue.drainTo(res, 2000); log.info("Stop Thread " + thread.getName());
if (nbElement > 0) { ite.remove();
for ( String s : res) {
postData.append( s + "\n" );
} }
} }
else { }
if (onceTime)
{ // Read data from the Queue and send it to influxdb
public void run() {
StringBuilder postData = new StringBuilder();
log.info("Actual queue size " + queue.size());
int nbElement = queue.drainTo(res, 2000);
if (nbElement > 0) {
for (String s : res) {
postData.append(s + "\n");
}
} else {
if (onceTime) {
stopAllParser(); stopAllParser();
System.exit(0); System.exit(0);
} }
} }
try { try {
if (postData.length() > 0) { if (postData.length() > 0) {
if (sendMetricToInfluxdb(postData.toString())) if (sendMetricToInfluxdb(postData.toString())) {
{
res.clear(); res.clear();
} } else {
else{
} }
} }
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
public boolean sendMetricToInfluxdb(String postData) throws IOException {
URL url = new URL(urlInfluxdb); public boolean sendMetricToInfluxdb(String postData) throws IOException {
long start = System.currentTimeMillis();
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setDoOutput(true);
connection.setRequestMethod("POST");
connection.setRequestProperty("User-Agent", "LogParser/1.0");
connection.setRequestProperty("Authorization", "Basic cXVhbGlmOmRpcW0wMQ==");
connection.setRequestProperty("Content-Length", String.valueOf(postData.length()));
// Write data URL url = new URL(urlInfluxdb);
OutputStream os = connection.getOutputStream(); long start = System.currentTimeMillis();
log.info("Message to send : \n" + postData ); HttpURLConnection connection = (HttpURLConnection) url.openConnection();
os.write(postData.getBytes()); connection.setDoOutput(true);
int HttpResult = connection.getResponseCode(); connection.setRequestMethod("POST");
long end = System.currentTimeMillis(); connection.setRequestProperty("User-Agent", "LogParser/1.0");
long elapseTime = end - start; connection.setRequestProperty("Authorization", "Basic cXVhbGlmOmRpcW0wMQ==");
if (HttpResult == 204) { connection.setRequestProperty("Content-Length", String.valueOf(postData.length()));
log.debug("Message sended in " + elapseTime + " ms");
return true; // Write data
} else { OutputStream os = connection.getOutputStream();
log.debug(connection.getResponseCode() + " " + connection.getResponseMessage()); log.info("Message to send : \n" + postData);
return false; os.write(postData.getBytes());
} int HttpResult = connection.getResponseCode();
long end = System.currentTimeMillis();
long elapseTime = end - start;
if (HttpResult == 204) {
log.debug("Message sended in " + elapseTime + " ms");
return true;
} else {
log.debug(connection.getResponseCode() + " " + connection.getResponseMessage());
return false;
}
} }
public void addToQueue(String s) { public void addToQueue(String s) {
try { try {
queue.put(s); queue.put(s);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
public String getPatternPath() { public String getPatternPath() {
return patternPath; return patternPath;
} }
public void setPatternPath(String patternPath) { public void setPatternPath(String patternPath) {
this.patternPath = patternPath; this.patternPath = patternPath;
} }
public String getHostname() { public String getHostname() {
return hostname; return hostname;
} }

View File

@@ -0,0 +1,166 @@
package com.airfrance.diqmqs.logparser;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class Watchdog extends Thread {
private final WatchService watcher;
private final Map<WatchKey, BeanFile> keys;
private Pattern pattern;
private Matcher matcher;
private Log log;
private Application app;
private boolean interrupted = false;
public Watchdog(Application app) throws IOException {
this.watcher = FileSystems.getDefault().newWatchService();
this.keys = new HashMap<WatchKey, BeanFile>();
log = Log.getLogger(Watchdog.class.getName());
this.app = app;
}
@SuppressWarnings("rawtypes")
public void start() {
while (!interrupted) {
WatchKey key;
try {
key = watcher.take();
BeanFile bean = keys.get(key);
for (WatchEvent<?> event : key.pollEvents()) {
WatchEvent.Kind kind = event.kind();
WatchEvent pathEvent = (WatchEvent) event;
String filename = ((Path)pathEvent.context()).toString();
Path file = bean.getDir().resolve((Path)pathEvent.context());
if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE)) {
// New file
for ( int i = 0 ; i < bean.getRegexFiles().size() ; i++ ) {
pattern = Pattern.compile(bean.getRegexFiles().get(i));
matcher = pattern.matcher(filename);
if (matcher.find()) {
app.startParser(file.toFile(), bean.getRegexGroks().get(i), bean.getApplicationNames().get(i));
}
}
} else if (kind.equals(StandardWatchEventKinds.ENTRY_DELETE)) {
// File deleted, stop the parser
app.stopParser(file.toFile());
} else if (kind.equals(StandardWatchEventKinds.ENTRY_MODIFY)) {
if ( file.toFile().equals(app.getParamFile()) ) {
// Reload all
log.info("Reload all parsers from ParamFile");
try {
app.startParserFromFile(file.toFile());
} catch (IOException e) {
log.error("Error when loading param file to start Parser : " + e.getMessage());
}
}
}
}
key.reset();
} catch (InterruptedException x) {
interrupted = true;
}
}
try {
watcher.close();
log.info("Watch service closed.");
} catch (IOException e) {
log.info("Watch service close error : " + e.getMessage());
}
}
public void register(Path dir) throws IOException {
log.info("Register : " + dir.toString() + " for file " + app.getParamFile());
WatchKey key = dir.register(watcher, ENTRY_MODIFY);
BeanFile bf = new BeanFile(dir, null, null, null);
keys.put(key, bf);
}
/**
* Register the given directory with the WatchService
*/
public void register(Path dir, String regexFile, String regexGrok, String applicationName) throws IOException {
log.info("Register : " + dir.toString() + " with regex on file " + regexFile);
WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE);
if (keys.containsKey(key)) {
log.info("Folder already register, add new Grok regex : " + regexGrok + " on regex file " + regexFile + " for application " + applicationName);
BeanFile bf = keys.get(key);
bf.addApplicationNames(applicationName);
bf.addRegexFiles(regexFile);
bf.addRegexGroks(regexGrok);
keys.put(key, bf);
}
else {
BeanFile bf = new BeanFile(dir, regexFile, regexGrok, applicationName);
keys.put(key, bf);
}
}
class BeanFile {
public Path getDir() {
return dir;
}
public void setDir(Path dir) {
this.dir = dir;
}
public List<String> getRegexGroks() {
return regexGroks;
}
public void addRegexGroks(String regexGrok) {
regexGroks.add(regexGrok);
}
public List<String> getApplicationNames() {
return applicationNames;
}
public void addApplicationNames(String applicationName) {
applicationNames.add(applicationName);
}
public List<String> getRegexFiles() {
return regexFiles;
}
public void addRegexFiles(String regexFile) {
regexFiles.add(regexFile);
}
List<String> regexGroks = new ArrayList<String>();
List<String> applicationNames = new ArrayList<String>();
List<String> regexFiles = new ArrayList<String>();
Path dir;
BeanFile(Path dir, String regexFile, String regexGrok, String applicationName) {
this.dir = dir;
applicationNames.add(applicationName);
regexGroks.add(regexGrok);
regexFiles.add(regexFile);
}
}
}