hdfs удаляет записываемый файл - PullRequest
0 голосов
/ 23 мая 2018

Согласно блогу , hdfs использует механизм аренды, чтобы избежать того, чтобы два клиента писали один и тот же файл.Поэтому я думаю, что нельзя удалить файл, который написан другим клиентом.Но это неправильно.

Когда клиент A пишет lock.txt, клиент B может немедленно удалить lock.txt.Клиент А может продолжить запись, хотя файл больше не существует.Просто при закрытии потока A встретится исключение: org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on /user/lock.txt (inode 643845185): File does not exist. Holder DFSClient_NONMAPREDUCE_-1636584585_1 does not have any open files**

Почему это происходит?Моя версия Haddop - 2.7.3.

===============================

Это мой тестовый код:

// write process
object Create {

  private val filePath = "/user/lock.txt"

  def main(args: Array[String]): Unit = {
    println("Create start!")
    val fs = FileSystem.get(new Configuration())
    val os = Option(fs.create(new Path(filePath), false))
    if (os.isDefined) {
      println(s"Create result! $os", System.currentTimeMillis())
      0.until(300).foreach(index => {
        os.get.write(100)
        os.get.flush()
        println("Writing...")
        Thread.sleep(1000)
      })
      println("pre close" + System.currentTimeMillis())
      os.get.close()
      println("close success!" + System.currentTimeMillis())
    }
  }

}

// delete process
object Delete {

  private val filePath = "/user/lock.txt"

  def main(args: Array[String]): Unit = {
    println("Delete start!")
    val fs = FileSystem.get(new Configuration())
    while (!fs.exists(new Path(filePath))) {
      println("File no exist!")
      Thread.sleep(1000)
    }
    println("File exist!")
    while (true) {
      println("try delete!")
      val tmp = Option(fs.delete(new Path(filePath), false))
      if (tmp.isDefined) {
        println(s"delete result:${tmp.get}!" + System.currentTimeMillis())
      }
      println("Try recover")
      if (fs.asInstanceOf[DistributedFileSystem].recoverLease(new Path(filePath))) {
        println("Recover lease success!")
        val res = Option(fs.delete(new Path(filePath), false))
        println(s"File delete success:${res.get}")
      } else {
        println("Recover lease failed!")
      }
      Thread.sleep(1000)
    }
  }

}
...