博客
关于我
自定义Hive Sql Job分析工具
阅读量:358 次
发布时间:2019-03-05

本文共 17910 字,大约阅读时间需要 59 分钟。

前言

我们都知道,在大数据领域,Hive的出现帮我降低了许多使用Hadoop书写方式的学习成本.使用用户可以使用类似Sql的语法规则写明查询语句,从hive表数据中查询目标数据.最为重要的是这些sql语句会最终转化为map reduce作业进行处理.这也是Hive最强大的地方.可以简单的理解为Hive就是依托在Hadoop上的1个壳.但是这里有一点点小小的不同,不是每段hive查询sql语句与最后生成的job一一对应,如果你的这段sql是一个大sql,他在转化掉之后,会衍生出许多小job,这些小job是独立存在运行的,以不同的job名称进行区别,但是也会保留公共的job名称.所以一个问题来了,对于超级长的hive sql语句,我想查看到底是哪段子sql花费了我大量的执行时间,在JobHistory上只有每个子Job的运行时间,没有子Job对应的sql语句,一旦这个功能有了之后,就会帮助我们迅速的定位到问题所在.


Hive子Job中的Sql

OK,带着上述的目标,我们要想分析出到底哪段子sql所衍生的job运行时间更长,就要先知道这些sql到底在存在与哪里.在前面的描述中,已经提到了,Hive是依托于Hadoop,自然Hive提交的job信息也是保存在Hadoop的HDFS上的.在联想一下JobHistory中的各个文件类型.你应该会发现带有下面后缀的文件存在.


我们发现里面包含了之前分析过的.jhist文件,还有带conf字符的.xml格式文件,从文件名上来看就是job提交时的一些配置信息,然后我们用vim命令查阅conf.xml后缀的文件,看看里面是不是有我们想要的hive qury string 这样的属性


OK,目标算是找到了,这的确就是我们想要的属性.说明这样的信息的确是存在的,后面的操作就是怎么去解析这段有用的信息了.


程序工具分析Hive Sql Job

知道了目标数据源,我们能想到的最简单快速的方法就是逐行解析文件,做做文本匹配,筛选关键信息.这些代码谁都会写,首先要传入一个HDFS目录地址,这个是在JobHistory的存储目录上加上一个具体日期目录,这段解析程序在文章的末尾会加上.下面列举在调试分析程序时遇到的一些问题,这个还是比较有用的.

1.hive sql中的中文导致解析出现乱码

这个又是非常讨厌的java解析出现乱码的原因,因为考虑到sql中存在中文注释,而Hadoop在存中文的时候都是用utf8的编码方式,所以读出文件数据后进行一次转utf-8编码方式的处理,就是下面所示代码.

