岗位聚类
岗位聚类主要有两部分第一部分是对学历和地区信息数据的处理还有一部分是对岗位数据的处理聚类
第一部分
添加HbaseStatistics.java中代码
/** * 学历分布 * * @param stmt * @param tableName * @throws SQLException * @throws IOException */ public void countEducationDistribution(String tableName, String mongoTable, String family) throws SQLException, IOException {///// MongoClient mongoClient = mongodbstorage.setUp(); // String sql = "select LOCATION,count(1) as count,sum(AMOUNT) from " + // tableName // + " where ISPERCEPTED = 'no' group by LOCATION order by count desc"; // ResultSet results = stmt.executeQuery(sql); // 建立表的连接 Table table = connection.getTable(TableName.valueOf(tableName)); // 创建一个空的Scan实例 Scan scan1 = new Scan(); // 可以指定具体的列族列 scan1.addColumn(Bytes.toBytes(family), Bytes.toBytes("EDUCATION")).addColumn(Bytes.toBytes(family), Bytes.toBytes("AMOUNT")); scan1.setCaching(60); scan1.setMaxResultSize(1 * 1024 * 1024); // 100k (MB1 * 1024 * 1024) scan1.setFilter(new PageFilter(1000)); // 在行上获取遍历器 ResultScanner scanner1 = table.getScanner(scan1); Map map = mongodbstorage.create(mongoTable, "job_education_distribution", mongoClient);// mongodb集合,key,value MongoCollection<Document> collection = (MongoCollection<Document>) map.get("collection"); Document document = (Document) map.get("document"); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); java.util.Date date = new java.util.Date(); String da = sdf.format(date); mongodbstorage.appendString(document, "date", da); int[] cu = new int[34]; int[] nu = new int[34]; Vector<Document> vec = new Vector<Document>(); for (Result res : scanner1) { String loc = (new String(CellUtil.cloneValue(res.rawCells()[1]))).split("-")[0]; int total = Integer.parseInt(new String(CellUtil.cloneValue(res.rawCells()[0]))); int num = 1; if (!cities.containsKey(loc)) { } else { // String prov = jobanalysisreposity.appendProvinceDistribution(document, loc, // total, mongoClient); String prov = cities.get(loc); switch (prov) { case "初中": cu[0] += total; nu[0] += num; break; case "高中": cu[1] += total; nu[1] += num; break; case "中技": cu[2] += total; nu[2] += num; break; case "中专": cu[3] += total; nu[3] += num; break; case "大专": cu[4] += total; nu[4] += num; break; case "本科": cu[5] += total; nu[5] += num; break; case "硕士": cu[6] += total; nu[6] += num; break; case "博士": cu[7] += total; nu[7] += num; break; default: cu[8] += total; nu[8] += num; break; } } logger.info(loc + ":" + total); } Document[] doc = new Document[34]; for (int i = 0; i <= 33; i++) { doc[i] = new Document(); } mongodbstorage.appendExperience(doc[0], "初中", cu[0], nu[0]); mongodbstorage.appendExperience(doc[1], "高中", cu[1], nu[1]); mongodbstorage.appendExperience(doc[2], "中技", cu[2], nu[2]); mongodbstorage.appendExperience(doc[3], "中专", cu[3], nu[3]); mongodbstorage.appendExperience(doc[4], "大专", cu[4], nu[4]); mongodbstorage.appendExperience(doc[5], "本科", cu[5], nu[5]); mongodbstorage.appendExperience(doc[6], "硕士", cu[6], nu[6]); mongodbstorage.appendExperience(doc[7], "博士", cu[7], nu[7]); mongodbstorage.appendExperience(doc[8], "不限", cu[8], nu[8]); for (int i = 0; i <= 8; i++) { vec.add(doc[i]); } mongodbstorage.appendArray(document, "category", vec); mongodbstorage.insertOne(collection, document); }
第二部分
添加JobClusterService.java中代码
public ServiceState process() {///// try { isTaskDone.set(false); // 链接存储和Hive,用于聚类分析。 // Statement stmt = HiveOperator.getInstance().setUp(); mongoClient = mongodbstorage.setUp(); // 1. 读取配置 // 2 启动聚类,存到MongoDB分析库 List<Map<String, Object>> lists = new ArrayList<>(); JobCluster cluster = new JobCluster(); String JsonContext = ReadFile .ReadFile(System.getProperty("user.dir") + "/configuration/job_industry_skills_config.json"); // json to Object JSONObject jsonObject = new JSONObject(JsonContext); JSONArray jsonArray = jsonObject.getJSONArray("industry"); for (int i = 0; i < jsonArray.length(); i++) { Map<String, Object> map = new HashMap<String, Object>(); JSONObject object = jsonArray.getJSONObject(i); // 获取name(云计算、大数据、ai) String name = object.getString("industry-id"); String[] names = name.split("_"); map.put("industry-id", names[1]); // 获取小类 String category = object.getString("category"); String[] categorys_ = category.split(","); List<String> categorys = new ArrayList<>(); for (String str : categorys_) { categorys.add(str); } map.put("category", categorys); // 获取技能名称 String skill = object.getString("skills"); String[] skills_ = skill.split(","); List<String> skills = new ArrayList<>(); for (String str : skills_) { skills.add(str); } map.put("skills", skills); // 加入map list lists.add(map); } // 岗位聚类 for (Map<String, Object> map : lists) { String str = (String) map.get("industry-id"); for (String str1 : (List<String>) map.get("category"))// 岗位子分类 { for (int i = 0; i <= clusterCount.length-1; i++) { cluster.cluster(str, str1, (List<String>) map.get("skills"),clusterCount[i], mongoClient); } } } //数据统计 HbaseStatistics hbaseStatics =new HbaseStatistics(); hbaseStatics.doDataStatistics(); } catch (Exception e) { logger.error(e.toString()); } finally { isTaskDone.set(true); } return ServiceState.STATE_RUNNING; }
添加JobCluster.java中代码
public List<Integer> kMeans(List<double[]> des1, int cluster) { String masterName = sparkProperties.getProperty("spark_master"); SparkConf conf = new SparkConf().setAppName("JobCluseter").setMaster(masterName); JavaSparkContext jsc = new JavaSparkContext(conf); List<Vector> preData = new ArrayList<>(); for (int i = 0; i < des1.size(); i++) { Vector sv = Vectors.dense(des1.get(i)); preData.add(sv); } JavaRDD<Vector> data = jsc.parallelize(preData); int numClusters = cluster; int numIterations = 5000; int runs = 300; KMeansModel clusters = KMeans.train(data.rdd(), numClusters, numIterations, runs); JavaRDD<Integer> clusterResult = clusters.predict(data); List<Integer> indexs = new ArrayList<>(); Vector[] centers = clusters.clusterCenters(); clusterResult.collect(); for (int i = 0; i < clusterResult.collect().size(); i++) { indexs.add(clusterResult.collect().get(i)); } jsc.close(); return indexs; }
启动服务
@Override public ServiceState start() { //1. 启动收集服务(虫) //Service jobCollector = new JobCollectService(server, this); //jobCollector.start(); // 2. 启动清洗服务(清洗) Service jobCleaner = new JobCleanService(server, this); jobCleaner.start(); // 3. 启动分析服务(聚类) //Service jobCluster = new JobClusterService(server, this); //jobCluster.start(); return ServiceState.STATE_RUNNING; }
JobAnalysisService.java中只解除启动清洗服务的注解 然后运行EduInsightServer.java启动服务
以上2,3,4部分的题目数据已经完成,然后需要运行xueqing-client项目来显示效果(用户名:admin,密码:123456) 需要注解掉xueqing-client项目中controller层指定请求控制代码中的一些从mongodb获取数据的操作,具体请自行分析项目源码
Last updated
Was this helpful?