使用mapreduce的思想做了这周的题目,每个文件使用一个线程处理,最后的结果汇总到reduce,reduce把这些结果合并
用到的内容:多线程中不常用的一种方式Callable,正则表达式,HashMap排序,mapreduce思想
Callable可以获取线程执行完的结果,并且可以抛出异常
下面是代码
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.concurrent.Callable;
/**
 * Map stage of the word count: reads one file and counts how often each word occurs in it.
 *
 * <p>An instance handles exactly one file. It is a {@link Callable} so that it can be
 * submitted to an executor and hand its per-file counts back through a future.
 *
 * @author Matrix42
 */
public class WorldCountMap implements Callable<HashMap<String, Integer>> {
    // Contents of the file, filled in by call().
    private String text;
    // File to process.
    private File file;
    // Word (lower-cased) -> number of occurrences, as produced by the latest call().
    private HashMap<String, Integer> result;

    /**
     * Creates a map task for the given file.
     *
     * @param file the text file whose words are to be counted
     */
    public WorldCountMap(File file) {
        this.file = file;
        result = new HashMap<String, Integer>();
    }

    /**
     * Reads the whole file into one string.
     *
     * <p>Every line is followed by a newline so that the last word of a line and the first
     * word of the next line stay separate tokens. The file is decoded with the platform
     * default charset because no charset is passed to the reader.
     *
     * @param file the file to read
     * @return the file contents, one {@code '\n'} after each line
     * @throws IllegalStateException if the file cannot be opened or read; the I/O error is the cause
     */
    private String readText(File file) {
        StringBuilder sb = new StringBuilder();
        // try-with-resources closes the reader on every path, including read failures.
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(new FileInputStream(file)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line).append('\n');
            }
        } catch (IOException e) {
            // Fail loudly instead of returning a partial text that would yield wrong counts.
            throw new IllegalStateException("Cannot read " + file, e);
        }
        return sb.toString();
    }

    /**
     * Counts the words of the file.
     *
     * <p>The text is split on every character that is not a regex word character and on every
     * digit, so tokens consist of letters and underscores only. Tokens are lower-cased before
     * counting. Each invocation starts from an empty map, so calling this method again does
     * not add to the previous counts.
     *
     * @return a map from lower-cased word to its number of occurrences in the file
     * @throws IllegalStateException if the file cannot be read
     */
    @Override
    public HashMap<String, Integer> call() {
        this.text = readText(file);
        HashMap<String, Integer> counts = new HashMap<String, Integer>();
        // Split with a regular expression: any non-word character or any digit is a separator.
        for (String token : text.split("[^\\w]|[\\d]")) {
            // Adjacent separators produce empty tokens; skip them.
            if (token.isEmpty()) {
                continue;
            }
            // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
            String word = token.toLowerCase(java.util.Locale.ROOT);
            Integer seen = counts.get(word);
            counts.put(word, seen == null ? 1 : seen + 1);
        }
        result = counts;
        return result;
    }
}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
 * Reduce stage of the word count: collects the per-file maps produced by the map stage and
 * merges them into one map of total counts.
 *
 * <p>The class is a lazily created singleton. Only the creation of the instance is
 * synchronized; {@link #add}, {@link #calculate} and {@link #getResult} are not.
 *
 * @author Matrix42
 */
public class WorldCountReduce {
    // Maps handed over by the map stage, in the order they were added.
    private List<Map<String, Integer>> resultList;
    // Merged totals, rebuilt by every calculate().
    private HashMap<String, Integer> result;
    // The singleton instance, created on first use.
    private static WorldCountReduce instance;

    // Private constructor: instances are obtained through getInstance().
    private WorldCountReduce() {
        this.resultList = new ArrayList<Map<String, Integer>>();
        result = new HashMap<String, Integer>();
    }

    /**
     * Returns the singleton instance, creating it on the first call.
     *
     * <p>The method is synchronized so that two threads racing on the first call cannot
     * create two instances.
     *
     * @return the single {@code WorldCountReduce} instance
     */
    public static synchronized WorldCountReduce getInstance() {
        if (instance == null) {
            instance = new WorldCountReduce();
        }
        return instance;
    }

    /**
     * Registers one map-stage result to be merged by the next {@link #calculate()}.
     *
     * @param res word counts of a single input
     */
    public void add(Map<String, Integer> res) {
        resultList.add(res);
    }

    /**
     * Merges all registered maps into the result by summing the counts of equal words.
     *
     * <p>The totals are recomputed from scratch on every call, so calling this method
     * repeatedly does not count any registered map more than once. The map object returned
     * by {@link #getResult()} stays the same; only its contents are replaced.
     */
    public void calculate() {
        result.clear();
        for (Map<String, Integer> partial : resultList) {
            for (Entry<String, Integer> entry : partial.entrySet()) {
                String key = entry.getKey();
                Integer soFar = result.get(key);
                if (soFar == null) {
                    result.put(key, entry.getValue());
                } else {
                    result.put(key, soFar + entry.getValue());
                }
            }
        }
    }

    /**
     * Returns the merged counts computed by the latest {@link #calculate()}.
     *
     * @return the internal result map (not a copy); empty until {@code calculate()} has run
     */
    public HashMap<String, Integer> getResult() {
        return result;
    }
}
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Task manager: runs one {@code WorldCountMap} per file on a thread pool and feeds the
 * results to {@code WorldCountReduce}, so callers do not deal with either class directly.
 *
 * <p>Typical use: construct, {@link #start()}, {@link #getResult()}, {@link #shutdownTask()}.
 *
 * @author Matrix42
 */
public class TaskManager {
    // Files to be processed.
    private ArrayList<File> tasks;
    // Futures of submitted map tasks whose results have not been handed to the reducer yet.
    private List<Future<HashMap<String, Integer>>> taskList =
            new ArrayList<Future<HashMap<String, Integer>>>();
    private WorldCountReduce reduce;
    // Thread pool running the map tasks; null until start() has created it.
    private ExecutorService exec;

    /**
     * Creates a manager for the given files.
     *
     * @param tasks files to process; the list is used as-is (not copied). {@code null} is
     *              treated as an empty list.
     */
    public TaskManager(ArrayList<File> tasks) {
        this.tasks = (tasks != null) ? tasks : new ArrayList<File>();
    }

    /**
     * Adds one more file to process. Only files added before {@link #start()} are submitted
     * by that call.
     *
     * @param file the text file to add
     */
    public void addTask(File file) {
        tasks.add(file);
    }

    /**
     * Starts processing: submits one map task per file to a fixed thread pool that has one
     * thread per file. Does nothing when there are no files, because a pool of size zero
     * cannot be created.
     */
    public void start() {
        if (tasks.isEmpty()) {
            return;
        }
        if (exec != null) {
            // Let a pool from an earlier start() finish its work and release its threads.
            exec.shutdown();
        }
        exec = Executors.newFixedThreadPool(tasks.size());
        for (File task : tasks) {
            taskList.add(exec.submit(new WorldCountMap(task)));
        }
    }

    /**
     * Stops the thread pool immediately. Safe to call when {@link #start()} never created one.
     */
    public void shutdownTask() {
        if (exec != null) {
            exec.shutdownNow();
        }
    }

    /**
     * Waits for the submitted map tasks, passes their results to the reducer and returns
     * the merged counts.
     *
     * <p>A task that failed is reported on standard error and skipped. If the calling thread
     * is interrupted while waiting, the interrupt flag is restored and the remaining tasks
     * are not waited for. Each future is handed to the reducer at most once: when no
     * uncollected futures are left, the reducer's current result is returned unchanged.
     *
     * @return the reducer's result map, word to total count
     */
    public HashMap<String, Integer> getResult() {
        reduce = WorldCountReduce.getInstance();
        if (taskList.isEmpty()) {
            // Nothing new to merge; avoid feeding or recalculating the reducer again.
            return reduce.getResult();
        }
        for (Future<HashMap<String, Integer>> future : taskList) {
            try {
                reduce.add(future.get());
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the flag for the caller; further get() calls would fail immediately.
                Thread.currentThread().interrupt();
                break;
            }
        }
        taskList.clear();
        reduce.calculate();
        return reduce.getResult();
    }
}
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
 * Command-line entry point: counts the words of several text files and prints them ordered
 * by frequency.
 */
public class Client {
    /**
     * Runs the word count and prints one {@code word:count} line per word, most frequent
     * first; words with equal counts are printed in alphabetical order.
     *
     * @param args paths of the files to process; when none are given, {@code D:/a.txt},
     *             {@code D:/b.txt} and {@code D:/c.txt} are used
     */
    public static void main(String[] args) {
        // Collect the files to process.
        ArrayList<File> tasks = new ArrayList<>();
        if (args != null && args.length > 0) {
            for (String path : args) {
                tasks.add(new File(path));
            }
        } else {
            tasks.add(new File("D:/a.txt"));
            tasks.add(new File("D:/b.txt"));
            tasks.add(new File("D:/c.txt"));
        }
        TaskManager manager = new TaskManager(tasks);
        // Start the map tasks.
        manager.start();
        try {
            // Fetch the merged result.
            Map<String, Integer> resMap = manager.getResult();
            List<Entry<String, Integer>> list =
                    new ArrayList<Entry<String, Integer>>(resMap.entrySet());
            // Sort by count, descending; break ties by word so the output is deterministic.
            Collections.sort(list, new Comparator<Map.Entry<String, Integer>>() {
                @Override
                public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
                    int byCount = o2.getValue().compareTo(o1.getValue());
                    return byCount != 0 ? byCount : o1.getKey().compareTo(o2.getKey());
                }
            });
            // Print the result.
            for (Entry<String, Integer> e : list) {
                System.out.println(e.getKey() + ":" + e.getValue());
            }
        } finally {
            // Always stop the pool; its non-daemon threads would otherwise keep the JVM alive.
            manager.shutdownTask();
        }
    }
}
正则有点捉急,在RegexBuddy和java中结果不一样
结果: