Berkeley DB示例程序详解(3.1)
/*
* 这个示例程序演示了使用Berkeley DB的replication功能的方法。
*
* Berkeley DB提供了一套基本API和一套replication manager API来使用
* 它的replication功能。
* 前者有更大的灵活性,用户可以选择各种网络协议来实现数据传输,并且使用各种
* 线程库来进行多线程编程。同时由于使用最基本的API, 用户可以定制选举和
* replication系统的内部消息处理方式,非常灵活。所以这也意味着用户需要更多
* 的代码来使用Berkeley DB replication功能,开发难度相对后者较大;
* 后者基于TCP/IP协议和pthread线程库(Windows上面使用win32线程库),并且按照通常
* 的需求,对选举和内部消息处理进行了通用的处理,并且通过让用户配置
* 策略(policy)和参数的方式提供一定的灵活性和可定制性。
*
* 本程序基于Berkeley DB的replication manager。
*
*
* 名词术语:
*
* site:一个(数据库环境, IP地址, 端口(port))的三元组。所以,多个site可以处于
* 不同的计算机上面,也可以在同一个计算机上面,只要配置了不同的端口并使用不同
* 的数据库环境目录。
*
* replication group:由若干个site组成的集合,其中有且只有一个site 可以写入
* 数据到数据库当中, 称为master;其余的site只可以从中读取数据,称为replica。
*
* master:接受读写请求。如果写入数据,那么这些写入的数据不仅会更新master
* 自己的数据库,还会通过Berkeley DB的
* replication功能,被传播到replication group当中的replica上面。一个
* replication group通过选举或者显式指定的方式,得到master。
*
* replica:接受读请求。从master写入的数据会自动传播到每一个replica上面,从而
* 将replica上面的数据更新。任何一个replica在选举的时候,都有可能成为新的master
*
* 一个replication group作为一个整体来看的话,它比一个单机有
* 更大的数据读写能力,因为所有的site都可以提供数据,自然读数据的吞吐量更大;
* 并且master可以被配置为只写入数据而不必理会读数据请求,由replica提供数据,
* 这样,写入的吞吐量也更大。
*
* replication group比单机的另一个优势,就是高可靠性(high availability),因为,
* 其中如果任何一个site脱离(断电,网络故障,应用程序错误等),这个组都可以
* 继续服务:如果一个replica脱离,那么
* 还有其他replica可以提供数据,并且这个脱离的replica可以重新加入该组,其数据
* 被自动更新到最新;如果master脱离,那么该组的所有replica举行选举,从中选
* 出新的master。
*/
int
main(argc, argv)
int argc;
char *argv[];
{
DB_ENV *dbenv;
SETUP_DATA setup_info;
repsite_t *site_list;
APP_DATA my_app_data;
thread_t ckp_thr, lga_thr;
supthr_args sup_args;
u_int32_t start_policy;
int i, ret, t_ret;
memset(&setup_info, 0, sizeof(SETUP_DATA));
setup_info.progname = progname;
memset(&my_app_data, 0, sizeof(APP_DATA));
dbenv = NULL;
ret = 0;
/*
* 默认使用选举的方法选出master。替代方案就是指定某一个site是master,
* 其他site是replica。
*/
start_policy = DB_REP_ELECTION;
/* 创建一个拥有replication功能的数据库环境,但还不打开。 */
if ((ret = create_env(progname, &dbenv)) != 0)
goto err;
dbenv->app_private = &my_app_data;
/*
* 配置replication消息处理回调函数。
* 有任何replication消息,Berkeley DB都会调用该函数来处理。
*/
(void)dbenv->set_event_notify(dbenv, event_callback);
/* Parse command line and perform common replication setup. */
if ((ret = common_rep_setup(dbenv, argc, argv, &setup_info)) != 0)
goto err;
/*
* Perform repmgr-specific setup based on command line options.
* 如果显式指定了master,那么就使用DB_REP_MASTER把本site设置为master,
* 并用DB_REP_CLIENT把其他site设置为replica。
*/
if (setup_info.role == MASTER)
start_policy = DB_REP_MASTER;
else if (setup_info.role == CLIENT)
start_policy = DB_REP_CLIENT;
/*
* 设置本地site信息,也就是IP地址和端口信息。因为site是一个
* (数据库环境,IP地址,port)的三元组,而数据库环境我们已经创建好
* 了。
*/
if ((ret = dbenv->repmgr_set_local_site(dbenv, setup_info.self.host,
setup_info.self.port, 0)) != 0) {
fprintf(stderr, “Could not set listen address (%d).\n”, ret);
goto err;
}
site_list = setup_info.site_list;
/*
* 添加其他site的信息到本site。如果显式指定master,那么一个replica
* 一定要知道master的信息;如果选举,那么每一个site要知道其他所有
* site的信息。
*
* 两个site可以成为peer关系,通过在下面的函数中设置
* DB_REPMGR_PEER来指定。一个replica可以被它的peer更新到最新,这样
* 就为master去掉了更新过时的replica的负担。
*/
for (i = 0; i < setup_info.remotesites; i++) {
if ((ret = dbenv->repmgr_add_remote_site(dbenv,
site_list[i].host, site_list[i].port, NULL,
site_list[i].peer ? DB_REPMGR_PEER : 0)) != 0) {
dbenv->err(dbenv, ret,
“Could not add site %s:%d”, site_list[i].host,
(int)site_list[i].port);
goto err;
}
}
/*
* Configure heartbeat timeouts so that repmgr monitors the
* health of the TCP connection. Master sites broadcast a heartbeat
* at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout.
* Client sites wait for message activity the length of the
* DB_REP_HEARTBEAT_MONITOR timeout before concluding that the
* connection to the master is lost. The DB_REP_HEARTBEAT_MONITOR
* timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout.
*/
if ((ret = dbenv->rep_set_timeout(dbenv, DB_REP_HEARTBEAT_SEND,
5000000)) != 0)
dbenv->err(dbenv, ret,
“Could not set heartbeat send timeout.\n”);
if ((ret = dbenv->rep_set_timeout(dbenv, DB_REP_HEARTBEAT_MONITOR,
10000000)) != 0)
dbenv->err(dbenv, ret,
“Could not set heartbeat monitor timeout.\n”);
/*
* The following repmgr features may also be useful to your
* application. See Berkeley DB documentation for more details.
* – Two-site strict majority rule – In a two-site replication
* group, require both sites to be available to elect a new
* master.
* – Timeouts – Customize the amount of time repmgr waits
* for such things as waiting for acknowledgements or attempting
* to reconnect to other sites.
* – Site list – return a list of sites currently known to repmgr.
*/
/* 打开数据库环境。*/
if ((ret = env_init(dbenv, setup_info.home)) != 0)
goto err;
/* Start checkpoint and log archive threads. */
sup_args.dbenv = dbenv;
sup_args.shared = &my_app_data.shared_data;
if ((ret = start_support_threads(dbenv, &sup_args, &ckp_thr,
&lga_thr)) != 0)
goto err;
if ((ret = dbenv->repmgr_start(dbenv, 3, start_policy)) != 0)
goto err;
if ((ret = doloop(dbenv, &my_app_data.shared_data)) != 0) {
dbenv->err(dbenv, ret, “Client failed”);
goto err;
}
/* Finish checkpoint and log archive threads. */
if ((ret = finish_support_threads(&ckp_thr, &lga_thr)) != 0)
goto err;
/*
* We have used the DB_TXN_NOSYNC environment flag for improved
* performance without the usual sacrifice of transactional durability,
* as discussed in the “Transactional guarantees” page of the Reference
* Guide: if one replication site crashes, we can expect the data to
* exist at another site. However, in case we shut down all sites
* gracefully, we push out the end of the log here so that the most
* recent transactions don’t mysteriously disappear.
*/
if ((ret = dbenv->log_flush(dbenv, NULL)) != 0) {
dbenv->err(dbenv, ret, “log_flush”);
goto err;
}
err:
if (dbenv != NULL &&
(t_ret = dbenv->close(dbenv, 0)) != 0) {
fprintf(stderr, “failure closing env: %s (%d)\n”,
db_strerror(t_ret), t_ret);
if (ret == 0)
ret = t_ret;
}
return (ret);
}
/*
* 事件通知函数。当使用replication manager的时候,该事件已经被
* replication manager 适当地处理好了,所以在这个回调函数当中,
* 我们只需要简单地更新状态或者打印消息即可。
*/
static void
event_callback(dbenv, which, info)
DB_ENV *dbenv;
u_int32_t which;
void *info;
{
APP_DATA *app = dbenv->app_private;
SHARED_DATA *shared = &app->shared_data;
info = NULL; /* Currently unused. */
switch (which) {
case DB_EVENT_REP_CLIENT:
shared->is_master = 0;
shared->in_client_sync = 1;
break;
case DB_EVENT_REP_MASTER:
shared->is_master = 1;
shared->in_client_sync = 0;
break;
case DB_EVENT_REP_NEWMASTER:
shared->in_client_sync = 1;
break;
case DB_EVENT_REP_PERM_FAILED:
/*
* Did not get enough acks to guarantee transaction
* durability based on the configured ack policy. This
* transaction will be flushed to the master site’s
* local disk storage for durability.
*/
printf(
“Insufficient acknowledgements to guarantee transaction durability.\n”);
break;
case DB_EVENT_REP_STARTUPDONE:
shared->in_client_sync = 0;
break;
default:
dbenv->errx(dbenv, “ignoring event %d”, which);
}
}
请教一个EntityJoin的问题,EntityJoin join;ForwardCursor cursor = join.entities();
这个游标无法定位到最后一条记录,那多条件查询的结果倒排序该如何处理?
我看了 ReverseOrder 的例子,但是我不想破坏数据结构,请问还有其它处理方法吗?
请教一个关于BDB 的支持多key重复且排序(DB_DUPSORT)问题!我使用的是 BerkeleyDB 4.8.24 版本!
以下程序是在visual stdio 2008上编译的,主要是一个主数据库与一个次数据库,其中次数据库支持重复(DB_DUPSORT),比较函数由自己设(不使用系统的)但在实际测试中不会调自己设的回调函数(secondary_compare)来进行次索引的比较,但是如果次索引的字段改成u_char(本程序中用的为u_long)则会调到,这是为什么呢?有没有人知道的?
#include “db.h”
#include
typedef struct _testID
{
u_long PIndex;
u_long stime;
}TID;
u_long tmp;
//主索引比较函数
int primary_compare( DB* db, const DBT *key1, const DBT *key2)
{
u_long a = ((TID*)(key1 ->data))->PIndex;
u_long b = ((TID*)(key2 ->data))->PIndex;
return (a – b);
}
// secondary compare function
int secondary_compare( DB* db, const DBT *key1, const DBT *key2)
{
//在这个函数打上断点怎么老是执行不到?是不是前面我哪里设错了?
u_long a = *(u_long*)(key1->data);
u_long b = *(u_long*)(key2->data);
return (a – b);
}
// get
int ExtSKEYC(DB* dbp, const DBT* pkey, const DBT *pdata, DBT *skey)
{
skey->data = &((TID*)(pkey ->data))->stime;
skey->size = sizeof(u_long);
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
DB* db_primary = NULL;
DB* db_secondary = NULL;
// PRIMARY DB
int nRet = -1;
// CREATE
nRet = db_create(&db_primary, NULL, 0);
// COMPARE
nRet = db_primary->set_bt_compare(db_primary, primary_compare);
// open
nRet = db_primary->open(db_primary, NULL, “primary.db”, NULL, DB_BTREE, DB_CREATE,0);
// SECONDARY DB
nRet = db_create(&db_secondary, NULL, 0);
// flag
nRet = db_secondary->set_flags(db_secondary, DB_DUPSORT );
// compare
nRet = db_secondary->set_dup_compare(db_secondary, secondary_compare);
// open
nRet = db_secondary->open(db_secondary, NULL, “secondary.db”, NULL, DB_BTREE, DB_CREATE, 0);
// associate
nRet = db_primary->associate(db_primary, NULL, db_secondary, ExtSKEYC, 0);
// write ,以上每一步的返回值 nRet 都是0.
DBT key, data;
TID pindex = {0,0};
for( int i = 0; i set_dup_compare”;
data.size = strlen(“DB->set_dup_compare”) + 1;
nRet = db_primary->put( db_primary, NULL, &key, &data, 0);
}
getchar();
nRet = db_secondary->close(db_secondary, 0);
nRet = db_primary->close(db_primary, 0);
return 0;
}
不错,试试先
请教个问题:
replication开发中的repmgr_set_ack_policy函数赋值DB_REPMGR_ACKS_NONE有什么不利影响吗?
是否会出现主备间数据不一至,备机丢数的情况。谢谢!!
@和尚
使用了DB_REPMGR_ACKS_NONE意味着master不等待任何client确认消息就继续执行后续操作。 这样有可能使得master的日志信息还没完全同步到client, 造成二者数据在同一时间点上的不一致,也就是主备机不一致。
SLAVE在Master宕机时 被选举为Master。当老的Master恢复工作。如何设置回去。步骤是?
请教个问题:
我的程序中使用了berkeleydb,服务器是linux,当服务器意外断电重启时,程序重新重启后,打开某一个databases时,有时候会卡在那儿,怎么回事?谢谢
根据你的描述,卡死的情况应该与recovery相关。你可以提供更详尽的应用场景给我,以做详细的分析,emily.fu@oracle.com。
多线程访问下,只支持 多线程读/单线程写 的模式?
@clusterlee
见示例的注释部分:
replication group:由若干个site组成的集合,其中有且只有一个site 可以写入数据到数据库当中, 称为master;其余的site只可以从中读取数据,称为replica。
与多线程读写不相干。
你好,
请教一个问题。
一个master,一个client。我在master 大量插入数据。然后把client 杀掉,然后再重启client。之后,log是都同步的。但是client端的数据文件的大小却不再增长了。
我看DB文档支持 DB_REP_CONF_AUTOINIT
It is possible that there is a time of the day when it is better to perform a replica re-initialization. Or, you simply might want to decide to bring the replica up to speed by restoring its databases using a hot-backup taken from the master. Either way, you can decide to prevent automatic-initialization of your replica. To do this specify DB_REP_CONF_AUTOINIT to DbEnv::rep_set_config() and then specify 0 to the onoff parameter.
m_dbEnv->rep_set_config(DB_REP_CONF_AUTOINIT, 1)
可是设置了这个参数也不起作用。一点作用都不起。
遇到这种client断掉、死掉之后又重连的情况,应该怎么处理呢?
谢谢。
@all
最使用replication 的时候,replica已同步了master的log文件 但是为什么db文件一直都没有更新,后来我加入了范例中checkpoint线程,虽然db文件有更新,但是每次都要2、3分钟才能更新,看日志发现,master发送一个bulk log,在发送一个 sync,这个时候replica日志显示 延迟同步等待丢失的记录,当master在发送一个bulk log的时候 replica才同步了所有的db文件。 每次发送的时候中间都有时间间隔,不知道具体是怎么回事,有办法 直接全部同步到吗?