Значение глобальной переменной не меняется после цикла

Я занимаюсь разработкой проекта hadoop. Я хочу найти клиентов в определенный день, а затем написать тех, у кого максимальное потребление в этот день. В моем классе редуктора по какой-то причине глобальная переменнаяМаксимум не меняет своего значения после цикла for.

РЕДАКТИРОВАТЬ Я хочу найти клиентов с максимальным потреблением в определенный день. Мне удалось найти клиентов в нужную мне дату, но я столкнулся с проблемой в своем классе редукторов. Вот код:

РЕДАКТИРОВАТЬ № 2 Я уже знаю, что значения (потребление) являются натуральными числами. Поэтому в моем выходном файле я хочу быть только клиентами определенного дня с максимальным потреблением.

РЕДАКТИРОВАТЬ № 3 Мой входной файл состоит из множества данных. Имеет три столбца; идентификатор клиента, метка времени (гггг-мм-дд чч: мм: сс) и потребление

Класс водителя

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class alicanteDriver {

    public static void main(String[] args) throws Exception {
        long t_start = System.currentTimeMillis();
        long t_end;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Alicante");
        job.setJarByClass(alicanteDriver.class);

        job.setMapperClass(alicanteMapperC.class);        

        //job.setCombinerClass(alicanteCombiner.class);

        job.setPartitionerClass(alicantePartitioner.class);

        job.setNumReduceTasks(2);

        job.setReducerClass(alicanteReducerC.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/alicante_1y.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/alicante_output"));
        job.waitForCompletion(true);
        t_end = System.currentTimeMillis();

        System.out.println((t_end-t_start)/1000);
    }
 }

Класс Mapper

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class alicanteMapperC extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    String Customer = new String();
    SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date t = new Date();
    IntWritable Consumption = new IntWritable();
    int counter = 0;

    // new vars
    int max = 0;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        Date d2 = null;
        try {
            d2 = ft.parse("2013-07-01 01:00:00");
        } catch (ParseException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        if (counter > 0) {

            String line = value.toString();
            StringTokenizer itr = new StringTokenizer(line, ",");

            while (itr.hasMoreTokens()) {
                Customer = itr.nextToken();
                try {
                    t = ft.parse(itr.nextToken());
                } catch (ParseException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                Consumption.set(Integer.parseInt(itr.nextToken()));

                //sort out as many values as possible
                if(Consumption.get() > max) {
                    max = Consumption.get();
                }

                //find customers in a certain date
                if (t.compareTo(d2) == 0 && Consumption.get() == max) {
                    context.write(new Text(Customer), Consumption);
                }
            }
        }
        counter++;
    }
}

Класс редуктора

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.google.common.collect.Iterables;

public class alicanteReducerC extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int max = 0; //this var

        // declaration of Lists
        List<Text> l1 = new ArrayList<Text>();
        List<IntWritable> l2 = new ArrayList<IntWritable>();

        for (IntWritable val : values) {
            if (val.get() > max) {
                max = val.get();
            }
            l1.add(key);
            l2.add(val);
        }

        for (int i = 0; i < l1.size(); i++) {
            if (l2.get(i).get() == max) {
                context.write(key, new IntWritable(max));
            }
        }
    }
}

Некоторые значения Входного файла

C11FA586148,2013-07-01 01:00:00,3
C11FA586152,2015-09-01 15:22:22,3
C11FA586168,2015-02-01 15:22:22,1
C11FA586258,2013-07-01 01:00:00,5
C11FA586413,2013-07-01 01:00:00,5
C11UA487446,2013-09-01 15:22:22,3
C11UA487446,2013-07-01 01:00:00,3
C11FA586148,2013-07-01 01:00:00,4

Выход должен быть

C11FA586258 5
C11FA586413 5

Я искал форумы в течение нескольких часов, и все еще не могу найти проблему. Есть идеи?

Ответы на вопрос(2)

Ваш ответ на вопрос