Index: src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
===================================================================
--- src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java	(revision 736736)
+++ src/test/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java	(working copy)
@@ -400,7 +400,7 @@
       String block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();
 
       // Truncate replica of block
-      truncateReplica(block, 0);
+      changeReplicaLength(block, 0, -1);
 
       cluster.shutdown();
 
@@ -421,18 +421,22 @@
     }
   }
   
-  private static void truncateReplica(String blockName, int dnIndex) throws IOException {
+  /**
+   * Change the length of a block at datanode dnIndex
+   */
+  static boolean changeReplicaLength(String blockName, int dnIndex, int lenDelta) throws IOException {
     File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
     for (int i=dnIndex*2; i<dnIndex*2+2; i++) {
       File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" + 
                                blockName);
       if (blockFile.exists()) {
         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-        raFile.setLength(raFile.length()-1);
+        raFile.setLength(raFile.length()+lenDelta);
         raFile.close();
-        break;
+        return true;
       }
     }
+    return false;
   }
   
   private static void waitForBlockDeleted(String blockName, int dnIndex) 
Index: src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
===================================================================
--- src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java	(revision 736736)
+++ src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java	(working copy)
@@ -640,4 +640,13 @@
   public String getStorageInfo() {
     return "Simulated FSDataset-" + storageId;
   }
+
+  /**
+   * Check if a block is being written to, i.e., not finalized yet
+   * @param b a block
+   * @return true if the block is under construction; false otherwise
+   */
+  public boolean isUnderConstruction(Block b) {
+    return false;
+  }
 }
Index: src/test/org/apache/hadoop/hdfs/TestReplication.java
===================================================================
--- src/test/org/apache/hadoop/hdfs/TestReplication.java	(revision 736736)
+++ src/test/org/apache/hadoop/hdfs/TestReplication.java	(working copy)
@@ -388,7 +388,58 @@
       if (cluster != null) {
         cluster.shutdown();
       }
+    }  
+  }
+  
+  /**
+   * Test if replication can detect mismatched length on-disk blocks
+   * @throws Exception
+   */
+  public void testReplicateLenMismatchedBlock() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 2, true, null);
+    try {
+      cluster.waitActive();
+      // test truncated block
+      changeBlockLen(cluster, -1);
+      // test extended block
+      changeBlockLen(cluster, 1);
+    } finally {
+      cluster.shutdown();
     }
-  }  
+  }
   
+  private void changeBlockLen(MiniDFSCluster cluster, 
+      int lenDelta) throws IOException, InterruptedException {
+    final Path fileName = new Path("/file1");
+    final short REPLICATION_FACTOR = (short)1;
+    final int fileLen = 1;
+    final FileSystem fs = cluster.getFileSystem();
+    DFSTestUtil.createFile(fs, fileName, fileLen, REPLICATION_FACTOR, 0);
+    DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);
+
+    String block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();
+
+    // Change the length of a replica
+    for (int i=0; i<cluster.getDataNodes().size(); i++) {
+      if (TestDatanodeBlockScanner.changeReplicaLength(block, i, lenDelta)) {
+        break;
+      }
+    }
+
+    // increase the file's replication factor
+    fs.setReplication(fileName, (short)(REPLICATION_FACTOR+1));
+
+    // block replication triggers corrupt block detection
+    DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", 
+        cluster.getNameNodePort()), fs.getConf());
+    LocatedBlocks blocks = dfsClient.namenode.getBlockLocations(
+        fileName.toString(), 0, fileLen);
+    while (!blocks.get(0).isCorrupt() || 
+        REPLICATION_FACTOR != blocks.get(0).getLocations().length) {
+      Thread.sleep(100);
+      blocks = dfsClient.namenode.getBlockLocations(
+          fileName.toString(), 0, fileLen);
+    }
+    fs.delete(fileName, true);
+  }
 }
Index: src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
===================================================================
--- src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java	(revision 736736)
+++ src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java	(working copy)
@@ -264,4 +264,11 @@
    * @throws IOException
    */
   public void validateBlockMetadata(Block b) throws IOException;
+  
+  /**
+   * Check if a block is being written to, i.e., not finalized yet
+   * @param b a block
+   * @return true if the block is under construction; false otherwise
+   */
+  public boolean isUnderConstruction(Block b);
 }
Index: src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
===================================================================
--- src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java	(revision 736736)
+++ src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java	(working copy)
@@ -1425,4 +1425,16 @@
   public String getStorageInfo() {
     return toString();
   }
+
+  /**
+   * Check if a block is being written to, i.e., not finalized yet
+   * @param b a block
+   * @return true if the block is under construction; false otherwise
+   */
+  public boolean isUnderConstruction(Block b) {
+    if (ongoingCreates.containsKey(b)) {
+      return true;
+    }
+    return false;
+  }
 }
Index: src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
===================================================================
--- src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java	(revision 736736)
+++ src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java	(working copy)
@@ -908,6 +908,25 @@
       return;
     }
 
+    // Check if block is under construction
+    if (data.isUnderConstruction(block)) {
+      LOG.info("Can't replicate block under construction " + block);
+      return;
+    }
+    
+    // Check if NN recorded length matches on-disk length 
+    long onDiskLength = data.getLength(block);
+    if (block.getNumBytes() != onDiskLength) {
+      // Mismatch indicates corruption so report NN the corrupt block
+      namenode.reportBadBlocks(new LocatedBlock[]{
+          new LocatedBlock(block, new DatanodeInfo[] {
+              new DatanodeInfo(dnRegistration)})});
+      LOG.info("Can't replicate block " + block
+          + " because on-disk length " + onDiskLength 
+          + " mismatches NameNode recorded length " + block.getNumBytes());
+      return;
+    }
+    
     int numTargets = xferTargets.length;
     if (numTargets > 0) {
       if (LOG.isInfoEnabled()) {
@@ -1088,7 +1107,7 @@
         out = new DataOutputStream(new BufferedOutputStream(baseStream, 
                                                             SMALL_BUFFER_SIZE));
 
-        blockSender = new BlockSender(b, 0, -1, false, false, false, 
+        blockSender = new BlockSender(b, 0, b.getNumBytes(), false, false, false, 
             datanode);
         DatanodeInfo srcNode = new DatanodeInfo(dnRegistration);
 
