/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.SequencedCollection;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InMemoryFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.InputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MRConstants;
import org.apache.hadoop.mapred.MapOutputLocation;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.ReduceTaskRunner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskRunner;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

class ReduceTask
extends Task {
    // Class logger; assigned in the static initializer of this class.
    private static final Log LOG;
    // Number of map tasks; set by the six-argument constructor or by readFields().
    private int numMaps;
    // Created in run() only when "mapred.job.tracker" is not "local".
    private ReduceCopier reduceCopier;
    // Progress sub-phases registered by the constructors under the names
    // "copy", "sort" and "reduce", in that order.
    private Progress copyPhase;
    private Progress sortPhase;
    private Progress reducePhase;
    // Counters looked up by the constructors: REDUCE_INPUT_GROUPS,
    // REDUCE_INPUT_RECORDS and REDUCE_OUTPUT_RECORDS respectively.
    private Counters.Counter reduceInputKeyCounter;
    private Counters.Counter reduceInputValueCounter;
    private Counters.Counter reduceOutputCounter;
    // Ordering used for mapOutputFilesOnDisk; compares files by length first.
    private Comparator<FileStatus> mapOutputFileComparator;
    // Map output files tracked on local disk, kept sorted by the comparator above.
    // ReduceCopier.fetchOutputs() synchronizes on this set when it reads or adds to it.
    private final SortedSet<FileStatus> mapOutputFilesOnDisk;

    /**
     * Creates an unconfigured reduce task.
     *
     * <p>This is the constructor used by the {@code WritableFactory} registered in
     * the static initializer; the remaining state is expected to be filled in by
     * {@link #readFields(DataInput)}.
     *
     * <p>It registers the "copy", "sort" and "reduce" progress phases, looks up the
     * reduce counters, and creates the sorted set that tracks map output files on
     * disk.
     */
    public ReduceTask() {
        this.getProgress().setStatus("reduce");
        this.setPhase(TaskStatus.Phase.SHUFFLE);
        this.copyPhase = this.getProgress().addPhase("copy");
        this.sortPhase = this.getProgress().addPhase("sort");
        this.reducePhase = this.getProgress().addPhase("reduce");
        this.reduceInputKeyCounter = this.getCounters().findCounter(Task.Counter.REDUCE_INPUT_GROUPS);
        this.reduceInputValueCounter = this.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
        this.reduceOutputCounter = this.getCounters().findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS);
        this.mapOutputFileComparator = new Comparator<FileStatus>(){

            /**
             * Orders files by ascending length and breaks ties on the path string.
             *
             * <p>The tie-break must be antisymmetric: returning a constant -1 for
             * two different files of equal length makes compare(a, b) and
             * compare(b, a) both negative, which violates the Comparator contract
             * and can make TreeSet lookups and removals miss elements.
             */
            @Override
            public int compare(FileStatus a, FileStatus b) {
                long lenA = a.getLen();
                long lenB = b.getLen();
                if (lenA < lenB) {
                    return -1;
                }
                if (lenA > lenB) {
                    return 1;
                }
                // Same length: 0 only for the same path, otherwise a stable order.
                return a.getPath().toString().compareTo(b.getPath().toString());
            }
        };
        this.mapOutputFilesOnDisk = new TreeSet<FileStatus>(this.mapOutputFileComparator);
    }

    public ReduceTask(String jobId, String jobFile, String tipId, String taskId, int partition, int numMaps) {
        super(jobId, jobFile, tipId, taskId, partition);
        this.getProgress().setStatus("reduce");
        this.setPhase(TaskStatus.Phase.SHUFFLE);
        this.copyPhase = this.getProgress().addPhase("copy");
        this.sortPhase = this.getProgress().addPhase("sort");
        this.reducePhase = this.getProgress().addPhase("reduce");
        this.reduceInputKeyCounter = this.getCounters().findCounter(Task.Counter.REDUCE_INPUT_GROUPS);
        this.reduceInputValueCounter = this.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
        this.reduceOutputCounter = this.getCounters().findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS);
        this.mapOutputFileComparator = new /* invalid duplicate definition of identical inner class */;
        this.mapOutputFilesOnDisk = new TreeSet<FileStatus>(this.mapOutputFileComparator);
        this.numMaps = numMaps;
    }

    /**
     * Builds the runner that executes this reduce task.
     *
     * @param tracker the task tracker handed to the new runner
     * @return a {@code ReduceTaskRunner} bound to this task, the tracker and this task's configuration
     * @throws IOException declared by the signature; propagated from the runner's constructor if it throws
     */
    public TaskRunner createRunner(TaskTracker tracker) throws IOException {
        TaskRunner runner = new ReduceTaskRunner(this, tracker, this.conf);
        return runner;
    }

    /**
     * Reports whether this task is a map task.
     *
     * @return always {@code false}; this class is the reduce side
     */
    public boolean isMapTask() {
        final boolean mapSide = false;
        return mapSide;
    }

    /**
     * @return the number of map tasks recorded for this reduce task
     */
    public int getNumMaps() {
        final int mapCount = this.numMaps;
        return mapCount;
    }

    /**
     * Applies the superclass localization and then records the number of map
     * tasks on the job configuration.
     *
     * @param conf the job configuration to update
     * @throws IOException if the superclass localization fails
     */
    public void localizeConfiguration(JobConf conf) throws IOException {
        super.localizeConfiguration(conf);
        final int mapCount = this.numMaps;
        conf.setNumMapTasks(mapCount);
    }

    /**
     * Serializes this task: the superclass fields first, followed by the map
     * count as a single int.
     *
     * @param out destination stream
     * @throws IOException if writing fails
     */
    public void write(DataOutput out) throws IOException {
        super.write(out);
        final int mapCount = this.numMaps;
        out.writeInt(mapCount);
    }

    /**
     * Deserializes this task in the same order {@code write} emits it: the
     * superclass fields first, then the map count.
     *
     * @param in source stream
     * @throws IOException if reading fails
     */
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        final int mapCount = in.readInt();
        this.numMaps = mapCount;
    }

    /**
     * Collects the paths of the map output files that feed the merge.
     *
     * @param fs      not used by this method
     * @param isLocal when {@code true}, one input file per map index is looked up
     *                through {@code mapOutputFile}; otherwise the paths of the
     *                files tracked in {@code mapOutputFilesOnDisk} are returned
     * @return the input paths, in iteration order
     * @throws IOException if resolving a local input file fails
     */
    private Path[] getMapFiles(FileSystem fs, boolean isLocal) throws IOException {
        List<Path> inputs = new ArrayList<Path>();
        if (!isLocal) {
            // Shuffle case: use whatever the copier left on local disk.
            Iterator<FileStatus> onDisk = this.mapOutputFilesOnDisk.iterator();
            while (onDisk.hasNext()) {
                inputs.add(onDisk.next().getPath());
            }
        } else {
            // Local job: every map's output is addressed by its index.
            int mapIndex = 0;
            while (mapIndex < this.numMaps) {
                inputs.add(this.mapOutputFile.getInputFile(mapIndex, this.getTaskId()));
                ++mapIndex;
            }
        }
        Path[] result = new Path[0];
        return inputs.toArray(result);
    }

    /**
     * Runs the reduce task: optional shuffle, merge of the map outputs, then the
     * reduce loop.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>instantiate the job's reducer class and start the communication thread;</li>
     *   <li>if "mapred.job.tracker" is not "local", create a {@code ReduceCopier}
     *       and fetch the map outputs, failing with an IOException if it reports failure;</li>
     *   <li>merge the map files with a {@code SequenceFile.Sorter} on the local file system;</li>
     *   <li>feed each key group to the reducer, writing through the job's output format;</li>
     *   <li>close the reducer and the record writer, then call {@code done}.</li>
     * </ol>
     *
     * @param job       job configuration
     * @param umbilical protocol handle used for the communication thread, the reporter and {@code done}
     * @throws IOException if the copier fails or any stage throws; on an IOException inside
     *                     the reduce loop the reducer and writer are closed best-effort first
     */
    public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException {
        Reducer reducer = (Reducer)ReflectionUtils.newInstance(job.getReducerClass(), job);
        this.startCommunicationThread(umbilical);
        LocalFileSystem lfs = FileSystem.getLocal(job);
        boolean isLocal = true;
        // Only a non-local job tracker needs the shuffle (copy) step.
        if (!job.get("mapred.job.tracker", "local").equals("local")) {
            this.reduceCopier = new ReduceCopier(umbilical, job);
            if (!this.reduceCopier.fetchOutputs()) {
                // NOTE(review): no separator between the task id and the message text.
                throw new IOException(this.getTaskId() + "The reduce copier failed");
            }
            isLocal = false;
        }
        this.copyPhase.complete();
        Path[] mapFiles = this.getMapFiles(lfs, isLocal);
        // Temporary directory for the merge, named after the task id.
        Path tempDir = new Path(this.getTaskId());
        this.setPhase(TaskStatus.Phase.SORT);
        final Reporter reporter = this.getReporter(umbilical);
        SequenceFile.Sorter sorter = new SequenceFile.Sorter(lfs, job.getOutputKeyComparator(), job.getMapOutputKeyClass(), job.getMapOutputValueClass(), job);
        sorter.setProgressable(reporter);
        // The third argument is the negation of getKeepFailedTaskFiles();
        // presumably it controls deletion of the merge inputs - confirm against Sorter.merge.
        SequenceFile.Sorter.RawKeyValueIterator rIter = sorter.merge(mapFiles, tempDir, !this.conf.getKeepFailedTaskFiles());
        // The on-disk bookkeeping is no longer needed once the merge iterator exists.
        this.mapOutputFilesOnDisk.clear();
        mapFiles = null;
        this.sortPhase.complete();
        this.setPhase(TaskStatus.Phase.REDUCE);
        String finalName = ReduceTask.getOutputName(this.getPartition());
        FileSystem fs = FileSystem.get(job);
        final RecordWriter out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
        // Collector handed to the reducer: writes the record, bumps the output
        // counter and reports progress for every emitted pair.
        OutputCollector collector = new OutputCollector(){

            public void collect(Object key, Object value) throws IOException {
                out.write(key, value);
                ReduceTask.this.reduceOutputCounter.increment(1L);
                reporter.progress();
            }
        };
        try {
            Class<?> keyClass = job.getMapOutputKeyClass();
            Class<?> valClass = job.getMapOutputValueClass();
            ReduceValuesIterator values = new ReduceValuesIterator(rIter, job.getOutputValueGroupingComparator(), keyClass, valClass, job, reporter);
            values.informReduceProgress();
            // One reduce() call per key group.
            while (values.more()) {
                this.reduceInputKeyCounter.increment(1L);
                reducer.reduce(values.getKey(), values, collector, reporter);
                values.nextKey();
                values.informReduceProgress();
            }
            reducer.close();
            out.close(reporter);
        }
        catch (IOException ioe) {
            // Best-effort cleanup: secondary close failures are deliberately
            // ignored so that the original exception is the one rethrown.
            try {
                reducer.close();
            }
            catch (IOException ignored) {
                // empty catch block
            }
            try {
                out.close(reporter);
            }
            catch (IOException ignored) {
                // empty catch block
            }
            throw ioe;
        }
        this.done(umbilical);
    }

    /**
     * Returns the exponent {@code p} for which {@code 2^p} is closest to
     * {@code value}.
     *
     * <p>When {@code value} is exactly halfway between two powers of two, the
     * larger power wins (for example 3 maps to 2 and 6 maps to 3). For
     * {@code value <= 0} the result is -1.
     *
     * <p>The running power is kept in a {@code long}. With an {@code int}, the
     * shift overflows for {@code value > 2^30}, first to a negative number and
     * then to 0, and the loop never terminates.
     *
     * @param value the number to approximate
     * @return the exponent of the closest power of two
     */
    private static int getClosestPowerOf2(int value) {
        int power = 0;
        long approx = 1L;
        // Find the smallest power of two that is >= value.
        while (approx < value) {
            approx <<= 1;
            ++power;
        }
        // Step back one power when the one below is strictly closer.
        long distanceBelow = (long) value - (approx >> 1);
        long distanceAbove = approx - value;
        if (distanceBelow < distanceAbove) {
            --power;
        }
        return power;
    }

    static {
        WritableFactories.setFactory(ReduceTask.class, new WritableFactory(){

            public Writable newInstance() {
                return new ReduceTask();
            }
        });
        LOG = LogFactory.getLog((String)ReduceTask.class.getName());
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    class ReduceCopier
    implements MRConstants {
        // Protocol handle given to the constructor; used to obtain reporters and to report shuffle errors.
        private TaskUmbilicalProtocol umbilical;
        // NOTE(review): not referenced in the visible part of the file; presumably milliseconds - confirm.
        private static final int STALLED_COPY_TIMEOUT = 180000;
        // The enclosing ReduceTask instance (assigned from ReduceTask.this in the constructor).
        private ReduceTask reduceTask;
        // Locations queued for copying; fetchOutputs() adds to it and calls notifyAll() under its monitor.
        private List<MapOutputLocation> scheduledCopies;
        // Finished copy attempts; getCopyResult() waits on and removes from this list.
        private List<CopyResult> copyResults;
        // Read from "mapred.reduce.parallel.copies" (default 5); number of MapOutputCopier threads started.
        private int numCopiers;
        // Read from "mapred.reduce.copy.backoff" (default 300); multiplied by 1000 where used, so presumably seconds.
        private int maxBackoff;
        // Host -> time in milliseconds before which fetchOutputs() will not schedule a copy from that host.
        private Map<String, Long> penaltyBox;
        // Hosts that currently have a scheduled copy; fetchOutputs() schedules at most one copy per host.
        private Set<String> uniqueHosts;
        // Initialised to 0 in the constructor; usage is outside the visible part of the file.
        private long lastPollTime;
        // In-memory file system created from a "ramfs://mapoutput..." URI in the constructor.
        private InMemoryFileSystem inMemFileSys;
        private FileSystem localFileSys;
        // Sorter bound to the in-memory file system.
        private SequenceFile.Sorter sorter;
        // Read from "io.sort.factor" (default 10).
        private int ioSortFactor;
        // Error recorded by a merge; fetchOutputs() stops scheduling once it is non-null.
        private volatile Throwable mergeThrowable;
        private volatile boolean localFSMergeInProgress = false;
        private volatile boolean mergeInProgress = false;
        // Overwritten in the constructor from "mapred.inmem.merge.threshold" (default 1000).
        private int mergeThreshold = 500;
        private MapOutputCopier[] copiers = null;
        private ShuffleClientMetrics shuffleClientMetrics = null;
        private static final long MIN_POLL_INTERVAL = 1000L;
        // Recomputed in fetchOutputs() as max(numCopiers * 5, 50).
        private int probe_sample_size = 100;
        // Locations whose copy failed and which are fed back into the known outputs on the next round.
        private List<MapOutputLocation> retryFetches = new ArrayList<MapOutputLocation>();
        // Map indexes still to be fetched; fetchOutputs() loops until this is empty.
        private Set<Integer> neededOutputs = Collections.synchronizedSet(new TreeSet());
        // Map task ids whose locations are dropped by fetchOutputs() instead of being scheduled.
        private Set<String> obsoleteMapIds = Collections.synchronizedSet(new TreeSet());
        // Seeded in the constructor; used to shuffle the known outputs before scheduling.
        private Random random = null;
        // Set in the constructor to half of the in-memory file system size.
        private long ramfsMergeOutputSize;
        private int maxMapRuntime;
        // Derived in the constructor from maxBackoff via getClosestPowerOf2().
        private int maxFetchRetriesPerMap;
        private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
        private static final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
        private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
        private static final int MAX_FAILED_UNIQUE_FETCHES = 5;
        // Map ids that reached maxFetchRetriesPerMap failed fetches; an id is removed again after a successful copy.
        Set<Integer> fetchFailedMaps = new TreeSet<Integer>();
        // Map task id -> number of failed fetch attempts so far.
        Map<String, Integer> mapTaskToFailedFetchesMap = new HashMap<String, Integer>();
        private static final int BACKOFF_INIT = 4000;
        private int nextMapOutputCopierId = 0;
        // Accepts only paths whose string form ends with ".out".
        private final PathFilter MAP_OUTPUT_FILTER = new PathFilter(){

            public boolean accept(Path file) {
                return file.toString().endsWith(".out");
            }
        };

        /**
         * Parses the numeric map id embedded in a map output file name.
         *
         * <p>The id is the text between the last "map_" and the last ".out" in
         * the final path component.
         *
         * @param pathname path of a map output file
         * @return the map id parsed as an int
         */
        private int extractMapIdFromPathName(Path pathname) {
            final String marker = "map_";
            String fileName = pathname.getName();
            int idStart = fileName.lastIndexOf(marker) + marker.length();
            int idEnd = fileName.lastIndexOf(".out");
            String idText = fileName.substring(idStart, idEnd);
            return Integer.parseInt(idText);
        }

        /**
         * Installs a class loader on {@code conf} that can see the job's classes.
         *
         * <p>The new {@link URLClassLoader} delegates to the configuration's
         * current class loader and adds, in order: every file under the job
         * cache directory's "lib" folder, its "classes" folder, the job cache
         * directory itself (all three only when the job has a jar), and the
         * directory containing the job file.
         *
         * <p>URLs are built with {@code File.toURI().toURL()} rather than the
         * deprecated {@code File.toURL()}, which does not escape characters that
         * are illegal in URLs (for example spaces in directory names).
         *
         * @param conf the job configuration whose class loader is replaced
         * @throws IOException if a path cannot be converted to a URL
         */
        private void configureClasspath(JobConf conf) throws IOException {
            ClassLoader parent = conf.getClassLoader();
            File workDir = new File(ReduceTask.this.getJobFile()).getParentFile();
            List<URL> classpath = new ArrayList<URL>();
            String jar = conf.getJar();
            if (jar != null) {
                File jobCacheDir = new File(new Path(jar).getParent().toString());
                // listFiles() returns null when "lib" is missing or is not a directory.
                File[] libs = new File(jobCacheDir, "lib").listFiles();
                if (libs != null) {
                    for (File lib : libs) {
                        classpath.add(lib.toURI().toURL());
                    }
                }
                classpath.add(new File(jobCacheDir, "classes").toURI().toURL());
                classpath.add(jobCacheDir.toURI().toURL());
            }
            classpath.add(workDir.toURI().toURL());
            URL[] urls = classpath.toArray(new URL[classpath.size()]);
            conf.setClassLoader(new URLClassLoader(urls, parent));
        }

        /**
         * Sets up the shuffle state for the enclosing reduce task.
         *
         * <p>Configures the job class path, reads the copy/merge tunables from
         * {@code conf}, creates the in-memory ("ramfs") and local file systems
         * plus a sorter bound to the in-memory one, and seeds the random
         * generator used for scheduling.
         *
         * @param umbilical protocol handle kept for reporters and error reporting
         * @param conf      job configuration
         * @throws IOException if the class path or a file system cannot be set up
         */
        public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf) throws IOException {
            configureClasspath(conf);
            shuffleClientMetrics = new ShuffleClientMetrics(conf);
            this.umbilical = umbilical;
            reduceTask = ReduceTask.this;

            // Work queues shared with the copier threads.
            scheduledCopies = new ArrayList<MapOutputLocation>(100);
            copyResults = new ArrayList<CopyResult>(100);

            // Tunables.
            numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
            maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
            ioSortFactor = conf.getInt("io.sort.factor", 10);
            int backoffSteps = maxBackoff * 1000 / 4000 + 1;
            maxFetchRetriesPerMap = ReduceTask.getClosestPowerOf2(backoffSteps);
            mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);

            // File systems and the in-memory sorter.
            URI ramFsUri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
            inMemFileSys = (InMemoryFileSystem) FileSystem.get(ramFsUri, conf);
            LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: " + ramFsUri);
            float ramFsSize = (float) inMemFileSys.getFSSize();
            ramfsMergeOutputSize = (long) (0.5f * ramFsSize);
            localFileSys = FileSystem.getLocal(conf);
            sorter = new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(), conf.getMapOutputKeyClass(), conf.getMapOutputValueClass(), conf);
            sorter.setProgressable(ReduceTask.this.getReporter(umbilical));

            // Scheduling bookkeeping.
            penaltyBox = new Hashtable<String, Long>();
            uniqueHosts = new HashSet<String>();
            lastPollTime = 0L;
            double partitionTerm = Math.pow(reduceTask.getPartition(), reduceTask.getPartition() % 10);
            random = new Random(System.nanoTime() + (long) partitionTerm);
            maxMapRuntime = 0;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        /**
         * Copies the outputs of all map tasks to this reducer and merges whatever
         * is left in the in-memory file system onto local disk.
         *
         * <p>Outline of the visible logic:
         * <ol>
         *   <li>register one needed output and one copy sub-phase per map, then
         *       start {@code numCopiers} {@code MapOutputCopier} threads;</li>
         *   <li>loop while outputs are still needed and no merge error is recorded:
         *       poll for new map completion events (unless already busy), schedule
         *       at most one copy per host that is not in the penalty box, start an
         *       on-disk merge thread when enough files have accumulated, then drain
         *       copy results;</li>
         *   <li>a failed copy is queued for retry, its host is put in the penalty
         *       box with a growing back-off, and repeated failures are reported
         *       through the task status / {@code umbilical.shuffleError};</li>
         *   <li>after the loop, interrupt the copiers, wait for merges in progress,
         *       and merge the remaining in-memory files into one local file.</li>
         * </ol>
         * The in-memory file system is always closed on exit.
         *
         * <p>NOTE(review): this body is decompiler output. Some declarations
         * (the reused {@code sequencedCollection} variable, the redeclared
         * {@code numScheduled2}, {@code mapTaskId} typed as Object) look like
         * decompiler artifacts - verify against the original source.
         *
         * @return {@code true} if no merge error was recorded and no map output is
         *         still needed; {@code false} otherwise
         * @throws IOException if the final in-memory merge fails
         */
        public boolean fetchOutputs() throws IOException {
            long startTime;
            int numOutputs = this.reduceTask.getNumMaps();
            ArrayList<MapOutputLocation> knownOutputs = new ArrayList<MapOutputLocation>(this.numCopiers);
            int totalFailures = 0;
            int numInFlight = 0;
            int numCopied = 0;
            int lowThreshold = this.numCopiers * 2;
            long bytesTransferred = 0L;
            DecimalFormat mbpsFormat = new DecimalFormat("0.00");
            Progress copyPhase = this.reduceTask.getProgress().phase();
            this.probe_sample_size = Math.max(this.numCopiers * 5, 50);
            // Every map index starts out as "needed" and gets its own copy sub-phase.
            for (int i = 0; i < numOutputs; ++i) {
                this.neededOutputs.add(i);
                copyPhase.addPhase();
            }
            this.copiers = new MapOutputCopier[this.numCopiers];
            Reporter reporter = ReduceTask.this.getReporter(this.umbilical);
            // Separate sorter bound to the local file system, handed to the on-disk merger below.
            SequenceFile.Sorter localFileSystemSorter = new SequenceFile.Sorter(this.localFileSys, ReduceTask.this.conf.getOutputKeyComparator(), ReduceTask.this.conf.getMapOutputKeyClass(), ReduceTask.this.conf.getMapOutputValueClass(), ReduceTask.this.conf);
            localFileSystemSorter.setProgressable(reporter);
            for (int i = 0; i < this.copiers.length; ++i) {
                this.copiers[i] = new MapOutputCopier(reporter);
                this.copiers[i].start();
            }
            long currentTime = startTime = System.currentTimeMillis();
            long lastProgressTime = System.currentTimeMillis();
            IntWritable fromEventId = new IntWritable(0);
            try {
                // Main shuffle loop: runs until every output is fetched or a merge fails.
                block30: while (!this.neededOutputs.isEmpty() && this.mergeThrowable == null) {
                    LOG.info((Object)(this.reduceTask.getTaskId() + " Need " + this.neededOutputs.size() + " map output(s)"));
                    try {
                        // Previously failed fetches become candidates again.
                        knownOutputs.addAll(this.retryFetches);
                        boolean busy = this.isBusy(numInFlight, this.numCopiers, lowThreshold, this.uniqueHosts.size(), this.probe_sample_size, numOutputs - numCopied);
                        if (!busy) {
                            int currentNumKnownMaps = knownOutputs.size();
                            int currentNumObsoleteMapIds = this.obsoleteMapIds.size();
                            this.getMapCompletionEvents(fromEventId, knownOutputs);
                            LOG.info((Object)(this.reduceTask.getTaskId() + ": " + "Got " + (knownOutputs.size() - currentNumKnownMaps) + " new map-outputs & " + (this.obsoleteMapIds.size() - currentNumObsoleteMapIds) + " obsolete map-outputs from tasktracker and " + this.retryFetches.size() + " map-outputs from previous failures"));
                        } else {
                            LOG.info((Object)(" Busy enough - did not query the tasktracker for new map outputs. Have " + this.retryFetches.size() + " map outputs from previous failures"));
                        }
                        this.retryFetches.clear();
                    }
                    catch (IOException ie) {
                        // A failed poll is logged only; scheduling continues with what is already known.
                        LOG.warn((Object)(this.reduceTask.getTaskId() + " Problem locating map outputs: " + StringUtils.stringifyException(ie)));
                    }
                    int numKnown = knownOutputs.size();
                    int numScheduled2 = 0;
                    int numSlow = 0;
                    int numDups = 0;
                    LOG.info((Object)(this.reduceTask.getTaskId() + " Got " + numKnown + " known map output location(s); scheduling..."));
                    SequencedCollection<MapOutputLocation> sequencedCollection = this.scheduledCopies;
                    synchronized (sequencedCollection) {
                        // Randomise the order so hosts are not always contacted in the same sequence.
                        Collections.shuffle(knownOutputs, this.random);
                        Iterator locIt = knownOutputs.iterator();
                        currentTime = System.currentTimeMillis();
                        while (locIt.hasNext()) {
                            MapOutputLocation loc = (MapOutputLocation)locIt.next();
                            if (this.obsoleteMapIds.contains(loc.getMapTaskId())) {
                                locIt.remove();
                                continue;
                            }
                            Long penaltyEnd = this.penaltyBox.get(loc.getHost());
                            boolean penalized = false;
                            boolean duplicate = false;
                            if (penaltyEnd != null && currentTime < penaltyEnd) {
                                penalized = true;
                                ++numSlow;
                            }
                            if (this.uniqueHosts.contains(loc.getHost())) {
                                duplicate = true;
                                ++numDups;
                            }
                            // Skipped locations stay in knownOutputs for a later round.
                            if (penalized || duplicate) continue;
                            this.uniqueHosts.add(loc.getHost());
                            this.scheduledCopies.add(loc);
                            locIt.remove();
                            ++numInFlight;
                            ++numScheduled2;
                        }
                        // Wake up threads waiting on the scheduled-copies list.
                        this.scheduledCopies.notifyAll();
                    }
                    LOG.info((Object)(this.reduceTask.getTaskId() + " Scheduled " + numScheduled2 + " of " + numKnown + " known outputs (" + numSlow + " slow hosts and " + numDups + " dup hosts)"));
                    sequencedCollection = ReduceTask.this.mapOutputFilesOnDisk;
                    synchronized (sequencedCollection) {
                        // Start a background on-disk merge once at least 2 * ioSortFactor - 1 files exist
                        // and no such merge is already running.
                        if (!this.localFSMergeInProgress && ReduceTask.this.mapOutputFilesOnDisk.size() >= 2 * this.ioSortFactor - 1) {
                            this.localFSMergeInProgress = true;
                            LocalFSMerger lfsm = new LocalFSMerger((LocalFileSystem)this.localFileSys, localFileSystemSorter);
                            lfsm.setName("Thread for merging on-disk files");
                            lfsm.setDaemon(true);
                            lfsm.start();
                        }
                    }
                    try {
                        // Nothing running and nothing scheduled: report progress and back off for 5 seconds.
                        if (numInFlight == 0 && numScheduled2 == 0) {
                            reporter.progress();
                            Thread.sleep(5000L);
                        }
                    }
                    catch (InterruptedException e) {
                        // empty catch block
                    }
                    // Drain results for the copies currently in flight.
                    while (numInFlight > 0 && this.mergeThrowable == null) {
                        Object mapTaskId;
                        LOG.debug((Object)(this.reduceTask.getTaskId() + " numInFlight = " + numInFlight));
                        CopyResult cr = this.getCopyResult();
                        if (cr != null) {
                            if (cr.getSuccess()) {
                                ++numCopied;
                                lastProgressTime = System.currentTimeMillis();
                                // "+ 1L" keeps the divisor non-zero during the first second.
                                long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000L + 1L;
                                float mbs = (float)(bytesTransferred += cr.getSize()) / 1048576.0f;
                                float transferRate = mbs / (float)secsSinceStart;
                                copyPhase.startNextPhase();
                                copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + " at " + mbpsFormat.format(transferRate) + " MB/s)");
                                this.fetchFailedMaps.remove(cr.getLocation().getMapId());
                            } else if (cr.isObsolete()) {
                                LOG.info((Object)(this.reduceTask.getTaskId() + " Ignoring obsolete copy result for Map Task: " + cr.getLocation().getMapTaskId() + " from host: " + cr.getHost()));
                            } else {
                                // Failed copy: retry later and count the failure for this map task.
                                this.retryFetches.add(cr.getLocation());
                                mapTaskId = cr.getLocation().getMapTaskId();
                                Integer mapId = cr.getLocation().getMapId();
                                ++totalFailures;
                                Integer noFailedFetches = this.mapTaskToFailedFetchesMap.get(mapTaskId);
                                noFailedFetches = noFailedFetches == null ? 1 : noFailedFetches + 1;
                                this.mapTaskToFailedFetchesMap.put((String)mapTaskId, noFailedFetches);
                                LOG.info((Object)("Task " + ReduceTask.this.getTaskId() + ": Failed fetch #" + noFailedFetches + " from " + (String)mapTaskId));
                                // Report the map as fetch-failed at the retry limit and on every second failure after it.
                                if (noFailedFetches >= this.maxFetchRetriesPerMap && (noFailedFetches - this.maxFetchRetriesPerMap) % 2 == 0) {
                                    ReduceTask transferRate = ReduceTask.this;
                                    synchronized (transferRate) {
                                        ReduceTask.this.taskStatus.addFetchFailedMap((String)mapTaskId);
                                        LOG.info((Object)("Failed to fetch map-output from " + (String)mapTaskId + " even after MAX_FETCH_RETRIES_PER_MAP retries... " + " reporting to the JobTracker"));
                                    }
                                }
                                if (noFailedFetches == this.maxFetchRetriesPerMap) {
                                    boolean reducerStalled;
                                    this.fetchFailedMaps.add(mapId);
                                    // Health checks; the 0.5f literals equal the *_PERCENT constants declared on this class.
                                    boolean reducerHealthy = (float)totalFailures / (float)(totalFailures + numCopied) < 0.5f;
                                    boolean reducerProgressedEnough = (float)numCopied / (float)ReduceTask.this.numMaps >= 0.5f;
                                    int stallDuration = (int)(System.currentTimeMillis() - lastProgressTime);
                                    int shuffleProgressDuration = (int)(lastProgressTime - startTime);
                                    int minShuffleRunDuration = shuffleProgressDuration > this.maxMapRuntime ? shuffleProgressDuration : this.maxMapRuntime;
                                    boolean bl = reducerStalled = (float)stallDuration / (float)minShuffleRunDuration >= 0.5f;
                                    // Give up only when at least 5 distinct maps failed, the reducer is unhealthy,
                                    // and it has either not progressed enough or is stalled.
                                    if (!(this.fetchFailedMaps.size() < 5 || reducerHealthy || reducerProgressedEnough && !reducerStalled)) {
                                        LOG.fatal((Object)("Shuffle failed with too many fetch failures and insufficient progress!Killing task " + ReduceTask.this.getTaskId() + "."));
                                        this.umbilical.shuffleError(ReduceTask.this.getTaskId(), "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.");
                                    }
                                }
                                currentTime = System.currentTimeMillis();
                                // Back-off doubles from 4000 ms per failure up to the retry limit, then stays at maxBackoff * 1000 / 2.
                                int currentBackOff = noFailedFetches <= this.maxFetchRetriesPerMap ? 4000 * (1 << noFailedFetches - 1) : this.maxBackoff * 1000 / 2;
                                this.penaltyBox.put(cr.getHost(), currentTime + (long)currentBackOff);
                                LOG.warn((Object)(this.reduceTask.getTaskId() + " adding host " + cr.getHost() + " to penalty box, next contact in " + currentBackOff / 1000 + " seconds"));
                                // Move every other known output on the penalised host to the retry list.
                                Iterator locIt = knownOutputs.iterator();
                                while (locIt.hasNext()) {
                                    MapOutputLocation loc = (MapOutputLocation)locIt.next();
                                    if (!cr.getHost().equals(loc.getHost())) continue;
                                    this.retryFetches.add(loc);
                                    locIt.remove();
                                }
                            }
                            // The host is free for a new copy regardless of the outcome.
                            this.uniqueHosts.remove(cr.getHost());
                            --numInFlight;
                        }
                        mapTaskId = this.copyResults;
                        synchronized (mapTaskId) {
                            // No more results queued: go back to scheduling in the outer loop.
                            if (this.copyResults.size() == 0) {
                                continue block30;
                            }
                        }
                    }
                }
                // Shuffle loop finished: stop the copier threads.
                MapOutputCopier[] numKnown = this.copiers;
                synchronized (this.copiers) {
                    List<MapOutputLocation> numScheduled2 = this.scheduledCopies;
                    synchronized (numScheduled2) {
                        for (int i = 0; i < this.copiers.length; ++i) {
                            this.copiers[i].interrupt();
                            this.copiers[i] = null;
                        }
                    }
                    // ** MonitorExit[numKnown] (shouldn't be in output)
                    if (this.mergeThrowable == null) {
                        Path[] inMemClosedFiles;
                        try {
                            // Poll until the background merges have finished.
                            while (this.localFSMergeInProgress) {
                                Thread.sleep(200L);
                            }
                            while (this.mergeInProgress) {
                                Thread.sleep(200L);
                            }
                            LOG.info((Object)(this.reduceTask.getTaskId() + " Copying of all map outputs complete. " + "Initiating the last merge on the remaining files in " + this.inMemFileSys.getUri()));
                            if (this.mergeThrowable != null) {
                                throw this.mergeThrowable;
                            }
                            inMemClosedFiles = this.inMemFileSys.getFiles(this.MAP_OUTPUT_FILTER);
                            if (inMemClosedFiles.length == 0) {
                                LOG.info((Object)(this.reduceTask.getTaskId() + "Nothing to merge from " + this.inMemFileSys.getUri()));
                                boolean numScheduled2 = this.neededOutputs.isEmpty();
                                return numScheduled2;
                            }
                        }
                        catch (Throwable t) {
                            LOG.warn((Object)(this.reduceTask.getTaskId() + " Final merge of the inmemory files threw an exception: " + StringUtils.stringifyException(t)));
                            // NOTE(review): this only overwrites an error that is already recorded;
                            // "== null" may have been intended - confirm against the original source.
                            if (this.mergeThrowable != null) {
                                this.mergeThrowable = t;
                            }
                            boolean bl = false;
                            this.inMemFileSys.close();
                            return bl;
                        }
                        {
                            // Final merge: write the remaining in-memory files into one local file
                            // named after the map id of the first in-memory file.
                            int mapId = this.extractMapIdFromPathName(inMemClosedFiles[0]);
                            Path outputPath = ReduceTask.this.mapOutputFile.getInputFileForWrite(mapId, this.reduceTask.getTaskId(), this.ramfsMergeOutputSize);
                            SequenceFile.Writer writer = this.sorter.cloneFileAttributes(this.inMemFileSys.makeQualified(inMemClosedFiles[0]), this.localFileSys.makeQualified(outputPath), null);
                            SequenceFile.Sorter.RawKeyValueIterator rIter = null;
                            try {
                                rIter = this.sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length, new Path(this.reduceTask.getTaskId()));
                            }
                            catch (Exception e) {
                                writer.close();
                                // NOTE(review): deletes the in-memory input path on the local file system,
                                // not outputPath - confirm this is the intended cleanup target.
                                this.localFileSys.delete(inMemClosedFiles[0], true);
                                throw new IOException(StringUtils.stringifyException(e));
                            }
                            this.sorter.writeFile(rIter, writer);
                            writer.close();
                            LOG.info((Object)(this.reduceTask.getTaskId() + " Merge of the " + inMemClosedFiles.length + " files in InMemoryFileSystem complete." + " Local file is " + outputPath));
                            FileStatus status = this.localFileSys.getFileStatus(outputPath);
                            SortedSet sortedSet = ReduceTask.this.mapOutputFilesOnDisk;
                            synchronized (sortedSet) {
                                ReduceTask.this.mapOutputFilesOnDisk.add(status);
                            }
                        }
                    }
                    boolean bl = this.mergeThrowable == null && this.neededOutputs.isEmpty();
                    return bl;
                }
            }
            finally {
                // Always release the in-memory file system, on success and on failure.
                this.inMemFileSys.close();
            }
        }

        /**
         * Evaluates the "busy" predicate over the supplied counters.
         * <p>
         * The result is {@code true} only when {@code uniqueHostsSize} is at least
         * {@code numCopiers} and, in addition, either {@code numInFlight} has reached
         * {@code lowThreshold} or {@code remainCopy} does not exceed {@code probeSampleSize}.
         *
         * @param numInFlight     value compared against {@code lowThreshold}
         * @param numCopiers      lower bound required of {@code uniqueHostsSize}
         * @param lowThreshold    in-flight level at or above which the first condition holds
         * @param uniqueHostsSize value that must be at least {@code numCopiers}
         * @param probeSampleSize upper bound for {@code remainCopy} in the alternative condition
         * @param remainCopy      value compared against {@code probeSampleSize}
         * @return whether the combined condition described above holds
         */
        private boolean isBusy(int numInFlight, int numCopiers, int lowThreshold, int uniqueHostsSize, int probeSampleSize, int remainCopy) {
            if (uniqueHostsSize < numCopiers) {
                return false;
            }
            if (numInFlight >= lowThreshold) {
                return true;
            }
            return remainCopy <= probeSampleSize;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Takes the oldest pending copy result, waiting briefly if none is queued yet.
         * <p>
         * When {@code copyResults} is empty the calling thread waits on it for up to two
         * seconds; an interrupt only ends that wait early and is otherwise ignored.
         *
         * @return the first queued result, or {@code null} if the queue is still empty
         *         after the wait
         */
        private CopyResult getCopyResult() {
            synchronized (this.copyResults) {
                if (this.copyResults.isEmpty()) {
                    try {
                        this.copyResults.wait(2000L);
                    }
                    catch (InterruptedException ignored) {
                        // Interruption only cuts the wait short; the emptiness check below decides the result.
                    }
                }
                return this.copyResults.isEmpty() ? null : this.copyResults.remove(0);
            }
        }

        /**
         * Fetches the next batch of map completion events through {@code umbilical} and
         * applies each one to the copier's bookkeeping.
         * <p>
         * The call first sleeps until at least one second has passed since
         * {@code lastPollTime}. Afterwards {@code fromEventId} is advanced by the number of
         * events received, and every event is handled according to its status:
         * <ul>
         *   <li>{@code SUCCEEDED}: a {@link MapOutputLocation} is appended to
         *       {@code knownOutputs}; a new maximum run time also recomputes
         *       {@code maxFetchRetriesPerMap}.</li>
         *   <li>{@code FAILED}, {@code KILLED}, {@code OBSOLETE}: the task id is recorded in
         *       {@code obsoleteMapIds}.</li>
         *   <li>{@code TIPFAILED}: the map's id is removed from {@code neededOutputs}.</li>
         * </ul>
         *
         * @param fromEventId  index of the first event to request; incremented in place
         * @param knownOutputs list that receives the locations of successful map outputs
         * @throws IOException if the event query fails
         */
        private void getMapCompletionEvents(IntWritable fromEventId, List<MapOutputLocation> knownOutputs) throws IOException {
            final long earliestPoll = this.lastPollTime + 1000L;
            long now = System.currentTimeMillis();
            while (now < earliestPoll) {
                try {
                    Thread.sleep(earliestPoll - now);
                }
                catch (InterruptedException ie) {
                    // Ignored on purpose: the loop re-reads the clock and sleeps again if needed.
                }
                now = System.currentTimeMillis();
            }

            TaskCompletionEvent[] events = this.umbilical.getMapCompletionEvents(this.reduceTask.getJobId(), fromEventId.get(), this.probe_sample_size);
            this.lastPollTime = now;
            fromEventId.set(fromEventId.get() + events.length);

            for (TaskCompletionEvent event : events) {
                switch (event.getTaskStatus()) {
                    case SUCCEEDED: {
                        URI trackerUri = URI.create(event.getTaskTrackerHttp());
                        String host = trackerUri.getHost();
                        int port = trackerUri.getPort();
                        String taskId = event.getTaskId();
                        int mapIndex = event.idWithinJob();
                        int runTime = event.getTaskRunTime();
                        if (runTime > this.maxMapRuntime) {
                            // A slower map raises the retry budget derived from the longest run time.
                            this.maxMapRuntime = runTime;
                            this.maxFetchRetriesPerMap = ReduceTask.getClosestPowerOf2(this.maxMapRuntime / 4000 + 1);
                        }
                        knownOutputs.add(new MapOutputLocation(this.reduceTask.getJobId(), taskId, mapIndex, host, port));
                        break;
                    }
                    case FAILED:
                    case KILLED:
                    case OBSOLETE: {
                        this.obsoleteMapIds.add(event.getTaskId());
                        LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + " map-task: '" + event.getTaskId() + "'");
                        break;
                    }
                    case TIPFAILED: {
                        this.neededOutputs.remove(event.idWithinJob());
                        LOG.info("Ignoring output of failed map TIP: '" + event.getTaskId() + "'");
                        break;
                    }
                    default:
                        break;
                }
            }
        }

        /**
         * Compiler-generated accessor: post-increments {@code nextMapOutputCopierId} on the
         * given copier and returns the value it held before the increment.
         */
        static /* synthetic */ int access$508(ReduceCopier x0) {
            int allocated = x0.nextMapOutputCopierId++;
            return allocated;
        }

        /**
         * Compiler-generated accessor: exposes the {@code scheduledCopies} list of the given
         * copier.
         */
        static /* synthetic */ List access$900(ReduceCopier x0) {
            List pending = x0.scheduledCopies;
            return pending;
        }

        /**
         * Thread that merges the closed map-output files currently held in the in-memory file
         * system into one file on the local file system.
         * <p>
         * A merge is attempted only when at least two closed files are present. The merged
         * file's status is added to {@code mapOutputFilesOnDisk}. Any failure is logged and
         * recorded in {@code mergeThrowable}, and {@code mergeInProgress} is always cleared
         * when the thread finishes.
         */
        private class InMemFSMergeThread extends Thread {
            private final InMemoryFileSystem inMemFileSys;
            private final LocalFileSystem localFileSys;
            private final SequenceFile.Sorter sorter;

            /**
             * @param inMemFileSys source file system holding the map outputs to merge
             * @param localFileSys destination file system for the merged file
             * @param sorter       sorter used to merge the inputs and write the result
             */
            public InMemFSMergeThread(InMemoryFileSystem inMemFileSys, LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
                this.inMemFileSys = inMemFileSys;
                this.localFileSys = localFileSys;
                this.sorter = sorter;
            }

            /**
             * Performs a single merge pass; see the class description for the overall contract.
             */
            @Override
            public void run() {
                LOG.info(ReduceCopier.this.reduceTask.getTaskId() + " Thread started: " + this.getName());
                try {
                    Path[] inMemClosedFiles;
                    // The file listing is taken under the ReduceTask monitor, the same lock
                    // copyOutput holds while renaming a fetched file into place.
                    synchronized (ReduceTask.this) {
                        inMemClosedFiles = this.inMemFileSys.getFiles(ReduceCopier.this.MAP_OUTPUT_FILTER);
                    }
                    if (inMemClosedFiles.length < 2) {
                        LOG.info(ReduceCopier.this.reduceTask.getTaskId() + " Nothing to merge from " + this.inMemFileSys.getUri());
                        return;
                    }

                    // The merged file is named after the map id of the first input.
                    int mapId = ReduceCopier.this.extractMapIdFromPathName(inMemClosedFiles[0]);
                    Path outputPath = ReduceTask.this.mapOutputFile.getInputFileForWrite(mapId, ReduceCopier.this.reduceTask.getTaskId(), ReduceCopier.this.ramfsMergeOutputSize);
                    SequenceFile.Writer writer = this.sorter.cloneFileAttributes(this.inMemFileSys.makeQualified(inMemClosedFiles[0]), this.localFileSys.makeQualified(outputPath), null);

                    SequenceFile.Sorter.RawKeyValueIterator rIter;
                    try {
                        rIter = this.sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length, new Path(ReduceCopier.this.reduceTask.getTaskId()));
                    }
                    catch (Exception e) {
                        // Setting up the merge failed: release the writer and drop the empty output.
                        writer.close();
                        this.localFileSys.delete(outputPath, true);
                        throw new IOException(StringUtils.stringifyException(e));
                    }

                    try {
                        this.sorter.writeFile(rIter, writer);
                    }
                    finally {
                        // Close the writer even when writing fails so the file handle is not leaked.
                        writer.close();
                    }

                    LOG.info(ReduceCopier.this.reduceTask.getTaskId() + " Merge of the " + inMemClosedFiles.length + " files in InMemoryFileSystem complete." + " Local file is " + outputPath);
                    FileStatus status = this.localFileSys.getFileStatus(outputPath);
                    synchronized (ReduceTask.this.mapOutputFilesOnDisk) {
                        ReduceTask.this.mapOutputFilesOnDisk.add(status);
                    }
                }
                catch (Throwable t) {
                    LOG.warn(ReduceCopier.this.reduceTask.getTaskId() + " Intermediate Merge of the inmemory files threw an exception: " + StringUtils.stringifyException(t));
                    // Keep the first recorded failure (same policy as LocalFSMerger) so a later
                    // error does not mask the earlier one.
                    if (ReduceCopier.this.mergeThrowable == null) {
                        ReduceCopier.this.mergeThrowable = t;
                    }
                }
                finally {
                    ReduceCopier.this.mergeInProgress = false;
                }
            }
        }

        private class LocalFSMerger
        extends Thread {
            private LocalFileSystem localFileSys;
            private SequenceFile.Sorter sorter;

            public LocalFSMerger(LocalFileSystem fs, SequenceFile.Sorter sorter) {
                this.localFileSys = fs;
                this.sorter = sorter;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void run() {
                try {
                    Path[] mapFiles = new Path[ReduceCopier.this.ioSortFactor];
                    long approxOutputSize = 0L;
                    int bytesPerSum = ReduceCopier.this.reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
                    LOG.info((Object)(ReduceCopier.this.reduceTask.getTaskId() + " Merging map output files on disk"));
                    SortedSet sortedSet = ReduceTask.this.mapOutputFilesOnDisk;
                    synchronized (sortedSet) {
                        for (int i = 0; i < ReduceCopier.this.ioSortFactor; ++i) {
                            FileStatus filestatus = (FileStatus)ReduceTask.this.mapOutputFilesOnDisk.first();
                            ReduceTask.this.mapOutputFilesOnDisk.remove(filestatus);
                            mapFiles[i] = filestatus.getPath();
                            approxOutputSize += filestatus.getLen();
                        }
                    }
                    approxOutputSize += ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
                    Path outputPath = ReduceTask.this.lDirAlloc.getLocalPathForWrite(mapFiles[0].toString(), approxOutputSize, ReduceTask.this.conf).suffix(".merged");
                    SequenceFile.Writer writer = this.sorter.cloneFileAttributes(mapFiles[0], outputPath, null);
                    SequenceFile.Sorter.RawKeyValueIterator iter = null;
                    Path tmpDir = new Path(ReduceCopier.this.reduceTask.getTaskId());
                    try {
                        iter = this.sorter.merge(mapFiles, true, ReduceCopier.this.ioSortFactor, tmpDir);
                    }
                    catch (Exception e) {
                        writer.close();
                        this.localFileSys.delete(outputPath, true);
                        throw new IOException(StringUtils.stringifyException(e));
                    }
                    this.sorter.writeFile(iter, writer);
                    writer.close();
                    SortedSet sortedSet2 = ReduceTask.this.mapOutputFilesOnDisk;
                    synchronized (sortedSet2) {
                        ReduceTask.this.mapOutputFilesOnDisk.add(this.localFileSys.getFileStatus(outputPath));
                    }
                    LOG.info((Object)(ReduceCopier.this.reduceTask.getTaskId() + " Finished merging map output files on disk."));
                }
                catch (Throwable t) {
                    LOG.warn((Object)(ReduceCopier.this.reduceTask.getTaskId() + " Merging of the local FS files threw an exception: " + StringUtils.stringifyException(t)));
                    if (ReduceCopier.this.mergeThrowable == null) {
                        ReduceCopier.this.mergeThrowable = t;
                    }
                }
                finally {
                    ReduceCopier.this.localFSMergeInProgress = false;
                }
            }
        }

        private class MapOutputCopier
        extends Thread {
            private MapOutputLocation currentLocation = null;
            private int id = ReduceCopier.access$508(ReduceCopier.this);
            private Reporter reporter;

            public MapOutputCopier(Reporter reporter) {
                this.setName("MapOutputCopier " + ReduceCopier.this.reduceTask.getTaskId() + "." + this.id);
                LOG.debug((Object)(this.getName() + " created"));
                this.reporter = reporter;
            }

            public synchronized boolean fail() {
                if (this.currentLocation != null) {
                    this.finish(-1L);
                    return true;
                }
                return false;
            }

            public synchronized MapOutputLocation getLocation() {
                return this.currentLocation;
            }

            private synchronized void start(MapOutputLocation loc) {
                this.currentLocation = loc;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            private synchronized void finish(long size) {
                if (this.currentLocation != null) {
                    LOG.debug((Object)(this.getName() + " finishing " + this.currentLocation + " =" + size));
                    List list = ReduceCopier.this.copyResults;
                    synchronized (list) {
                        ReduceCopier.this.copyResults.add(new CopyResult(this.currentLocation, size));
                        ReduceCopier.this.copyResults.notify();
                    }
                    this.currentLocation = null;
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Unable to fully structure code
             */
            public void run() {
                while (true) {
                    try {
                        while (true) lbl-1000:
                        // 4 sources

                        {
                            loc = null;
                            size = -1L;
                            var4_5 = ReduceCopier.access$900(ReduceCopier.this);
                            synchronized (var4_5) {
                                while (ReduceCopier.access$900(ReduceCopier.this).isEmpty()) {
                                    ReduceCopier.access$900(ReduceCopier.this).wait();
                                }
                                loc = (MapOutputLocation)ReduceCopier.access$900(ReduceCopier.this).remove(0);
                            }
                            try {
                                ReduceCopier.access$1000(ReduceCopier.this).threadBusy();
                                this.start(loc);
                                size = this.copyOutput(loc);
                                ReduceCopier.access$1000(ReduceCopier.this).successFetch();
                            }
                            catch (IOException e) {
                                ReduceTask.access$700().warn((Object)(ReduceCopier.access$600(ReduceCopier.this).getTaskId() + " copy failed: " + loc.getMapTaskId() + " from " + loc.getHost()));
                                ReduceTask.access$700().warn((Object)StringUtils.stringifyException(e));
                                ReduceCopier.access$1000(ReduceCopier.this).failedFetch();
                                size = -1L;
                            }
                            finally {
                                ReduceCopier.access$1000(ReduceCopier.this).threadFree();
                                this.finish(size);
                                continue;
                            }
                            break;
                        }
                    }
                    catch (InterruptedException e) {
                        return;
                    }
                    catch (Throwable th) {
                        ReduceTask.access$700().error((Object)("Map output copy failure: " + StringUtils.stringifyException(th)));
                        continue;
                    }
                    ** GOTO lbl-1000
                    break;
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            private long copyOutput(MapOutputLocation loc) throws IOException, InterruptedException {
                if (!ReduceCopier.this.neededOutputs.contains(loc.getMapId()) || ReduceCopier.this.obsoleteMapIds.contains(loc.getMapTaskId())) {
                    return -2L;
                }
                String reduceId = ReduceCopier.this.reduceTask.getTaskId();
                LOG.info((Object)(reduceId + " Copying " + loc.getMapTaskId() + " output from " + loc.getHost() + "."));
                Path filename = new Path("/" + TaskTracker.getJobCacheSubdir() + "/" + ReduceTask.this.getJobId() + "/" + reduceId + "/" + "output" + "/map_" + loc.getMapId() + ".out");
                Path tmpFilename = new Path(filename + "-" + this.id);
                tmpFilename = loc.getFile(ReduceCopier.this.inMemFileSys, ReduceCopier.this.localFileSys, ReduceCopier.this.shuffleClientMetrics, tmpFilename, ReduceTask.this.lDirAlloc, ReduceTask.this.conf, ReduceCopier.this.reduceTask.getPartition(), 180000, this.reporter);
                if (!ReduceCopier.this.neededOutputs.contains(loc.getMapId())) {
                    if (tmpFilename != null) {
                        FileSystem fs = tmpFilename.getFileSystem(ReduceTask.this.conf);
                        fs.delete(tmpFilename, true);
                    }
                    return -2L;
                }
                if (tmpFilename == null) {
                    throw new IOException("File " + filename + "-" + this.id + " not created");
                }
                FileSystem fs = tmpFilename.getFileSystem(ReduceTask.this.conf);
                long bytes = -1L;
                ReduceTask reduceTask = ReduceTask.this;
                synchronized (reduceTask) {
                    if (!ReduceCopier.this.neededOutputs.contains(loc.getMapId())) {
                        fs.delete(tmpFilename, true);
                        return -2L;
                    }
                    bytes = fs.getFileStatus(tmpFilename).getLen();
                    filename = new Path(tmpFilename.getParent(), filename.getName());
                    if (!fs.rename(tmpFilename, filename)) {
                        fs.delete(tmpFilename, true);
                        bytes = -1L;
                        throw new IOException("failure to rename map output " + tmpFilename);
                    }
                    LOG.info((Object)(reduceId + " done copying " + loc.getMapTaskId() + " output from " + loc.getHost() + "."));
                    if (!ReduceCopier.this.mergeInProgress && (ReduceCopier.this.inMemFileSys.getPercentUsed() >= 0.5f || ReduceCopier.this.mergeThreshold > 0 && ReduceCopier.this.inMemFileSys.getNumFiles(ReduceCopier.this.MAP_OUTPUT_FILTER) >= ReduceCopier.this.mergeThreshold) && ReduceCopier.this.mergeThrowable == null) {
                        LOG.info((Object)(reduceId + " InMemoryFileSystem " + ReduceCopier.this.inMemFileSys.getUri().toString() + " is " + ReduceCopier.this.inMemFileSys.getPercentUsed() + " full. Triggering merge"));
                        InMemFSMergeThread m = new InMemFSMergeThread(ReduceCopier.this.inMemFileSys, (LocalFileSystem)ReduceCopier.this.localFileSys, ReduceCopier.this.sorter);
                        m.setName("Thread for merging in memory files");
                        m.setDaemon(true);
                        ReduceCopier.this.mergeInProgress = true;
                        m.start();
                    }
                    ReduceCopier.this.neededOutputs.remove(loc.getMapId());
                }
                String localFSScheme = ReduceCopier.this.localFileSys.getUri().getScheme();
                String outputFileScheme = fs.getUri().getScheme();
                if (localFSScheme.equals(outputFileScheme)) {
                    SortedSet sortedSet = ReduceTask.this.mapOutputFilesOnDisk;
                    synchronized (sortedSet) {
                        ReduceTask.this.mapOutputFilesOnDisk.add(fs.getFileStatus(filename));
                    }
                }
                return bytes;
            }
        }

        /**
         * Immutable outcome of one copy attempt: the location that was tried and a size
         * value. A non-negative size counts as success and {@code OBSOLETE} (-2) marks the
         * attempt as obsolete.
         */
        private class CopyResult {
            private final MapOutputLocation loc;
            private final long size;
            private static final int OBSOLETE = -2;

            /**
             * @param loc  location the copy was attempted from
             * @param size value recorded for the attempt
             */
            CopyResult(MapOutputLocation loc, long size) {
                this.loc = loc;
                this.size = size;
            }

            /** @return the map id reported by the location */
            public int getMapId() {
                return loc.getMapId();
            }

            /** @return {@code true} when the recorded size is zero or positive */
            public boolean getSuccess() {
                return size >= 0L;
            }

            /** @return {@code true} when the recorded size equals {@code OBSOLETE} */
            public boolean isObsolete() {
                return size == OBSOLETE;
            }

            /** @return the recorded size value, unchanged */
            public long getSize() {
                return size;
            }

            /** @return the host reported by the location */
            public String getHost() {
                return loc.getHost();
            }

            /** @return the location this result refers to */
            public MapOutputLocation getLocation() {
                return loc;
            }
        }

        /**
         * Collects shuffle counters (bytes read, failed and successful fetches, busy copier
         * threads) and pushes them into the {@code "shuffleInput"} metrics record whenever
         * {@link #doUpdates} is invoked.
         */
        class ShuffleClientMetrics implements Updater {
            private MetricsRecord shuffleMetrics = null;
            private int numFailedFetches = 0;
            private int numSuccessFetches = 0;
            private long numBytes = 0L;
            private int numThreadsBusy = 0;

            /**
             * Creates the record in the {@code "mapred"} context, tags it with user, job and
             * task identifiers, and registers this object as an updater of that context.
             *
             * @param conf job configuration supplying the user, job name and session id tags
             */
            ShuffleClientMetrics(JobConf conf) {
                MetricsContext context = MetricsUtil.getContext("mapred");
                shuffleMetrics = MetricsUtil.createRecord(context, "shuffleInput");
                shuffleMetrics.setTag("user", conf.getUser());
                shuffleMetrics.setTag("jobName", conf.getJobName());
                shuffleMetrics.setTag("jobId", ReduceTask.this.getJobId());
                shuffleMetrics.setTag("taskId", ReduceTask.this.getTaskId());
                shuffleMetrics.setTag("sessionId", conf.getSessionId());
                context.registerUpdater(this);
            }

            /** Adds {@code numBytes} to the running byte count. */
            public synchronized void inputBytes(long numBytes) {
                this.numBytes = this.numBytes + numBytes;
            }

            /** Counts one failed fetch. */
            public synchronized void failedFetch() {
                numFailedFetches++;
            }

            /** Counts one successful fetch. */
            public synchronized void successFetch() {
                numSuccessFetches++;
            }

            /** Marks one more copier thread as busy. */
            public synchronized void threadBusy() {
                numThreadsBusy++;
            }

            /** Marks one copier thread as no longer busy. */
            public synchronized void threadFree() {
                numThreadsBusy--;
            }

            /**
             * Transfers the accumulated counters into the metrics record, resets the byte and
             * fetch counters, and then updates the record. The busy-thread count is reported as
             * a percentage of {@code numCopiers} and is not reset.
             *
             * @param unused not used
             */
            public void doUpdates(MetricsContext unused) {
                synchronized (this) {
                    shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
                    shuffleMetrics.incrMetric("shuffle_failed_fetches", numFailedFetches);
                    shuffleMetrics.incrMetric("shuffle_success_fetches", numSuccessFetches);
                    if (ReduceCopier.this.numCopiers == 0) {
                        // No copiers configured: report zero rather than dividing by zero.
                        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
                    } else {
                        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 100.0f * ((float)numThreadsBusy / (float)ReduceCopier.this.numCopiers));
                    }
                    numBytes = 0L;
                    numSuccessFetches = 0;
                    numFailedFetches = 0;
                }
                shuffleMetrics.update();
            }
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    /**
     * {@link ValuesIterator} variant used by the enclosing {@code ReduceTask}: it bumps
     * {@code reduceInputValueCounter} for every value handed out and can publish the
     * input's progress to {@code reducePhase}.
     */
    private class ReduceValuesIterator<KEY, VALUE> extends ValuesIterator<KEY, VALUE> {
        public ReduceValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, RawComparator<KEY> comparator, Class<KEY> keyClass, Class<VALUE> valClass, Configuration conf, Progressable reporter) throws IOException {
            super(in, comparator, keyClass, valClass, conf, reporter);
        }

        /**
         * Copies the underlying iterator's current progress into {@code reducePhase} and
         * then signals progress on the reporter.
         */
        public void informReduceProgress() {
            // The field is private to ValuesIterator, hence the access through a cast.
            SequenceFile.Sorter.RawKeyValueIterator source = ((ValuesIterator)this).in;
            ReduceTask.this.reducePhase.set(source.getProgress().get());
            this.reporter.progress();
        }

        /**
         * Increments {@code reduceInputValueCounter} by one, then returns the next value
         * from the parent iterator.
         */
        @Override
        public VALUE next() {
            ReduceTask.this.reduceInputValueCounter.increment(1L);
            VALUE upcoming = super.next();
            return upcoming;
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    /**
     * Iterator over the values that share the current key in a raw key/value stream.
     * <p>
     * {@link #hasNext()} stays {@code true} while the next record's key compares equal to
     * the current key under the supplied comparator. {@link #nextKey()} moves on to the
     * following key, and {@link #more()} tells whether the underlying stream still had a
     * record at the last read.
     * <p>
     * The previously returned key and value objects are passed back into the deserializers,
     * presumably so they can be reused; callers should not rely on a returned object staying
     * unchanged after the next call.
     */
    static class ValuesIterator<KEY, VALUE>
    implements Iterator<VALUE> {
        private SequenceFile.Sorter.RawKeyValueIterator in;
        /** Key whose values are currently being iterated. */
        private KEY key;
        /** Key of the record most recently read from {@code in}. */
        private KEY nextKey;
        private VALUE value;
        /** Whether the record most recently read belongs to the current key. */
        private boolean hasNext;
        /** Whether the last read from {@code in} produced a record. */
        private boolean more;
        private RawComparator<KEY> comparator;
        private DataOutputBuffer nextValue = new DataOutputBuffer();
        private InputBuffer valIn = new InputBuffer();
        private InputBuffer keyIn = new InputBuffer();
        protected Progressable reporter;
        private Deserializer<KEY> keyDeserializer;
        private Deserializer<VALUE> valDeserializer;

        /**
         * Opens key and value deserializers and reads the first key from {@code in}.
         *
         * @param in         raw key/value source
         * @param comparator comparator that decides whether two keys are equal
         * @param keyClass   class used to obtain the key deserializer
         * @param valClass   class used to obtain the value deserializer
         * @param conf       configuration for the {@link SerializationFactory}
         * @param reporter   notified of progress after each value
         * @throws IOException if opening a deserializer or reading the first key fails
         */
        public ValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, RawComparator<KEY> comparator, Class<KEY> keyClass, Class<VALUE> valClass, Configuration conf, Progressable reporter) throws IOException {
            this.in = in;
            this.comparator = comparator;
            this.reporter = reporter;
            SerializationFactory serializationFactory = new SerializationFactory(conf);
            this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
            this.keyDeserializer.open(this.keyIn);
            this.valDeserializer = serializationFactory.getDeserializer(valClass);
            this.valDeserializer.open(this.valIn);
            this.readNextKey();
            // The first key read becomes the current key; its values are available if a record exists.
            this.key = this.nextKey;
            this.nextKey = null;
            this.hasNext = this.more;
        }

        @Override
        public boolean hasNext() {
            return this.hasNext;
        }

        /**
         * Returns the next value of the current key and reads ahead to the following record.
         *
         * @throws NoSuchElementException if the current key has no further values
         * @throws RuntimeException       wrapping an {@link IOException} from the source
         */
        @Override
        public VALUE next() {
            if (!this.hasNext) {
                throw new NoSuchElementException("iterate past last value");
            }
            try {
                this.readNextValue();
                this.readNextKey();
            }
            catch (IOException ie) {
                throw new RuntimeException("problem advancing", ie);
            }
            this.reporter.progress();
            return this.value;
        }

        /**
         * Removal is not supported.
         *
         * @throws UnsupportedOperationException always, as the {@link Iterator} contract
         *         specifies for iterators without removal support
         */
        @Override
        public void remove() {
            throw new UnsupportedOperationException("not implemented");
        }

        /**
         * Skips any values of the current key that were not consumed and makes the next key
         * the current one.
         *
         * @throws IOException if reading from the source fails
         */
        public void nextKey() throws IOException {
            while (this.hasNext) {
                this.readNextKey();
            }
            // Swap the two key objects so the old current key is the one handed to the
            // deserializer on the next read.
            KEY previousKey = this.key;
            this.key = this.nextKey;
            this.nextKey = previousKey;
            this.hasNext = this.more;
        }

        /** @return whether the last read from the source produced a record */
        public boolean more() {
            return this.more;
        }

        /** @return the current key */
        public Object getKey() {
            return this.key;
        }

        /**
         * Advances the source by one record and, if one exists, deserializes its key into
         * {@code nextKey} and updates {@code hasNext} by comparing it with the current key.
         */
        private void readNextKey() throws IOException {
            this.more = this.in.next();
            if (!this.more) {
                this.hasNext = false;
                return;
            }
            DataOutputBuffer nextKeyBytes = this.in.getKey();
            this.keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getLength());
            this.nextKey = this.keyDeserializer.deserialize(this.nextKey);
            this.hasNext = this.key != null && this.comparator.compare(this.key, this.nextKey) == 0;
        }

        /** Copies the current record's uncompressed value bytes and deserializes them into {@code value}. */
        private void readNextValue() throws IOException {
            this.nextValue.reset();
            this.in.getValue().writeUncompressedBytes(this.nextValue);
            this.valIn.reset(this.nextValue.getData(), this.nextValue.getLength());
            this.value = this.valDeserializer.deserialize(this.value);
        }
    }
}

