Index: src/java/org/apache/nutch/segment/SegmentMerger.java
===================================================================
--- src/java/org/apache/nutch/segment/SegmentMerger.java	(revision 561294)
+++ src/java/org/apache/nutch/segment/SegmentMerger.java	(working copy)
@@ -56,6 +56,7 @@
 import org.apache.nutch.metadata.MetaWrapper;
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.parse.ParseData;
 import org.apache.nutch.parse.ParseText;
 import org.apache.nutch.protocol.Content;
@@ -116,6 +117,7 @@
   private static final String SEGMENT_SLICE_KEY = "slice";
 
   private URLFilters filters = null;
+  private URLNormalizers normalizers = null;
   private long sliceSize = -1;
   private long curCount = 0;
   
@@ -293,6 +295,8 @@
     if (conf == null) return;
     if (conf.getBoolean("segment.merger.filter", false))
       filters = new URLFilters(conf);
+    if (conf.getBoolean("segment.merger.normalizer", false))
+      normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
     sliceSize = conf.getLong("segment.merger.slice", -1);
     if ((sliceSize > 0) && (LOG.isInfoEnabled())) {
       LOG.info("Slice size: " + sliceSize + " URLs.");
@@ -312,23 +316,27 @@
   private Text newKey = new Text();
   
   public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
-    // convert on the fly from the old format
-    if (key instanceof UTF8) {
-      newKey.set(key.toString());
-      key = newKey;
+    String url = key.toString(); // string form of the key, whether it is a legacy UTF8 or a Text
+    if (normalizers != null) {
+      try {
+        url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT); // a null result drops the record below
+      } catch (Exception e) {
+        LOG.warn("Skipping " + url + ": " + e.getMessage());
+        url = null; // normalization failed: do not emit this record
+      }
     }
-    if (filters != null) {
+    if (url != null && filters != null) {
       try {
-        if (filters.filter(((Text)key).toString()) == null) {
-          return;
-        }
+        url = filters.filter(url); // null means the URL was rejected by the filters
       } catch (Exception e) {
-        if (LOG.isWarnEnabled()) {
-          LOG.warn("Cannot filter key " + key + ": " + e.getMessage());
-        }
+        LOG.warn("Skipping key " + url + ": " + e.getMessage());
+        url = null; // filtering failed: do not emit this record
       }
     }
-    output.collect(key, value);
+    if (url != null) { // records whose URL was rejected or failed processing are omitted
+      newKey.set(url); // reuse the shared Text instance as the (possibly normalized) output key
+      output.collect(newKey, value);
+    }
   }
 
   /**
@@ -505,7 +513,7 @@
     }
   }
 
-  public void merge(Path out, Path[] segs, boolean filter, long slice) throws Exception {
+  public void merge(Path out, Path[] segs, boolean filter, boolean normalize, long slice) throws Exception {
     String segmentName = Generator.generateSegmentName();
     if (LOG.isInfoEnabled()) {
       LOG.info("Merging " + segs.length + " segments to " + out + "/" + segmentName);
@@ -513,6 +521,7 @@
     JobConf job = new NutchJob(getConf());
     job.setJobName("mergesegs " + out + "/" + segmentName);
     job.setBoolean("segment.merger.filter", filter);
+    job.setBoolean("segment.merger.normalizer", normalize);
     job.setLong("segment.merger.slice", slice);
     job.set("segment.merger.segmentName", segmentName);
     FileSystem fs = FileSystem.get(getConf());
@@ -616,6 +625,7 @@
     ArrayList segs = new ArrayList();
     long sliceSize = 0;
     boolean filter = false;
+    boolean normalize = false;
     for (int i = 1; i < args.length; i++) {
       if (args[i].equals("-dir")) {
         Path[] files = fs.listPaths(new Path(args[++i]), new PathFilter() {
@@ -631,6 +641,8 @@
           segs.add(files[j]);
       } else if (args[i].equals("-filter")) {
         filter = true;
+      } else if (args[i].equals("-normalize")) {
+        normalize = true;
       } else if (args[i].equals("-slice")) {
         sliceSize = Long.parseLong(args[++i]);
       } else {
@@ -642,7 +654,7 @@
       return;
     }
     SegmentMerger merger = new SegmentMerger(conf);
-    merger.merge(out, (Path[]) segs.toArray(new Path[segs.size()]), filter, sliceSize);
+    merger.merge(out, (Path[]) segs.toArray(new Path[segs.size()]), filter, normalize, sliceSize);
   }
 
 }
