首页 > Berkeley DB, David Zhao > Berkeley DB示例程序详解(3.1)

Berkeley DB示例程序详解(3.1)

2009年10月4日 davidzhao

/*
 * 这个示例程序演示了使用Berkeley DB的replication功能的方法。
 *
 * Berkeley DB提供了一套基本API和一套replication manager API来使用
 * 它的replication功能。
 * 前者有更大的灵活性,用户可以选择各种网络协议来实现数据传输,并且使用各种
 * 线程库来进行多线程编程。同时由于使用最基本的API, 用户可以定制选举和
 * replication系统的内部消息处理方式,非常灵活。所以这也意味着用户需要更多
 * 的代码来使用Berkeley DB replication功能,开发难度相对后者较大;
 * 后者基于TCP/IP协议和pthread线程库(Windows上面使用win32线程库),并且按照通常
 * 的需求,对选举和内部消息处理进行了通用的处理,并且通过让用户配置
 * 策略(policy)和参数的方式提供一定的灵活性和可定制性。
 *
 * 本程序基于Berkeley DB的replication manager。
 *
 *
 * 名词术语:
 *
 * site:一个(数据库环境, IP地址, 端口(port))的三元组。所以,多个site可以处于
 * 不同的计算机上面,也可以在同一个计算机上面,只要配置了不同的端口并使用不同
 * 的数据库环境目录。
 *
 * replication group:由若干个site组成的集合,其中有且只有一个site 可以写入
 * 数据到数据库当中, 称为master;其余的site只可以从中读取数据,称为replica。
 *
 * master:接受读写请求。如果写入数据,那么这些写入的数据不仅会更新master
 * 自己的数据库,还会通过Berkeley DB的
 * replication功能,被传播到replication group当中的replica上面。一个
 * replication group通过选举或者显式指定的方式,得到master。
 *
 * replica:接受读请求。从master写入的数据会自动传播到每一个replica上面,从而
 * 将replica上面的数据更新。任何一个replica在选举的时候,都有可能成为新的master
 *
 * 一个replication group作为一个整体来看的话,它比一个单机有
 * 更大的数据读写能力,因为所有的site都可以提供数据,自然读数据的吞吐量更大;
 * 并且master可以被配置为只写入数据而不必理会读数据请求,由replica提供数据,
 * 这样,写入的吞吐量也更大。
 *
 * replication group比单机的另一个优势,就是高可靠性(high availability),因为,
 * 其中如果任何一个site脱离(断电,网络故障,应用程序错误等),这个组都可以
 * 继续服务:如果一个replica脱离,那么
 * 还有其他replica可以提供数据,并且这个脱离的replica可以重新加入该组,其数据
 * 被自动更新到最新;如果master脱离,那么该组的所有replica举行选举,从中选
 * 出新的master。
 */
