用户名: 密码: 忘记密码? 注册

TFS 源码分析 写文件操作 Client端

作者:  时间: 2011-01-02
整个写文件的总体流程这里有介绍
http://blog.chinaunix.net/u3/99156/showart.php?id=2457731
主要分析了写文件时NameServer端的源码
这篇文章介绍写文件时,Client端的源码分析
本文描述的内容涉及TFS写入流程图中的step1, step2, step3, step7

TFS写入流程图在这里有介绍,本文主要分析源码
http://blog.chinaunix.net/u3/99156/showart.php?id=2458060

和通常的Linux文件操作API一样,写文件时包括三个步骤:open->write->close

TfsFile::tfs_open():
1. 读模式,发送block_id给NS,从NS查询相关ds_list返回给client
2. 写模式,发送block_id给NS,NS查询元数据,选择一个可写的block_id,以及相关ds_list返回给client

int TfsFile::tfs_open(const char *file_name, const char *suffix, const int32_t mode)
{
  mode_ = mode;
  //利用文件名计算得到block_id和file_id
  conv_name(file_name, suffix);
  int32_t iret = tfs_open(block_id_, file_id_, mode_);
  if (iret != TFS_SUCCESS)
  {
    return iret;
  }

  FSName fsname;
  fsname.set_cluster_id(session_->get_cluster_id());
  fsname.set_block_id(block_id_);
  fsname.set_file_id(file_id_);
  //这个有什么用
  fsname.set_prefix(suffix);
  file_id_ = fsname.get_file_id();
  strcpy(file_name_, fsname.get_name());
  return TFS_SUCCESS;
}

int TfsFile::tfs_open(const uint32_t block_id, const uint64_t file_id, const int32_t mode)
{
  if (session_ == NULL)
  {
    snprintf(error_message_, ERR_MSG_SIZE, "not set session, session is null");
    return TFS_ERROR;
  }

  //读模式的话,必须指明block_id
  if ((block_id == 0) && (mode == READ_MODE || mode == UNLINK_MODE))
  {
    snprintf(error_message_, ERR_MSG_SIZE, "%s no block, block_id(%u) file_id(%" PRI64_PREFIX "u)",
             file_name_, block_id, file_id);
    return TFS_ERROR;
  }

  mode_ = mode;
  block_id_ = block_id;
  file_id_ = file_id;
  ds_list_.clear();

  if (mode == READ_MODE)
  {
    //跟NameServer通信,主要是获取ds_list信息,主要是调用

    //TfsSession::get_block_info_ex()函数,加上查询cache的信息
    if (session_->get_block_info(block_id_, ds_list_) != TFS_SUCCESS)
    {

      //没有可读的DS,返回错误
      if (ds_list_.size() == 0)
      {
        snprintf(error_message_, ERR_MSG_SIZE, "tfs open fail, block(%u) no exist in nameserver", block_id_);
        return TFS_ERROR;
      }
    }
  }
  else if (mode == UNLINK_MODE)
  {
    if (session_->get_unlink_block_info(block_id_, ds_list_) != TFS_SUCCESS)
    {
      snprintf(error_message_, ERR_MSG_SIZE, "tfs open fail, block(%u) no exist in nameserver", block_id_);
      return TFS_ERROR;
    }
  }

  //写模式
  else
  {
    uint32_t current_block_id = block_id_;
    int32_t flag = BLOCK_WRITE | BLOCK_CREATE;
    if ((mode_ & NEWBLK_MODE))
    {
      flag |= BLOCK_NEWBLK;
    }
    if ((mode_ & NOLEASE_MODE))
    {
      flag |= BLOCK_NOLEASE;
    }
    //这里也调用TfsSession::get_block_info_ex,如果对应的文件不存在,则

    //NameServer会找到一个可用的block_id返回
    if (session_->create_block_info(current_block_id, ds_list_, flag, fail_servers_) != TFS_SUCCESS)
    {
      if (ds_list_.size() == 0 || current_block_id == 0)
      {
        snprintf(error_message_, ERR_MSG_SIZE, "create block(%u) fail in nameserver", block_id_);
        return TFS_ERROR;
      }
    }

    //这里的APPED_MODE是如何来判断的,跟block_id==0有什么关系
    if (block_id_ == 0)
    {
      block_id_ = current_block_id;
      mode_ |= APPEND_MODE;
    }
  }
  if (block_id_ == 0 || ds_list_.size() == 0)
  {
    snprintf(error_message_, ERR_MSG_SIZE, "block(%u),is invalid, dataserver size(%u)", block_id_,
             static_cast<uint32_t> (ds_list_.size()));
    return TFS_ERROR;
  }

  if ((mode_ & READ_MODE))
  {
    if (file_id_ == 0)
    {
      //TBSYS_LOG(WARN, "blockId(%u) read fileid is 0.", block_id_);
    }

    //读模式通过这个方法来选择DS
    pri_ds_index_ = static_cast<int32_t> (file_id_ % ds_list_.size());
  }
  else
  {
    pri_ds_index_ = 0;
  }

  //尝试连接ds_list中的DS,并且选出第一个可连接的DS作为primary DS
  if (connect_ds() != TFS_SUCCESS)
  {
    snprintf(error_message_, ERR_MSG_SIZE, "connect to dataserver fail.");
    if ((mode_ & APPEND_MODE))
    {
      fail_servers_.push_back(ds_list_[0]);
    }
    return TFS_ERROR;
  }

  fail_servers_.clear();

  if ((mode_ & WRITE_MODE))
  {

    //这个函数调用client_->call(CreateFileMessage),经过上面connect_ds()的调用,

    //client_是连到primary DS的客户端,所以这个消息不是发送给NS的

   //通过此消息,去primary DS上获取file_number

    int32_t ret = create_filename();
    if (ret != TFS_SUCCESS)
    {
      CLIENT_POOL.release_client(client_);
      TBSYS_LOG(ERROR, "create file name faile(%d)", ret);
      return ret;
    }
    if (file_id_ == 0)
    {
      snprintf(error_message_, ERR_MSG_SIZE, "create file name fail,fileid == 0");
      CLIENT_POOL.release_client(client_);
      return TFS_ERROR;
    }
  }
  offset_ = 0;
  eof_ = TFS_FILE_EOF_FLAG_NO;
  crc_ = 0;
  is_open_flag_ = TFS_FILE_OPEN_FLAG_YES;
  return TFS_SUCCESS;
}