...fileSystem = path.getFileSystem(new Configuration());			in = fileSystem.open(path);			InputStreamReader isr;			BufferedReader br;			isr = new InputStreamReader(in, "UTF-8");			br = new BufferedReader(isr);			while ((str = br.readLine()) != null) {...

2.单线程解析文件速度过慢

之前在测试环境中做文件解析看不出真实效果,文件一下解析就OK了,但是到真实环境中,多达几万个job文件,程序马上就吃不消了,算上解析文件,再把结果写入mysql,耗时达到60多分钟,后面改成了多线程的方式,后来开到10个线程去跑,速度才快了许多.


3.结果数据写入MySql过慢

后来处理速度是上去了,但是写入sql速度过慢,比如说,我有一次测试,开10个线程区解析,花了8分钟就解析好了几万个文件数据,但是插入数据库花了20分钟左右,而且量也就几万条语句.后来改成了批处理的方式,效果并没有什么大的改变,这个慢的问题具体并没有被解决掉,怀疑可能是因有些语句中存在超长的hive sql语句导致的.


下面是程序的主要分析代码,分为工具类代码,和解析线程类,代码全部链接再此处:

主工具代码

package org.apache.hadoop.mapreduce.v2.hs.tool.sqlanalyse;import java.io.BufferedReader;import java.io.FileNotFoundException;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.PrintStream;import java.util.ArrayList;import java.util.HashMap;import java.util.LinkedList;import java.util.List;import java.util.Map.Entry;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileContext;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.RemoteIterator;import org.apache.hadoop.fs.UnsupportedFileSystemException;import org.apache.hadoop.io.IOUtils;public class HiveSqlAnalyseTool {	private int threadNum;	private String dirType;	private String jobHistoryPath;	private FileContext doneDirFc;	private Path doneDirPrefixPath;	private LinkedList<FileStatus> fileStatusList;	private HashMap<String, String[]> dataInfos;	private DbClient dbClient;	public HiveSqlAnalyseTool(String dirType, String jobHistoryPath,			int threadNum) {		this.threadNum = threadNum;		this.dirType = dirType;		this.jobHistoryPath = jobHistoryPath;		this.dataInfos = new HashMap<String, String[]>();		this.fileStatusList = new LinkedList<FileStatus>();		this.dbClient = new DbClient(BaseValues.DB_URL,				BaseValues.DB_USER_NAME, BaseValues.DB_PASSWORD,				BaseValues.DB_HIVE_SQL_STAT_TABLE_NAME);		try {			doneDirPrefixPath = FileContext.getFileContext(new Configuration())					.makeQualified(new Path(this.jobHistoryPath));			doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri());		} catch (UnsupportedFileSystemException e) {			// TODO Auto-generated catch block			e.printStackTrace();		} catch (IllegalArgumentException e) {			// TODO Auto-generated catch block			e.printStackTrace();		}	}	public void readJobInfoFiles() {		List<FileStatus> files;		files = new ArrayList<FileStatus>();		try {			files = scanDirectory(doneDirPrefixPath, doneDirFc, files);		} catch (IOException e) {			// TODO Auto-generated catch block			e.printStackTrace();		}		if (files != null) {			for (FileStatus fs : files) {				// parseFileInfo(fs);			}			System.out.println("files num is " + files.size());			System.out					.println("fileStatusList size is" + fileStatusList.size());			ParseThread[] threads;			threads = new ParseThread[threadNum];			for (int i = 0; i < threadNum; i++) {				System.out.println("thread " + i + "start run");				threads[i] = new ParseThread(this, fileStatusList, dataInfos);				threads[i].start();			}			for (int i = 0; i < threadNum; i++) {				System.out.println("thread " + i + "join run");				try {					if (threads[i] != null) {						threads[i].join();					}				} catch (InterruptedException e) {					// TODO Auto-generated catch block					e.printStackTrace();				}			}		} else {			System.out.println("files is null");		}		printStatDatas();	}	protected List<FileStatus> scanDirectory(Path path, FileContext fc,			List<FileStatus> jhStatusList) throws IOException {		path = fc.makeQualified(path);		System.out.println("dir path is " + path.getName());		try {			RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);			while (fileStatusIter.hasNext()) {				FileStatus fileStatus = fileStatusIter.next();				Path filePath = fileStatus.getPath();				if (fileStatus.isFile()) {					jhStatusList.add(fileStatus);					fileStatusList.add(fileStatus);				} else if (fileStatus.isDirectory()) {					scanDirectory(filePath, fc, jhStatusList);				}			}		} catch (FileNotFoundException fe) {			System.out.println("Error while scanning directory " + path);		}		return jhStatusList;	}	private void parseFileInfo(FileStatus fs) {		String resultStr;		String str;		String username;		String fileType;		String jobId;		String jobName;		String hiveSql;		int startPos;		int endPos;		int hiveSqlFlag;		long launchTime;		long finishTime;		int mapTaskNum;		int reduceTaskNum;		String xmlNameFlag;		String launchTimeFlag;		String finishTimeFlag;		String launchMapFlag;		String launchReduceFlag;		Path path;		FileSystem fileSystem;		InputStream in;		resultStr = "";		fileType = "";		hiveSql = "";		jobId = "";		jobName = "";		username = "";		hiveSqlFlag = 0;		launchTime = 0;		finishTime = 0;		mapTaskNum = 0;		reduceTaskNum = 0;		xmlNameFlag = "<value>";		launchTimeFlag = "\"launchTime\":";		finishTimeFlag = "\"finishTime\":";		launchMapFlag = "\"Launched map tasks\"";		launchReduceFlag = "\"Launched reduce tasks\"";		path = fs.getPath();		str = path.getName();		if (str.endsWith(".xml")) {			fileType = "config";			endPos = str.lastIndexOf("_");			jobId = str.substring(0, endPos);		} else if (str.endsWith(".jhist")) {			fileType = "info";			endPos = str.indexOf("-");			jobId = str.substring(0, endPos);		} else {			return;		}		try {			fileSystem = path.getFileSystem(new Configuration());			in = fileSystem.open(path);			InputStreamReader isr;			BufferedReader br;			isr = new InputStreamReader(in, "UTF-8");			br = new BufferedReader(isr);			while ((str = br.readLine()) != null) {				if (str.contains("mapreduce.job.user.name")) {					startPos = str.indexOf(xmlNameFlag);					endPos = str.indexOf("</value>");					username = str.substring(startPos + xmlNameFlag.length(),							endPos);				} else if (str.contains("mapreduce.job.name")) {					startPos = str.indexOf(xmlNameFlag);					endPos = str.indexOf("</value>");					jobName = str.substring(startPos + xmlNameFlag.length(),							endPos);				} else if (str.contains("hive.query.string")) {					hiveSqlFlag = 1;					hiveSql = str;				} else if (hiveSqlFlag == 1) {					hiveSql += str;					if (str.contains("</value>")) {						startPos = hiveSql.indexOf(xmlNameFlag);						endPos = hiveSql.indexOf("</value>");						hiveSql = hiveSql.substring(								startPos + xmlNameFlag.length(), endPos);						hiveSqlFlag = 0;					}				} else if (str.startsWith("{\"type\":\"JOB_INITED\"")) {					startPos = str.indexOf(launchTimeFlag);					str = str.substring(startPos + launchTimeFlag.length());					endPos = str.indexOf(",");					launchTime = Long.parseLong(str.substring(0, endPos));				} else if (str.startsWith("{\"type\":\"JOB_FINISHED\"")) {					mapTaskNum = parseTaskNum(launchMapFlag, str);					reduceTaskNum = parseTaskNum(launchReduceFlag, str);					startPos = str.indexOf(finishTimeFlag);					str = str.substring(startPos + finishTimeFlag.length());					endPos = str.indexOf(",");					finishTime = Long.parseLong(str.substring(0, endPos));				}			}			System.out.println("jobId is " + jobId);			System.out.println("jobName is " + jobName);			System.out.println("username is " + username);			System.out.println("map task num is " + mapTaskNum);			System.out.println("reduce task num is " + reduceTaskNum);			System.out.println("launchTime is " + launchTime);			System.out.println("finishTime is " + finishTime);			System.out.println("hive query sql is " + hiveSql);		} catch (IOException e) {			// TODO Auto-generated catch block			e.printStackTrace();		}		if (fileType.equals("config")) {			insertConfParseData(jobId, jobName, username, hiveSql);		} else if (fileType.equals("info")) {			insertJobInfoParseData(jobId, launchTime, finishTime, mapTaskNum,					reduceTaskNum);		}	}	private void insertConfParseData(String jobId, String jobName,			String username, String sql) {		String[] array;		if (dataInfos.containsKey(jobId)) {			array = dataInfos.get(jobId);		} else {			array = new String[BaseValues.DB_COLUMN_HIVE_SQL_LEN];		}		array[BaseValues.DB_COLUMN_HIVE_SQL_JOBID] = jobId;		array[BaseValues.DB_COLUMN_HIVE_SQL_JOBNAME] = jobName;		array[BaseValues.DB_COLUMN_HIVE_SQL_USERNAME] = username;		array[BaseValues.DB_COLUMN_HIVE_SQL_HIVE_SQL] = sql;		dataInfos.put(jobId, array);	}	private void insertJobInfoParseData(String jobId, long launchTime,			long finishedTime, int mapTaskNum, int reduceTaskNum) {		String[] array;		if (dataInfos.containsKey(jobId)) {			array = dataInfos.get(jobId);		} else {			array = new String[BaseValues.DB_COLUMN_HIVE_SQL_LEN];		}		array[BaseValues.DB_COLUMN_HIVE_SQL_JOBID] = jobId;		array[BaseValues.DB_COLUMN_HIVE_SQL_START_TIME] = String				.valueOf(launchTime);		array[BaseValues.DB_COLUMN_HIVE_SQL_FINISH_TIME] = String				.valueOf(finishedTime);		array[BaseValues.DB_COLUMN_HIVE_SQL_MAP_TASK_NUM] = String				.valueOf(mapTaskNum);		array[BaseValues.DB_COLUMN_HIVE_SQL_REDUCE_TASK_NUM] = String				.valueOf(reduceTaskNum);		dataInfos.put(jobId, array);	}	private int parseTaskNum(String flag, String jobStr) {		int taskNum;		int startPos;		int endPos;		String tmpStr;		taskNum = 0;		tmpStr = jobStr;		startPos = tmpStr.indexOf(flag);		if (startPos == -1) {			return 0;		}		tmpStr = tmpStr.substring(startPos + flag.length());		endPos = tmpStr.indexOf("}");		tmpStr = tmpStr.substring(0, endPos);		taskNum = Integer.parseInt(tmpStr.split(":")[1]);		return taskNum;	}	private void printStatDatas() {		String jobId;		String jobInfo;		String[] infos;		if (dbClient != null) {			dbClient.createConnection();		}		if (dataInfos != null) {			System.out.println("map data size is" + dataInfos.size());						if (dbClient != null && dirType.equals("dateTimeDir")) {				dbClient.insertDataBatch(dataInfos);			}		}		/*for (Entry<String, String[]> entry : this.dataInfos.entrySet()) {			jobId = entry.getKey();			infos = entry.getValue();			jobInfo = String					.format("jobId is %s, jobName:%s, usrname:%s, launchTime:%s, finishTime:%s, mapTaskNum:%s, reduceTaskNum:%s, querySql:%s",							jobId, infos[1], infos[2], infos[3], infos[4],							infos[5], infos[6], infos[7]);			// System.out.println("job detail info " + jobInfo);			if (dbClient != null && dirType.equals("dateTimeDir")) {				dbClient.insertHiveSqlStatData(infos);			}		}*/		if (dbClient != null) {			dbClient.closeConnection();		}	}	public synchronized FileStatus getOneFile() {		FileStatus fs;		fs = null;		if (fileStatusList != null & fileStatusList.size() > 0) {			fs = fileStatusList.poll();		}		return fs;	}	public synchronized void addDataToMap(String jobId, String[] values) {		if (dataInfos != null) {			dataInfos.put(jobId, values);		}	}}

解析线程代码ParseThread.java:

package org.apache.hadoop.mapreduce.v2.hs.tool.sqlanalyse;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.util.HashMap;import java.util.LinkedList;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;public class ParseThread extends Thread{	private HiveSqlAnalyseTool tool;	private LinkedList<FileStatus> fileStatus;	private HashMap<String, String[]> dataInfos;		public ParseThread(HiveSqlAnalyseTool tool, LinkedList<FileStatus> fileStatus, HashMap<String, String[]> dataInfos){		this.tool = tool;		this.fileStatus = fileStatus;		this.dataInfos = dataInfos;	}		@Override	public void run() {		FileStatus fs;				while(fileStatus != null && !fileStatus.isEmpty()){			fs = tool.getOneFile();			parseFileInfo(fs);		}				super.run();	}	private void parseFileInfo(FileStatus fs) {		String str;		String username;		String fileType;		String jobId;		String jobName;		String hiveSql;		int startPos;		int endPos;		int hiveSqlFlag;		long launchTime;		long finishTime;		int mapTaskNum;		int reduceTaskNum;		String xmlNameFlag;		String launchTimeFlag;		String finishTimeFlag;		String launchMapFlag;		String launchReduceFlag;		Path path;		FileSystem fileSystem;		InputStream in;		fileType = "";		hiveSql = "";		jobId = "";		jobName = "";		username = "";		hiveSqlFlag = 0;		launchTime = 0;		finishTime = 0;		mapTaskNum = 0;		reduceTaskNum = 0;		xmlNameFlag = "<value>";		launchTimeFlag = "\"launchTime\":";		finishTimeFlag = "\"finishTime\":";		launchMapFlag = "\"Launched map tasks\"";		launchReduceFlag = "\"Launched reduce tasks\"";		path = fs.getPath();		str = path.getName();		if (str.endsWith(".xml")) {			fileType = "config";			endPos = str.lastIndexOf("_");			jobId = str.substring(0, endPos);		} else if (str.endsWith(".jhist")) {			fileType = "info";			endPos = str.indexOf("-");			jobId = str.substring(0, endPos);		}else{			return;		}		try {			fileSystem = path.getFileSystem(new Configuration());			in = fileSystem.open(path);			InputStreamReader isr;			BufferedReader br;			isr = new InputStreamReader(in, "UTF-8");			br = new BufferedReader(isr);			while ((str = br.readLine()) != null) {				if (str.contains("mapreduce.job.user.name")) {					startPos = str.indexOf(xmlNameFlag);					endPos = str.indexOf("</value>");					username = str.substring(startPos + xmlNameFlag.length(),							endPos);				} else if (str.contains("mapreduce.job.name")) {					startPos = str.indexOf(xmlNameFlag);					endPos = str.indexOf("</value>");					jobName = str.substring(startPos + xmlNameFlag.length(),							endPos);				} else if (str.contains("hive.query.string")) {					hiveSqlFlag = 1;					hiveSql = str;				} else if (hiveSqlFlag == 1) {					hiveSql += str;					if (str.contains("</value>")) {						startPos = hiveSql.indexOf(xmlNameFlag);						endPos = hiveSql.indexOf("</value>");						hiveSql = hiveSql.substring(								startPos + xmlNameFlag.length(), endPos);						hiveSqlFlag = 0;					}				} else if (str.startsWith("{\"type\":\"JOB_INITED\"")) {					startPos = str.indexOf(launchTimeFlag);					str = str.substring(startPos + launchTimeFlag.length());					endPos = str.indexOf(",");					launchTime = Long.parseLong(str.substring(0, endPos));				} else if (str.startsWith("{\"type\":\"JOB_FINISHED\"")) {					mapTaskNum = parseTaskNum(launchMapFlag, str);					reduceTaskNum = parseTaskNum(launchReduceFlag, str);					startPos = str.indexOf(finishTimeFlag);					str = str.substring(startPos + finishTimeFlag.length());					endPos = str.indexOf(",");					finishTime = Long.parseLong(str.substring(0, endPos));				}			}			/*System.out.println("jobId is " + jobId);			System.out.println("jobName is " + jobName);			System.out.println("username is " + username);			System.out.println("map task num is " + mapTaskNum);			System.out.println("reduce task num is " + reduceTaskNum);			System.out.println("launchTime is " + launchTime);			System.out.println("finishTime is " + finishTime);			System.out.println("hive query sql is " + hiveSql);*/		} catch (IOException e) {			// TODO Auto-generated catch block			e.printStackTrace();		}		if (fileType.equals("config")) {			insertConfParseData(jobId, jobName, username, hiveSql);		} else if (fileType.equals("info")) {			insertJobInfoParseData(jobId, launchTime, finishTime, mapTaskNum,					reduceTaskNum);		}	}	private void insertConfParseData(String jobId, String jobName,			String username, String sql) {		String[] array;		if (dataInfos.containsKey(jobId)) {			array = dataInfos.get(jobId);		} else {			array = new String[BaseValues.DB_COLUMN_HIVE_SQL_LEN];		}		array[BaseValues.DB_COLUMN_HIVE_SQL_JOBID] = jobId;		array[BaseValues.DB_COLUMN_HIVE_SQL_JOBNAME] = jobName;		array[BaseValues.DB_COLUMN_HIVE_SQL_USERNAME] = username;		array[BaseValues.DB_COLUMN_HIVE_SQL_HIVE_SQL] = sql;		tool.addDataToMap(jobId, array);	}	private void insertJobInfoParseData(String jobId, long launchTime,			long finishedTime, int mapTaskNum, int reduceTaskNum) {		String[] array;		if (dataInfos.containsKey(jobId)) {			array = dataInfos.get(jobId);		} else {			array = new String[BaseValues.DB_COLUMN_HIVE_SQL_LEN];		}		array[BaseValues.DB_COLUMN_HIVE_SQL_JOBID] = jobId;		array[BaseValues.DB_COLUMN_HIVE_SQL_START_TIME] = String				.valueOf(launchTime);		array[BaseValues.DB_COLUMN_HIVE_SQL_FINISH_TIME] = String				.valueOf(finishedTime);		array[BaseValues.DB_COLUMN_HIVE_SQL_MAP_TASK_NUM] = String				.valueOf(mapTaskNum);		array[BaseValues.DB_COLUMN_HIVE_SQL_REDUCE_TASK_NUM] = String				.valueOf(reduceTaskNum);		tool.addDataToMap(jobId, array);	}	private int parseTaskNum(String flag, String jobStr) {		int taskNum;		int startPos;		int endPos;		String tmpStr;		taskNum = 0;		tmpStr = jobStr;		startPos = tmpStr.indexOf(flag);				if(startPos == -1){			return 0;		}				tmpStr = tmpStr.substring(startPos + flag.length());		endPos = tmpStr.indexOf("}");		tmpStr = tmpStr.substring(0, endPos);		taskNum = Integer.parseInt(tmpStr.split(":")[1]);		return taskNum;	}}


其他更多Yarn,Hadoop方面代码的分析请点击链接,后续将会继续更新YARN其他方面的代码分析。

你可能感兴趣的文章
回调函数(callback function)
查看>>
omnet++
查看>>
23种设计模式一:单例模式
查看>>
Qt中的析构函数
查看>>
CSharp中委托(一)委托、匿名函数、lambda表达式、多播委托、窗体传值、泛型委托
查看>>
二叉堆的c++模板类实现
查看>>
C语言实现dijkstra(adjacence matrix)
查看>>
C#学习笔记(十四)事件(一)通知
查看>>
SQL Server SQL语句调优技巧
查看>>
用C#实现封装-徐新帅-专题视频课程
查看>>
C语言学习从初级到精通的疯狂实战教程-徐新帅-专题视频课程
查看>>
三层框架+sql server数据库 实战教学-徐新帅-专题视频课程
查看>>
NAT工作原理
查看>>
Processes, threads and goroutines
查看>>
c++中的10种常见继承
查看>>
语义化版本编号(Semantic Versioning)
查看>>
E28 LoRa模块透传 定点传输 RSSI测试与MicroPython应用
查看>>
成功解决Keil MDK5中no browse information available in ‘xxx’的问题
查看>>
抽离css文件
查看>>
babel预设、插件和webpack中运行
查看>>