int
main(argc, argv)
 int argc;
 char *argv[];
{
 DB_ENV *dbenv;
 SETUP_DATA setup_info;
 repsite_t *site_list;
 APP_DATA my_app_data;
 thread_t ckp_thr, lga_thr;
 supthr_args sup_args;
 u_int32_t start_policy;
 int i, ret, t_ret;

 memset(&setup_info, 0, sizeof(SETUP_DATA));
 setup_info.progname = progname;
 memset(&my_app_data, 0, sizeof(APP_DATA));
 dbenv = NULL;
 ret = 0;
 /*
  * 默认使用选举的方法选出master。替代方案就是指定某一个site是master,
  * 其他site是replica。
  */
 start_policy = DB_REP_ELECTION;

 /* 创建一个拥有replication功能的数据库环境,但还不打开。 */
 if ((ret = create_env(progname, &dbenv)) != 0)
  goto err;
 dbenv->app_private = &my_app_data;

 /*
  * 配置replication消息处理回调函数。
  * 有任何replication消息,Berkeley DB都会调用该函数来处理。
  */
 (void)dbenv->set_event_notify(dbenv, event_callback);

 /* Parse command line and perform common replication setup. */
 if ((ret = common_rep_setup(dbenv, argc, argv, &setup_info)) != 0)
  goto err;

 /*
  * Perform repmgr-specific setup based on command line options.
  * 如果显式指定了master,那么就使用DB_REP_MASTER把本site设置为master,
  * 并用DB_REP_CLIENT把其他site设置为replica。
  */
 if (setup_info.role == MASTER)
  start_policy = DB_REP_MASTER;
 else if (setup_info.role == CLIENT)
  start_policy = DB_REP_CLIENT;
 /*
  * 设置本地site信息,也就是IP地址和端口信息。因为site是一个
  * (数据库环境,IP地址,port)的三元组,而数据库环境我们已经创建好
  * 了。
  */
 if ((ret = dbenv->repmgr_set_local_site(dbenv, setup_info.self.host,
     setup_info.self.port, 0)) != 0) {
  fprintf(stderr, “Could not set listen address (%d).\n”, ret);
  goto err;
 }
 site_list = setup_info.site_list;
 /*
  * 添加其他site的信息到本site。如果显式指定master,那么一个replica
  * 一定要知道master的信息;如果选举,那么每一个site要知道其他所有
  * site的信息。
  *
  * 两个site可以成为peer关系,通过在下面的函数中设置
  * DB_REPMGR_PEER来指定。一个replica可以被它的peer更新到最新,这样
  * 就为master去掉了更新过时的replica的负担。
  */
 for (i = 0; i < setup_info.remotesites; i++) {
  if ((ret = dbenv->repmgr_add_remote_site(dbenv,
      site_list[i].host, site_list[i].port, NULL,
      site_list[i].peer ? DB_REPMGR_PEER : 0)) != 0) {
   dbenv->err(dbenv, ret,
       “Could not add site %s:%d”, site_list[i].host,
       (int)site_list[i].port);
   goto err;
  }
 }

 /*
  * Configure heartbeat timeouts so that repmgr monitors the
  * health of the TCP connection.  Master sites broadcast a heartbeat
  * at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout.
  * Client sites wait for message activity the length of the
  * DB_REP_HEARTBEAT_MONITOR timeout before concluding that the
  * connection to the master is lost.  The DB_REP_HEARTBEAT_MONITOR
  * timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout.
  */
 if ((ret = dbenv->rep_set_timeout(dbenv, DB_REP_HEARTBEAT_SEND,
     5000000)) != 0)
  dbenv->err(dbenv, ret,
      “Could not set heartbeat send timeout.\n”);
 if ((ret = dbenv->rep_set_timeout(dbenv, DB_REP_HEARTBEAT_MONITOR,
     10000000)) != 0)
  dbenv->err(dbenv, ret,
      “Could not set heartbeat monitor timeout.\n”);

 /*
  * The following repmgr features may also be useful to your
  * application.  See Berkeley DB documentation for more details.
  *  – Two-site strict majority rule – In a two-site replication
  *    group, require both sites to be available to elect a new
  *    master.
  *  – Timeouts – Customize the amount of time repmgr waits
  *    for such things as waiting for acknowledgements or attempting
  *    to reconnect to other sites.
  *  – Site list – return a list of sites currently known to repmgr.
  */
 /* 打开数据库环境。*/
 if ((ret = env_init(dbenv, setup_info.home)) != 0)
  goto err;

 /* Start checkpoint and log archive threads. */
 sup_args.dbenv = dbenv;
 sup_args.shared = &my_app_data.shared_data;
 if ((ret = start_support_threads(dbenv, &sup_args, &ckp_thr,
     &lga_thr)) != 0)
  goto err;

 if ((ret = dbenv->repmgr_start(dbenv, 3, start_policy)) != 0)
  goto err;

 if ((ret = doloop(dbenv, &my_app_data.shared_data)) != 0) {
  dbenv->err(dbenv, ret, “Client failed”);
  goto err;
 }

 /* Finish checkpoint and log archive threads. */
 if ((ret = finish_support_threads(&ckp_thr, &lga_thr)) != 0)
  goto err;

 /*
  * We have used the DB_TXN_NOSYNC environment flag for improved
  * performance without the usual sacrifice of transactional durability,
  * as discussed in the “Transactional guarantees” page of the Reference
  * Guide: if one replication site crashes, we can expect the data to
  * exist at another site.  However, in case we shut down all sites
  * gracefully, we push out the end of the log here so that the most
  * recent transactions don’t mysteriously disappear.
  */
 if ((ret = dbenv->log_flush(dbenv, NULL)) != 0) {
  dbenv->err(dbenv, ret, “log_flush”);
  goto err;
 }

err:
 if (dbenv != NULL &&
     (t_ret = dbenv->close(dbenv, 0)) != 0) {
  fprintf(stderr, “failure closing env: %s (%d)\n”,
      db_strerror(t_ret), t_ret);
  if (ret == 0)
   ret = t_ret;
 }

 return (ret);
}