TfsFile::tfs_write():
1. 传入数据,进行数据写入

int TfsFile::tfs_write(char *data, const int32_t len)
{
  if (is_open_flag_ == TFS_FILE_OPEN_FLAG_NO)
  {
    snprintf(error_message_, ERR_MSG_SIZE, "tfs file(%s) don't open file", file_name_);
    return -2;
  }
  if (!(mode_ & WRITE_MODE))
  {
    snprintf(error_message_, ERR_MSG_SIZE, "open file mode isn't write.");
    return -1;
  }

  WriteDataMessage dsmessage;

  //file_number用来干什么??
  dsmessage.set_file_number(file_number_);
  dsmessage.set_block_id(block_id_);
  dsmessage.set_file_id(file_id_);

  //小文件写入,需要找到这个block_id对应的块的offset进行写入
  dsmessage.set_offset(offset_);
  dsmessage.set_length(len);
  //注意这里,讲ds_list_放入message中,发送给primary DS,让它去给其他DS发送数据
  dsmessage.set_ds_list(ds_list_);
  dsmessage.set_data(data);

  //给primary DS发送写入消息,这里是同步等待,还是异步等待??
  Message *message = client_->call(&dsmessage);

  int32_t iret = TFS_ERROR;
  if (message != NULL)
  {
    if (message->get_message_type() == STATUS_MESSAGE)
    {
      StatusMessage *msg = dynamic_cast<StatusMessage*> (message);
      if (msg->get_status() == STATUS_MESSAGE_OK)
      {

        //记录数据crc和文件偏移量
        crc_ = Func::crc(crc_, data, len);
        offset_ += len;
        iret = TFS_SUCCESS;
      }
      else
      {
#ifdef __CLIENT_METRICS__
        write_metrics_.incr_failed_count();
#endif
        snprintf(error_message_, ERR_MSG_SIZE, "tfs write, get error msg(%s)", msg->get_error());
      }
    }
    tbsys::gDelete(message);
  }
  else
  {
#ifdef __CLIENT_METRICS__
    write_metrics_.incr_timeout_count();
#endif
  }

  if (iret != TFS_SUCCESS)
  {
    snprintf(error_message_, ERR_MSG_SIZE, "tfs write, get error msg from dataserver(%s)",
             tbsys::CNetUtil::addrToString(client_->get_mip()).c_str());
    client_->disconnect();
    is_open_flag_ = TFS_FILE_OPEN_FLAG_NO;
    CLIENT_POOL.release_client(client_);
    return -1;
  }
  return len;
}


