使用mapreduce的思想做了这周的题目,每个文件使用一个线程处理,最后的结果汇总到reduce,reduce把这些结果合并

用到的内容:多线程中不常用的一直方式Callable,正则表达式,HashMap排序,mapreduce思想

Callable可以获取线程执行完的结果,并且可以抛出异常

下面是代码

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.concurrent.Callable;
/**
 * map部分
 * @author Matrix42
 *
 */
public class WorldCountMap implements Callable<HashMap<String, Integer>>{
    //待处理的文本
    private String text;
    //待处理的文件
    private File file;
    //以单词为key,次数为value的结果
    private HashMap<String, Integer> result;

    public WorldCountMap(File file) {
        this.file = file;
        result = new HashMap<String, Integer>();
    }
    /**
     * 把文件内容读出来存到text中
     * @param file
     * @return
     */
    private String Transform(File file){
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        StringBuffer sb = new StringBuffer();
        String string;
        try {
            while((string = reader.readLine())!=null){
                sb.append(string);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return sb.toString();
    }

    @Override
    public HashMap<String, Integer> call() {
        this.text = Transform(file);
        //使用正则进行分割
        String[] strings = text.split("[^\\w]|[\\d]");
        for (String string : strings) {
            //单词全部转换为小写的
            string = string.toLowerCase();
            //如果是空就跳过(正则分割时产生的)
            if (string.equals("")) {
                continue;
            }
            //把单词存入map,单词为key,如果之前不存在则value为1,存在则balue加1
            if(result.containsKey(string)){
                result.put(string, result.get(string)+1);
            }else {
                result.put(string, 1);
            }
        }
        return result;
    }

}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
 * reduce部分
 * @author Matrix42
 *
 */
public class WorldCountReduce {

    //存放map处理完的结果
    private List<Map<String, Integer>> resultList;
    //存放reduce处理完的结果
    private HashMap<String, Integer> result;
    //单例对象
    private static WorldCountReduce instance;

    //private的构造方法
    private WorldCountReduce() {
        this.resultList = new ArrayList<Map<String,Integer>>();
        result = new HashMap<String, Integer>();
    }
    //获取单例对象的方法
    public static WorldCountReduce getInstance(){
        if(instance == null){
            instance = new WorldCountReduce();
        }
        return instance;
    }

    //添加一个待reduce的结果
    public void add(Map<String, Integer> res){
        resultList.add(res);
    }

    //对map结果进行合并,处理方式与map类似
    public void calculate(){
        for (Map<String, Integer> map : resultList) {
            for(Entry<String, Integer> entry:map.entrySet()){
                String key = entry.getKey();
                if(result.containsKey(key)){
                    result.put(key, result.get(key)+entry.getValue());
                }else{
                    result.put(key, entry.getValue());
                }
            }
        }
    }

    //返回reduce结果
    public HashMap<String, Integer> getResult(){
        return result;
    }

}
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * 任务管理器,用户不用WorldCountMap和WorldCountReduce打交道
 * @author Matrix42
 *
 */
public class TaskManager {

    private ArrayList<File> tasks;
    private List<Future> taskList = new ArrayList<Future>();
    private WorldCountReduce reduce;
    private ExecutorService exec;
    /**
     * 构造方法,参数为待处理的文本的File对象
     * @param tasks
     */
    public TaskManager(ArrayList<File> tasks) {
        this.tasks = tasks;
    }
    /**
     * 添加一个待处理的文本的File对象
     * @param file
     */
    public void addTask(File file){
        tasks.add(file);
    }
    /**
     * 开始任务
     * 多线程处理
     */
    public void start(){
        exec = Executors.newFixedThreadPool(tasks.size());
        for(File task:tasks){
            WorldCountMap mapTask = new WorldCountMap(task);
            taskList.add(exec.submit(mapTask));
        }
    }
    /**
     * 结束任务
     */
    public void shutdownTask(){
        exec.shutdownNow();
    }
    /**
     * 把map的结果传给reduce处理,然后返回最后结果
     * @return
     */
    public HashMap<String, Integer> getResult(){
        reduce = WorldCountReduce.getInstance();
        for(Future future:taskList){
            try {
                reduce.add((Map<String, Integer>) future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        reduce.calculate();
        return reduce.getResult();
    }

}
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

public class Client {

    public static void main(String[] args) {
        //添加待处理的文件
        ArrayList<File> tasks = new ArrayList<>();
        tasks.add(new File("D:/a.txt"));
        tasks.add(new File("D:/b.txt"));
        tasks.add(new File("D:/c.txt"));
        TaskManager manager = new TaskManager(tasks);
        //开始任务
        manager.start();
        //获取结果
        Map<String,Integer> resMap = manager.getResult();
        List<Entry<String, Integer>> list = new ArrayList<Entry<String, Integer>>(resMap.entrySet());  
        //按value排序
        Collections.sort(list,new Comparator<Map.Entry<String,Integer>>() {  
            //降序排序  
            public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {  
                return o2.getValue().compareTo(o1.getValue());  
            }  
        });  
        //输出
        for (Entry<String, Integer> e: list) {  
            System.out.println(e.getKey()+":"+e.getValue());  
        }  
        //结束任务
        manager.shutdownTask();
    }

}

正则有点捉急,在RegexBuddy和java中结果不一样

结果:

图片说明