/*
 * 事件通知函数。当使用replication manager的时候,该事件已经被
 * replication manager 适当地处理好了,所以在这个回调函数当中,
 * 我们只需要简单地更新状态或者打印消息即可。
 */
static void
event_callback(dbenv, which, info)
 DB_ENV *dbenv;
 u_int32_t which;
 void *info;
{
 APP_DATA *app = dbenv->app_private;
 SHARED_DATA *shared = &app->shared_data;

 info = NULL;    /* Currently unused. */

 switch (which) {
 case DB_EVENT_REP_CLIENT:
  shared->is_master = 0;
  shared->in_client_sync = 1;
  break;

 case DB_EVENT_REP_MASTER:
  shared->is_master = 1;
  shared->in_client_sync = 0;
  break;

 case DB_EVENT_REP_NEWMASTER:
  shared->in_client_sync = 1;
  break;

 case DB_EVENT_REP_PERM_FAILED:
  /*
   * Did not get enough acks to guarantee transaction
   * durability based on the configured ack policy.  This
   * transaction will be flushed to the master site’s
   * local disk storage for durability.
   */
  printf(
    “Insufficient acknowledgements to guarantee transaction durability.\n”);
  break;

 case DB_EVENT_REP_STARTUPDONE:
  shared->in_client_sync = 0;
  break;

 default:
  dbenv->errx(dbenv, “ignoring event %d”, which);
 }
}

