diff --git a/src/main/java/com/airfrance/diqmqs/logparser/Application.java b/src/main/java/com/airfrance/diqmqs/logparser/Application.java new file mode 100644 index 0000000..43eaceb --- /dev/null +++ b/src/main/java/com/airfrance/diqmqs/logparser/Application.java @@ -0,0 +1,100 @@ +package com.airfrance.diqmqs.logparser; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Scanner; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class Application extends Thread { + + private ParserEngine engine; + private boolean onlyOneTime; + private Pattern pattern; + private Matcher matcher; + private Log log; + private File paramFile; + private Watchdog watcher; + + public Application(String patternPath, boolean onlyOneTime) throws IOException { + engine = new ParserEngine(patternPath); + this.onlyOneTime = onlyOneTime; + log = Log.getLogger(Application.class.getName()); + watcher = new Watchdog(this); + } + + void startParserFromCLI(String[] filesName, String[] regexName, String applicationName, boolean readFromStart) + throws IOException { + for (int i = 0; i < filesName.length; i++) { + File f = new File(filesName[i]); + engine.addNewParser(f, regexName[i], applicationName, readFromStart, onlyOneTime); + + } + } + + void stopParser(File file) { + engine.stopOneParser(file.getAbsolutePath()); + } + + void startParser(File file, String regexName, String applicationName) { + engine.addNewParser(file, regexName, applicationName, true, false); + } + + void startFromParamFile(File paramFile) throws IOException { + + this.paramFile = paramFile; + startParserFromFile(paramFile); + log.info("All parsers started"); + watcher.register(paramFile.getParentFile().toPath()); + watcher.start(); + + } + + public void startParserFromFile(File file) throws IOException { + + try { + Scanner scanner = new Scanner(file); + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + boolean readFromStart = false; + if (!line.startsWith("#")) { + String[] arguments = line.split("\\t"); + if (arguments.length >= 4) { + String applicationName = arguments[0].toLowerCase(); + Paths pathDirectories = new Paths("/", arguments[1]); + String regexFile = arguments[2]; + String regexGrokName = arguments[3]; + if (arguments.length == 5) { + if (arguments[4].equalsIgnoreCase("true")) { + readFromStart = true; + } + } + for (File dir : pathDirectories.getFiles()) { + pattern = Pattern.compile(regexFile); + for (File f : dir.listFiles()) { + matcher = pattern.matcher(f.getName()); + if (matcher.find()) { + engine.addNewParser(f, regexGrokName, applicationName, readFromStart, onlyOneTime); + } + } + watcher.register(dir.toPath(), regexFile, regexGrokName, applicationName); + } + } + else + { + log.error("Missing field on line : " + line ); + } + } + } + scanner.close(); + } catch (FileNotFoundException fnfe) { + log.error(fnfe.getMessage()); + } + } + + public File getParamFile() { + return paramFile; + } + +} diff --git a/src/main/java/com/airfrance/diqmqs/logparser/Cli.java b/src/main/java/com/airfrance/diqmqs/logparser/Cli.java index ac940fe..a4456fe 100644 --- a/src/main/java/com/airfrance/diqmqs/logparser/Cli.java +++ b/src/main/java/com/airfrance/diqmqs/logparser/Cli.java @@ -1,8 +1,7 @@ package com.airfrance.diqmqs.logparser; import java.io.File; -import java.io.FileNotFoundException; -import java.util.Scanner; +import java.io.IOException; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -13,189 +12,124 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; public class Cli { - - public final static String VER = "1.1"; - public static Options options = new Options(); - public static Option help = new Option( "help", "Print this message" ); - public static Option version = new Option( "version", "Print the version information and exit" ); - public static Option application = new Option( "application", true , "Input application name" ); - public static Option patternFilePath = new Option( "pattern", true , "Input pattern path file" ); - public static Option logFile = new Option( "logfile", true , "Input log path files" ); - public static Option regex = new Option( "regex", true , "Name of the regex to apply" ); - public static Option fromStart = new Option( "fromStart", "Read the file from start" ); - public static Option fileParam = new Option( "paramfile", true , "Input a param file" ); - public static Option debugOption = new Option( "debug", "Active debug output message" ); - public static Option infoOption = new Option( "info", "Active info output message" ); - public static Option oneTime = new Option( "oneTime", "Read the file once time" ); - private static long lastModifiedTime = 0L; + + public final static String VER = "2.0"; + public static Options options = new Options(); + public static Option help = new Option("help", "Print this message"); + public static Option version = new Option("version", "Print the version information and exit"); + public static Option application = new Option("application", true, "Input application name"); + public static Option patternFilePath = new Option("pattern", true, "Input pattern path file"); + public static Option logFile = new Option("logfile", true, "Input log path files"); + public static Option regex = new Option("regex", true, "Name of the regex to apply"); + public static Option fromStart = new Option("fromStart", "Read the file from start"); + public static Option fileParam = new Option("paramfile", true, "Input a param file"); + public static Option debugOption = new Option("debug", "Active debug output message"); + public static Option infoOption = new Option("info", "Active info output message"); + public static Option oneTime = new Option("oneTime", "Read the file once time"); + public Cli() { // TODO Auto-generated constructor stub } public static void main(String[] args) { - - + application.setArgName("application name"); patternFilePath.setArgName("path to the pattern file"); logFile.setArgName("path to the logs file"); fileParam.setArgName("path to the param file"); - + options.addOption(help); options.addOption(version); options.addOption(application); - options.addOption(patternFilePath); + options.addOption(patternFilePath); options.addOption(logFile); options.addOption(regex); options.addOption(debugOption); options.addOption(infoOption); options.addOption(fileParam); options.addOption(fromStart); - + HelpFormatter formatter = new HelpFormatter(); boolean readFromStart = false; boolean onlyOneTime = false; CommandLineParser parser = new DefaultParser(); + CommandLine cmd = null; try { - cmd = parser.parse( options, args); - } - catch (ParseException e) { - System.err.println("Command parse error : " + e.getMessage() ); + cmd = parser.parse(options, args); + } catch (ParseException e) { + System.err.println("Command parse error : " + e.getMessage()); System.exit(0); } - - if(cmd.hasOption("help")) { - formatter.printHelp( "logParser" , options ); + if (cmd.hasOption("help")) { + formatter.printHelp("logParser", options); System.exit(0); } - - if(cmd.hasOption("version")) { - System.out.println("logParser " + Cli.VER ); + + if (cmd.hasOption("version")) { + System.out.println("logParser " + Cli.VER); System.exit(0); } - - if(cmd.hasOption("debug")) { + + if (cmd.hasOption("debug")) { Log.setLevel(Log.DEBUG); } - - if(cmd.hasOption("info")) { + + if (cmd.hasOption("info")) { Log.setLevel(Log.INFO); } - + String patternPath = ""; - if(cmd.hasOption("pattern")) { + if (cmd.hasOption("pattern")) { patternPath = cmd.getOptionValue("pattern"); - } - else - { + } else { System.err.println("patternFilePath application missing"); System.exit(0); } - - if(cmd.hasOption("oneTime")) { + + if (cmd.hasOption("oneTime")) { onlyOneTime = true; } - - if(cmd.hasOption("paramfile")) { - String fileParamName = cmd.getOptionValue("paramfile"); - File param = new File(fileParamName); - ParserEngine engine = new ParserEngine(patternPath); - Log log = Log.getLogger(Cli.class.getName()); - try { - startParserFromFile(param, engine , onlyOneTime); - lastModifiedTime = param.lastModified(); - log.info("All parsers started"); - // Check for reload the param file - while (true) { - try { - Thread.sleep(5000); - if (param.exists()) - { - if ( param.lastModified() != lastModifiedTime ) - { - engine.stopAllParser(); - startParserFromFile(param, engine , onlyOneTime); - lastModifiedTime = param.lastModified(); - } - } - - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - } catch (FileNotFoundException e) { - System.err.println("Execption : " + e.getMessage()); - System.exit(0); - } - } - else - { - String applicationName = ""; - if(cmd.hasOption("application")) { - applicationName = cmd.getOptionValue("application"); - } - else - { - System.err.println("Option application missing"); - System.exit(0); - } - - String[] regexName = null; - if(cmd.hasOption("regex")) { - regexName = cmd.getOptionValues("regex"); - } - else - { - System.err.println("regex application missing"); - System.exit(0); - } - - if(cmd.hasOption("fromStart")) { - readFromStart = true; - } - if(cmd.hasOption("logfile")) { - ParserEngine engine = new ParserEngine(patternPath); - String[] filesName = cmd.getOptionValues("logfile"); - for (int i = 0; i < filesName.length ; i++) { - File f = new File (filesName[i]); - engine.addNewParser(f, regexName[i], applicationName, readFromStart, onlyOneTime); + Application app; + try { + app = new Application(patternPath, onlyOneTime); + + if (cmd.hasOption("paramfile")) { + File f = new File(cmd.getOptionValue("paramfile")); + app.startFromParamFile(f); + } else { + String applicationName = ""; + if (cmd.hasOption("application")) { + applicationName = cmd.getOptionValue("application"); + } else { + System.err.println("Option application missing"); + System.exit(0); + } + + String[] regexName = null; + if (cmd.hasOption("regex")) { + regexName = cmd.getOptionValues("regex"); + } else { + System.err.println("regex application missing"); + System.exit(0); + } + + if (cmd.hasOption("fromStart")) { + readFromStart = true; + } + + if (cmd.hasOption("logfile")) { + String[] filesName = cmd.getOptionValues("logfile"); + app.startParserFromCLI(filesName, regexName, applicationName, readFromStart); } } + } catch (IOException e) { + System.err.println(e.getMessage()); + System.exit(0); } } - - - static void startParserFromFile(File file, ParserEngine engine, boolean onceTime) throws FileNotFoundException { - - Scanner scanner = new Scanner(file); - - while (scanner.hasNextLine()) { - String line = scanner.nextLine(); - boolean readFromStart = false; - if (!line.startsWith("#")) - { - String[] arguments = line.split("\\t"); - String applicationName = arguments[0].toLowerCase(); - Paths paths = new Paths("/", arguments[1]); - String regexName = arguments[2]; - if (arguments.length == 4) { - if ( arguments[3].equalsIgnoreCase("true") ){ - readFromStart = true; - } - } - - for (File f : paths.getFiles()) { - engine.addNewParser(f, regexName, applicationName, readFromStart, onceTime); - } - - } - } - scanner.close(); - - } } diff --git a/src/main/java/com/airfrance/diqmqs/logparser/ParserEngine.java b/src/main/java/com/airfrance/diqmqs/logparser/ParserEngine.java index 1950525..1bed4e3 100644 --- a/src/main/java/com/airfrance/diqmqs/logparser/ParserEngine.java +++ b/src/main/java/com/airfrance/diqmqs/logparser/ParserEngine.java @@ -8,6 +8,7 @@ import java.net.InetAddress; import java.net.URL; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Iterator; import java.util.concurrent.BlockingDeque; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; @@ -18,139 +19,142 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.io.input.Tailer; import org.apache.commons.io.input.TailerListener; -public class ParserEngine implements Runnable{ +public class ParserEngine implements Runnable { - final private String database = "qualif"; - final private String urlInfluxdb = "http://diqmqs.airfrance.fr/influxdb_query/write?rp=one_week&db=" + database; - final long delay = 200; - private boolean onceTime = false; - private ArrayList threadsParser = null; - static BlockingDeque queue = new LinkedBlockingDeque(2000); - private String patternPath = ""; + final private String database = "qualif"; + final private String urlInfluxdb = "http://diqmqs.airfrance.fr/influxdb_query/write?rp=one_week&db=" + database; + final long delay = 200; + private boolean onceTime = false; + private ArrayList threadsParser = null; + static BlockingDeque queue = new LinkedBlockingDeque(2000); + private String patternPath = ""; ArrayList res = new ArrayList(2000); Log log = Log.getLogger(ParserEngine.class.getName()); private ScheduledExecutorService scheduler; private String hostname; - - @SuppressWarnings("unused") private ScheduledFuture timerHandle; - + public ParserEngine(String patternPath) { threadsParser = new ArrayList(); - this.patternPath = patternPath; + this.patternPath = patternPath; scheduler = Executors.newScheduledThreadPool(1); try { hostname = InetAddress.getLocalHost().getHostName().split("\\.")[0]; } catch (UnknownHostException e) { - e.printStackTrace(); + log.error(e.getMessage()); } - this.timerHandle = scheduler.scheduleWithFixedDelay(this, 1, 1, TimeUnit.SECONDS); + this.timerHandle = scheduler.scheduleWithFixedDelay(this, 1, 1, TimeUnit.SECONDS); log.info("Create ParserEngine with pattern file : " + patternPath); - log.info("Schedule sender is set to " + 1 + " secs" ); + log.info("Schedule sender is set to " + 1 + " secs"); } - - void addNewParser(File f , String regexName, String application , boolean readFromStart, boolean onceTime) { + + void addNewParser(File f, String regexName, String application, boolean readFromStart, boolean onceTime) { TailerListener listener = new Parser(application, regexName, this); Tailer tailer = new Tailer(f, listener, delay, !readFromStart, true, 8192); - Thread thread = new Thread(tailer); - thread.setDaemon(true); - thread.setName("Parser - " + threadsParser.size() ); - thread.start(); - threadsParser.add(thread); - this.onceTime = onceTime; - log.info("Thread Parser - " + threadsParser.size() + " started on file : " + f.getName()); + Thread thread = new Thread(tailer); + thread.setDaemon(true); + thread.setName(f.getAbsolutePath()); + thread.start(); + threadsParser.add(thread); + this.onceTime = onceTime; + log.info("Thread Parser - " + threadsParser.size() + " started on file : " + f.getName()); } - + void stopAllParser() { for (Thread t : threadsParser) { t.interrupt(); - log.info("Stop Thread " + t.getName() ); + log.info("Stop Thread " + t.getName()); } threadsParser.clear(); } - - // Read data from the Queue and send it to influxdb - public void run() { - - StringBuilder postData = new StringBuilder(); - log.info("Actual queue size " + queue.size() ); - int nbElement = queue.drainTo(res, 2000); - if (nbElement > 0) { - for ( String s : res) { - postData.append( s + "\n" ); + + void stopOneParser(String name) { + for (Iterator ite = threadsParser.iterator(); ite.hasNext();) { + Thread thread = ite.next(); + if (thread.getName().equals(name)) { + thread.interrupt(); + log.info("Stop Thread " + thread.getName()); + ite.remove(); } } - else { - if (onceTime) - { + } + + // Read data from the Queue and send it to influxdb + public void run() { + StringBuilder postData = new StringBuilder(); + log.info("Actual queue size " + queue.size()); + int nbElement = queue.drainTo(res, 2000); + if (nbElement > 0) { + for (String s : res) { + postData.append(s + "\n"); + } + } else { + if (onceTime) { stopAllParser(); System.exit(0); } - + } try { if (postData.length() > 0) { - if (sendMetricToInfluxdb(postData.toString())) - { + if (sendMetricToInfluxdb(postData.toString())) { res.clear(); - } - else{ - + } else { + } } } catch (IOException e) { e.printStackTrace(); } } - - public boolean sendMetricToInfluxdb(String postData) throws IOException { - URL url = new URL(urlInfluxdb); - long start = System.currentTimeMillis(); - HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - connection.setDoOutput(true); - connection.setRequestMethod("POST"); - connection.setRequestProperty("User-Agent", "LogParser/1.0"); - connection.setRequestProperty("Authorization", "Basic cXVhbGlmOmRpcW0wMQ=="); - connection.setRequestProperty("Content-Length", String.valueOf(postData.length())); + public boolean sendMetricToInfluxdb(String postData) throws IOException { - // Write data - OutputStream os = connection.getOutputStream(); - log.info("Message to send : \n" + postData ); - os.write(postData.getBytes()); - int HttpResult = connection.getResponseCode(); - long end = System.currentTimeMillis(); - long elapseTime = end - start; - if (HttpResult == 204) { - log.debug("Message sended in " + elapseTime + " ms"); - return true; - } else { - log.debug(connection.getResponseCode() + " " + connection.getResponseMessage()); - return false; - } + URL url = new URL(urlInfluxdb); + long start = System.currentTimeMillis(); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setDoOutput(true); + connection.setRequestMethod("POST"); + connection.setRequestProperty("User-Agent", "LogParser/1.0"); + connection.setRequestProperty("Authorization", "Basic cXVhbGlmOmRpcW0wMQ=="); + connection.setRequestProperty("Content-Length", String.valueOf(postData.length())); + + // Write data + OutputStream os = connection.getOutputStream(); + log.info("Message to send : \n" + postData); + os.write(postData.getBytes()); + int HttpResult = connection.getResponseCode(); + long end = System.currentTimeMillis(); + long elapseTime = end - start; + if (HttpResult == 204) { + log.debug("Message sended in " + elapseTime + " ms"); + return true; + } else { + log.debug(connection.getResponseCode() + " " + connection.getResponseMessage()); + return false; + } } - - public void addToQueue(String s) { - + + public void addToQueue(String s) { + try { - queue.put(s); + queue.put(s); } catch (InterruptedException e) { e.printStackTrace(); } - - } + + } public String getPatternPath() { return patternPath; } - public void setPatternPath(String patternPath) { this.patternPath = patternPath; } - + public String getHostname() { return hostname; } diff --git a/src/main/java/com/airfrance/diqmqs/logparser/Watchdog.java b/src/main/java/com/airfrance/diqmqs/logparser/Watchdog.java new file mode 100644 index 0000000..95ed876 --- /dev/null +++ b/src/main/java/com/airfrance/diqmqs/logparser/Watchdog.java @@ -0,0 +1,166 @@ +package com.airfrance.diqmqs.logparser; + +import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; + +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class Watchdog extends Thread { + + private final WatchService watcher; + private final Map keys; + private Pattern pattern; + private Matcher matcher; + private Log log; + private Application app; + private boolean interrupted = false; + + public Watchdog(Application app) throws IOException { + this.watcher = FileSystems.getDefault().newWatchService(); + this.keys = new HashMap(); + log = Log.getLogger(Watchdog.class.getName()); + this.app = app; + } + + @SuppressWarnings("rawtypes") + public void start() { + + while (!interrupted) { + WatchKey key; + try { + key = watcher.take(); + BeanFile bean = keys.get(key); + for (WatchEvent event : key.pollEvents()) { + WatchEvent.Kind kind = event.kind(); + WatchEvent pathEvent = (WatchEvent) event; + String filename = ((Path)pathEvent.context()).toString(); + Path file = bean.getDir().resolve((Path)pathEvent.context()); + + if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE)) { + // New file + for ( int i = 0 ; i < bean.getRegexFiles().size() ; i++ ) { + pattern = Pattern.compile(bean.getRegexFiles().get(i)); + matcher = pattern.matcher(filename); + if (matcher.find()) { + app.startParser(file.toFile(), bean.getRegexGroks().get(i), bean.getApplicationNames().get(i)); + } + } + } else if (kind.equals(StandardWatchEventKinds.ENTRY_DELETE)) { + // File deleted, stop the parser + app.stopParser(file.toFile()); + } else if (kind.equals(StandardWatchEventKinds.ENTRY_MODIFY)) { + if ( file.toFile().equals(app.getParamFile()) ) { + // Reload all + log.info("Reload all parsers from ParamFile"); + try { + app.startParserFromFile(file.toFile()); + } catch (IOException e) { + log.error("Error when loading param file to start Parser : " + e.getMessage()); + } + } + } + } + key.reset(); + + } catch (InterruptedException x) { + interrupted = true; + } + } + try { + watcher.close(); + log.info("Watch service closed."); + } catch (IOException e) { + log.info("Watch service close error : " + e.getMessage()); + } + + } + + public void register(Path dir) throws IOException { + log.info("Register : " + dir.toString() + " for file " + app.getParamFile()); + WatchKey key = dir.register(watcher, ENTRY_MODIFY); + BeanFile bf = new BeanFile(dir, null, null, null); + keys.put(key, bf); + } + + /** + * Register the given directory with the WatchService + */ + public void register(Path dir, String regexFile, String regexGrok, String applicationName) throws IOException { + log.info("Register : " + dir.toString() + " with regex on file " + regexFile); + WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE); + if (keys.containsKey(key)) { + log.info("Folder already register, add new Grok regex : " + regexGrok + " on regex file " + regexFile + " for application " + applicationName); + BeanFile bf = keys.get(key); + bf.addApplicationNames(applicationName); + bf.addRegexFiles(regexFile); + bf.addRegexGroks(regexGrok); + keys.put(key, bf); + } + else { + BeanFile bf = new BeanFile(dir, regexFile, regexGrok, applicationName); + keys.put(key, bf); + } + + } + + class BeanFile { + public Path getDir() { + return dir; + } + + public void setDir(Path dir) { + this.dir = dir; + } + + public List getRegexGroks() { + return regexGroks; + } + + public void addRegexGroks(String regexGrok) { + regexGroks.add(regexGrok); + } + + public List getApplicationNames() { + return applicationNames; + } + + public void addApplicationNames(String applicationName) { + applicationNames.add(applicationName); + } + + public List getRegexFiles() { + return regexFiles; + } + + public void addRegexFiles(String regexFile) { + regexFiles.add(regexFile); + } + + + List regexGroks = new ArrayList(); + List applicationNames = new ArrayList(); + List regexFiles = new ArrayList(); + Path dir; + + BeanFile(Path dir, String regexFile, String regexGrok, String applicationName) { + this.dir = dir; + applicationNames.add(applicationName); + regexGroks.add(regexGrok); + regexFiles.add(regexFile); + } + } + +}