关闭文件:
1. 由client向primary DS发送CloseFileMessage消息,由primary DS通知其他DS关闭文件(疑问:如果不关闭文件,该文件是否就不可读?)

int TfsFile::tfs_close()
{
  if (is_open_flag_ == TFS_FILE_OPEN_FLAG_NO)
  {
    TBSYS_LOG(INFO, "tfs close successful , buf tfs file not open");
    return TFS_SUCCESS;
  }

  //读模式,直接关闭client,设置is_open_flag就行
  if (mode_ & READ_MODE)
  {
    is_open_flag_ = TFS_FILE_OPEN_FLAG_NO;
    if (client_ != NULL)
      CLIENT_POOL.release_client(client_);
    TBSYS_LOG(DEBUG, "tfs close successful");
    return TFS_SUCCESS;
  }

  if (!(mode_ & WRITE_MODE))
  {
    TBSYS_LOG(INFO, "tfs close successful");
    return TFS_SUCCESS;
  }

  if (offset_ == 0)
  {
    is_open_flag_ = TFS_FILE_OPEN_FLAG_NO;
    if (client_ != NULL)
    {
      CLIENT_POOL.release_client(client_);
    }
    TBSYS_LOG(INFO, "tfs close successful, offset(%d)", offset_);
    return TFS_ERROR;
  }

  if (client_ == NULL)
  {
    TBSYS_LOG(ERROR, "tfs close fail, client is null");
    return TFS_ERROR;
  }

  CloseFileMessage dsmessage;
  dsmessage.set_file_number(file_number_);
  dsmessage.set_block_id(block_id_);
  dsmessage.set_file_id(file_id_);
  dsmessage.set_option_flag(option_flag_);
  //发送closeFileMessage给primary,让primary通知其他DS关闭文件
  dsmessage.set_ds_list(ds_list_);
  dsmessage.set_crc(crc_);

  Message *message = client_->call(&dsmessage);

  int32_t iret = TFS_ERROR;
  if (message != NULL)
  {
    if (message->get_message_type() == STATUS_MESSAGE)
    {
      StatusMessage* msg = dynamic_cast<StatusMessage*> (message);
      if (msg->get_status() == STATUS_MESSAGE_OK)
      {
        iret = TFS_SUCCESS;
      }
      else
      {
#ifdef __CLIENT_METRICS__
        close_metrics_.incr_failed_count();
#endif
        snprintf(error_message_, ERR_MSG_SIZE, "tfs file close, get errow msg(%s)", msg->get_error());
        client_->disconnect();
      }
    }
    tbsys::gDelete(message);
  }
  else
  {
    snprintf (error_message_, ERR_MSG_SIZE, "tfs file close,send msg to dataserver fail, errors(time out, blockid(%u) fileid(%" PRI64_PREFIX "u)",
               block_id_, file_id_);
#ifdef __CLIENT_METRICS__
    close_metrics_.incr_failed_count();
#endif
  }

#ifdef __CLIENT_METRICS__
  if (close_metrics_.total_count % 50 == 0)
  {
    TBSYS_LOG(DEBUG, "write file (%s) on server(%s)", file_name_, tbsys::CNetUtil::addrToString(last_elect_ds_id_).c_str());
  }
#endif

  is_open_flag_ = TFS_FILE_OPEN_FLAG_NO;
  CLIENT_POOL.release_client(client_);
  return iret;
}