分类: Berkeley DB, David Zhao 标签: ,
  1. camel
    2009年12月4日07:32 | #1

    请教一个EntityJoin的问题,EntityJoin join;ForwardCursor cursor = join.entities();
    这个游标无法定位到最后一条记录,那多条件查询的结果倒排序该如何处理?
    我看了 ReverseOrder 的例子,但是我不想破坏数据结构,请问还有其它处理方法吗?

  2. xiaoxin
    2010年1月15日13:00 | #2

    请教一个关于BDB 的支持多key重复且排序(DB_DUPSORT)问题!我使用的是 BerkeleyDB 4.8.24 版本!
    以下程序是在visual stdio 2008上编译的,主要是一个主数据库与一个次数据库,其中次数据库支持重复(DB_DUPSORT),比较函数由自己设(不使用系统的)但在实际测试中不会调自己设的回调函数(secondary_compare)来进行次索引的比较,但是如果次索引的字段改成u_char(本程序中用的为u_long)则会调到,这是为什么呢?有没有人知道的?
    #include “db.h”
    #include
    typedef struct _testID
    {
    u_long PIndex;
    u_long stime;
    }TID;
    u_long tmp;
    //主索引比较函数
    int primary_compare( DB* db, const DBT *key1, const DBT *key2)
    {
    u_long a = ((TID*)(key1 ->data))->PIndex;
    u_long b = ((TID*)(key2 ->data))->PIndex;
    return (a – b);
    }
    // secondary compare function
    int secondary_compare( DB* db, const DBT *key1, const DBT *key2)
    {
    //在这个函数打上断点怎么老是执行不到?是不是前面我哪里设错了?
    u_long a = *(u_long*)(key1->data);
    u_long b = *(u_long*)(key2->data);
    return (a – b);
    }
    // get
    int ExtSKEYC(DB* dbp, const DBT* pkey, const DBT *pdata, DBT *skey)
    {

    skey->data = &((TID*)(pkey ->data))->stime;
    skey->size = sizeof(u_long);
    return 0;
    }
    int _tmain(int argc, _TCHAR* argv[])
    {
    DB* db_primary = NULL;
    DB* db_secondary = NULL;

    // PRIMARY DB
    int nRet = -1;
    // CREATE
    nRet = db_create(&db_primary, NULL, 0);
    // COMPARE
    nRet = db_primary->set_bt_compare(db_primary, primary_compare);
    // open
    nRet = db_primary->open(db_primary, NULL, “primary.db”, NULL, DB_BTREE, DB_CREATE,0);

    // SECONDARY DB
    nRet = db_create(&db_secondary, NULL, 0);
    // flag
    nRet = db_secondary->set_flags(db_secondary, DB_DUPSORT );
    // compare
    nRet = db_secondary->set_dup_compare(db_secondary, secondary_compare);
    // open
    nRet = db_secondary->open(db_secondary, NULL, “secondary.db”, NULL, DB_BTREE, DB_CREATE, 0);
    // associate
    nRet = db_primary->associate(db_primary, NULL, db_secondary, ExtSKEYC, 0);

    // write ,以上每一步的返回值 nRet 都是0.
    DBT key, data;
    TID pindex = {0,0};
    for( int i = 0; i set_dup_compare”;
    data.size = strlen(“DB->set_dup_compare”) + 1;
    nRet = db_primary->put( db_primary, NULL, &key, &data, 0);
    }
    getchar();
    nRet = db_secondary->close(db_secondary, 0);
    nRet = db_primary->close(db_primary, 0);
    return 0;
    }

  3. 旺旺
    2010年2月4日19:20 | #3

    不错,试试先

  4. 和尚
    2010年3月20日20:30 | #4

    请教个问题:
    replication开发中的repmgr_set_ack_policy函数赋值DB_REPMGR_ACKS_NONE有什么不利影响吗?
    是否会出现主备间数据不一至,备机丢数的情况。谢谢!!

  5. 2010年3月22日12:43 | #5

    @和尚
    使用了DB_REPMGR_ACKS_NONE意味着master不等待任何client确认消息就继续执行后续操作。 这样有可能使得master的日志信息还没完全同步到client, 造成二者数据在同一时间点上的不一致,也就是主备机不一致。

  6. dcr2010
    2010年10月14日09:54 | #6

    SLAVE在Master宕机时 被选举为Master。当老的Master恢复工作。如何设置回去。步骤是?

  7. colin
    2010年10月17日14:54 | #7

    请教个问题:
    我的程序中使用了berkeleydb,服务器是linux,当服务器意外断电重启时,程序重新重启后,打开某一个databases时,有时候会卡在那儿,怎么回事?谢谢

  8. 2010年10月18日13:18 | #8

    根据你的描述,卡死的情况应该与recovery相关。你可以提供更详尽的应用场景给我,以做详细的分析,emily.fu@oracle.com。

  9. clusterlee
    2010年10月27日14:18 | #9

    多线程访问下,只支持 多线程读/单线程写 的模式?

  10. chaohuang
    2010年11月18日12:51 | #10

    @clusterlee
    见示例的注释部分:

    replication group:由若干个site组成的集合,其中有且只有一个site 可以写入数据到数据库当中, 称为master;其余的site只可以从中读取数据,称为replica。

    与多线程读写不相干。

  11. xman
    2010年12月22日11:29 | #11

    你好,

    请教一个问题。

    一个master,一个client。我在master 大量插入数据。然后把client 杀掉,然后再重启client。之后,log是都同步的。但是client端的数据文件的大小却不再增长了。

    我看DB文档支持 DB_REP_CONF_AUTOINIT

    It is possible that there is a time of the day when it is better to perform a replica re-initialization. Or, you simply might want to decide to bring the replica up to speed by restoring its databases using a hot-backup taken from the master. Either way, you can decide to prevent automatic-initialization of your replica. To do this specify DB_REP_CONF_AUTOINIT to DbEnv::rep_set_config() and then specify 0 to the onoff parameter.

    m_dbEnv->rep_set_config(DB_REP_CONF_AUTOINIT, 1)

    可是设置了这个参数也不起作用。一点作用都不起。

    遇到这种client断掉、死掉之后又重连的情况,应该怎么处理呢?

    谢谢。

  12. Toni
    2010年12月22日21:45 | #12

    @all
    最使用replication 的时候,replica已同步了master的log文件 但是为什么db文件一直都没有更新,后来我加入了范例中checkpoint线程,虽然db文件有更新,但是每次都要2、3分钟才能更新,看日志发现,master发送一个bulk log,在发送一个 sync,这个时候replica日志显示 延迟同步等待丢失的记录,当master在发送一个bulk log的时候 replica才同步了所有的db文件。 每次发送的时候中间都有时间间隔,不知道具体是怎么回事,有办法 直接全部同步到吗?

本文的评论功能被关闭了.
Դ