-
Notifications
You must be signed in to change notification settings - Fork 21
YCSB 测试
由于 HBase Client 还正在开发中, 所以可能在使用这篇文档的时候可能会出现问题, 欢迎在 Issues 中提出你的问题.
你总共需要些什么?
- YCSB jar包
- 本文档提供的YCSB OBKV客户端
- YCSB 官方仓库的脚本
为了运行 ycsb 测试, 我们需要把 obkv hbase 客户端变成 ycsb 能理解的对象. 这个时候我们需要按照 ycsb 提供的接口形式, 兼容我们的客户端(放心, 这一步我们已经为你做完了, 代码可见附录). 同时我们需要获取我们该如何实现我们客户端的信息, 于是我们需要下载 YCSB 源码, 然后编译出 core-0.12.0.jar.
可见附录.
下载 YCSB 源码的时候就已经一并下载好了, 具体的位置为 bin/ 下.
在获取 YCSB 源码后, 执行以下指令切换分支到0.12.0
git checkout 0.12.0
执行以下命令编译并获取jar包.
mvn -pl com.yahoo.ycsb:core -am clean package -Dmaven.test.skip=true
执行完后我们需要的 core-0.12.0.jar 位于 YCSB/core/target/core-0.12.0.jar
将我们刚刚打包好的 core-0.12.0.jar 放到 obkv-hbase-client-java 根目录下, 并且修改 pom.xml 文件, 增加 com.yahoo.ycsb 依赖.
// ...
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>core</artifactId>
<version>0.12.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/core-0.12.0.jar</systemPath>
</dependency>
</dependencies>
客户端我们已经完成了, 复制附录代码至 src/main/java/com/alipay/oceanbase/hbase/util/
.
接着就是编写我们 obkv ycsb 客户端. 首先确保pom.xml文件中引入了assembly插件,如果没有的话需要在pom文件的build中增加:
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
然后在根目录下运行如下命令:
// 可以不带 -Drelease
mvn assembly:assembly -Dmaven.test.skip=true -Drelease
如果上述命令执行失败或者是windows或者过程中遇到了其他问题可以尝试如下命令:
// 在最新的plugin中需要使用single,windows平台因为参数识别原因,部分参数需要加引号
mvn clean package assembly:single "-Dmaven.test.skip=true" "-Dcheckstyle.skip=true" -Drelease
顺利的话能得到打包出来的 OBKV YCSB 客户端 target/obkv-hbase-client-0.1.3-SNAPSHOT-jar-with-dependencies.jar, 后续我们将使用这个jar包来进行ycsb测试.
在获取 YCSB 源码后, 执行以下指令切换分支到0.12.0:
git checkout 0.12.0
此处可以使用我们步骤1所操作的文件夹
首先我们需要让 YCSB 能识别我们的客户端, 我们需要修改 ycsb文件, 增加ob-hbase的客户端路径.
// ...
"tarantool" : "com.yahoo.ycsb.db.TarantoolClient",
"voldemort" : "com.yahoo.ycsb.db.VoldemortClient",
"ob-hbase" : "com.alipay.oceanbase.hbase.util.YcsbBench",
}
根据上一步填写的客户端名称, 在根目录下创建文件夹 ob-binding/lib/
, 同时将客户端放入, 结构应该如下:
YCSB/ob-binding/lib/obkv-hbase-client-0.1.3-SNAPSHOT-jar-with-dependencies.jar
- 将根目录下的
pom.xml
文件更名为pom.xml.backup
- 把我们在上面打包好的
core-0.12.0.jar
传入根目录下的lib
文件夹(需自行创建)内 - 在lib下执行指令下载相关包
sudo wget https://repo1.maven.org/maven2/org/apache/htrace/htrace-core4/4.1.0-incubating/htrace-core4-4.1.0-incubating.jar
sudo wget https://repo1.maven.org/maven2/org/hdrhistogram/HdrHistogram/2.1.4/HdrHistogram-2.1.4.jar
至此, 所有的准备工作已经完成, 后续就可以进行测试了.
在执行测试之前, 我们需要了解一下 OBKV HBase 客户端的相关参数, 这些参数不仅用于连接上对应的OceanBase Server, 同时它也能控制一些客户端的有关性能的参数.
// 使用直连模式需要:
"hbase.oceanbase.paramURL" -> config URL
"hbase.oceanbase.fullUserName" -> full user name like userName@tenantName#clusterName
"hbase.oceanbase.password" -> password of user
"hbase.oceanbase.sysUserName" -> SYS Tenant user name
"hbase.oceanbase.sysPassword" -> password of sys user
// 使用ODP需要:
"hbase.oceanbase.odpMode" -> whether to use ODP
"hbase.oceanbase.odpAddr" -> ip addr of ODP
"hbase.oceanbase.odpPort" -> port of ODP
"hbase.oceanbase.fullUserName" -> full user name like userName@tenantName#clusterName
"hbase.oceanbase.password" -> password of user
"hbase.oceanbase.database" -> database name
更多参数可以参考 com.alipay.oceanbase.hbase.constants
, 这里只对影响登录的参数进行说明. 我们强烈建议你使用直连模式:D
workload 的配置是YCSB本身所携带的, 更多关于workload的描述可见 YCSB core-workload.
执行什么操作或者是多少数据量都是由workload决定的, 当然你也可以通过其他方式(比如命令行一个一个输入)来传参数.
注意:
- workload的requestdistribution可能会影响服务端的资源利用率(比如uniform/zipfian)
运行脚本位于 YCSB 文件夹下的 YCSB/bin
目录下, 我们只需通过脚本执行命令即可, 需要注意的是我们需要把 OBKV YCSB Client的参数通过命令行输入的形式带入, 这里写一个例子:
./bin/ycsb run ob-hbase -P workloads/your_workload -s -threads 10 -p hbase.oceanbase.paramURL="$config_server_url" \
-p hbase.oceanbase.fullUserName="$userName@tenantName#clusterName" -p hbase.oceanbase.password="$your_pwd" \
-p hbase.oceanbase.sysUserName="sys" -p hbase.oceanbase.sysPassword="$your_sys_pwd" \
-p hbase.oceanbase.table="$tableName" -p hbase.oceanbase.columnFamily="$cf"
更多关于脚本的信息可以参考 Running-a-Workload
凡事总须研究, 才会明白. 有问题可以直接在 Issue 中提出, 或者与我们取得联系:x
/*-
* #%L
* OBKV HBase Client Framework
* %%
* Copyright (C) 2022 OceanBase Group
* %%
* OBKV HBase Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/
package com.alipay.oceanbase.hbase.util;
import com.alipay.oceanbase.hbase.OHTable;
import com.alipay.oceanbase.rpc.property.Property;
import com.yahoo.ycsb.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.*;
import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
import static com.yahoo.ycsb.Status.*;
import static org.apache.commons.lang.StringUtils.isNotBlank;
public class YcsbBench extends DB {
public static final String COLUMN_FAMILY = "hbase.oceanbase.columnFamily";
public static final String TABLE = "hbase.oceanbase.table";
private String columnFamily;
private byte[] columnFamilyBytes;
private String table;
public boolean debug = false;
private OHTable ohTable;
/**
* 初始化,可以从 java 启动参数中传入
* @throws DBException exception
*/
public void init() throws DBException {
Properties props = getProperties();
columnFamily = props.getProperty(COLUMN_FAMILY);
table = props.getProperty(TABLE);
boolean odpMode = false;
Configuration conf = new Configuration();
Preconditions.checkArgument(isNotBlank(props.getProperty(COLUMN_FAMILY)),
"columnFamily is blank!");
Preconditions.checkArgument(isNotBlank(props.getProperty(TABLE)), "table is blank!");
Preconditions.checkArgument(isNotBlank(props.getProperty(HBASE_OCEANBASE_FULL_USER_NAME)),
"full user name is blank!");
if (props.getProperty(HBASE_OCEANBASE_ODP_MODE) != null) {
odpMode = Boolean.parseBoolean(props.getProperty(HBASE_OCEANBASE_ODP_MODE));
}
if (odpMode) {
conf.setBoolean(HBASE_OCEANBASE_ODP_MODE, true);
conf.set(HBASE_OCEANBASE_FULL_USER_NAME,
props.getProperty(HBASE_OCEANBASE_FULL_USER_NAME));
conf.set(HBASE_OCEANBASE_PASSWORD, props.getProperty(HBASE_OCEANBASE_PASSWORD));
Preconditions.checkArgument(isNotBlank(props.getProperty(HBASE_OCEANBASE_ODP_ADDR)),
"odp addr is blank!");
conf.set(HBASE_OCEANBASE_ODP_ADDR, props.getProperty(HBASE_OCEANBASE_ODP_ADDR));
Preconditions.checkArgument(isNotBlank(props.getProperty(HBASE_OCEANBASE_ODP_PORT)),
"odp port is blank!");
conf.setInt(HBASE_OCEANBASE_ODP_PORT,
Integer.parseInt(props.getProperty(HBASE_OCEANBASE_ODP_PORT)));
Preconditions.checkArgument(isNotBlank(props.getProperty(HBASE_OCEANBASE_DATABASE)),
"database name is blank!");
conf.set(HBASE_OCEANBASE_DATABASE, props.getProperty(HBASE_OCEANBASE_DATABASE));
} else {
Preconditions.checkArgument(isNotBlank(props.getProperty(HBASE_OCEANBASE_PARAM_URL)),
"param url is blank!");
conf.set(HBASE_OCEANBASE_PARAM_URL, props.getProperty(HBASE_OCEANBASE_PARAM_URL));
Preconditions.checkArgument(
isNotBlank(props.getProperty(HBASE_OCEANBASE_SYS_USER_NAME)), "sys name is blank!");
conf.set(HBASE_OCEANBASE_SYS_USER_NAME,
props.getProperty(HBASE_OCEANBASE_SYS_USER_NAME));
conf.set(HBASE_OCEANBASE_SYS_PASSWORD, props.getProperty(HBASE_OCEANBASE_SYS_PASSWORD));
conf.set(HBASE_OCEANBASE_FULL_USER_NAME,
props.getProperty(HBASE_OCEANBASE_FULL_USER_NAME));
conf.set(HBASE_OCEANBASE_PASSWORD, props.getProperty(HBASE_OCEANBASE_PASSWORD));
}
// Some other useful property
for (Property property : Property.values()) {
String value = props.getProperty(property.getKey());
if (value != null) {
conf.set(property.getKey(), value);
}
}
try {
ohTable = new OHTable(conf, table);
} catch (Exception e) {
throw new DBException(e);
}
if ((getProperties().getProperty("debug") != null)
&& (getProperties().getProperty("debug").compareTo("true") == 0)) {
debug = true;
}
columnFamilyBytes = Bytes.toBytes(columnFamily);
}
/**
* 读取数据测试,目前无法测试批量读取
* @param table table
* @param key key
* @param fields fields
* @param result result
* @return ans
*/
@Override
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
Result r = null;
try {
if (debug) {
System.out.println("Doing read from HBase columnfamily " + columnFamily);
System.out.println("Doing read for key: " + key);
}
Get g = new Get(Bytes.toBytes(key));
if (fields == null) {
g.addFamily(columnFamilyBytes);
} else {
for (String field : fields) {
g.addColumn(columnFamilyBytes, Bytes.toBytes(field));
}
}
r = ohTable.get(g);
} catch (IOException e) {
System.err.println("Error doing get: " + e);
return SERVICE_UNAVAILABLE;
} catch (ConcurrentModificationException e) {
return SERVICE_UNAVAILABLE;
}
for (KeyValue kv : r.raw()) {
result.put(Bytes.toString(kv.getQualifier()), new ByteArrayByteIterator(kv.getValue()));
if (debug) {
System.out.println("Result for field: " + Bytes.toString(kv.getQualifier())
+ " is: " + Bytes.toString(kv.getValue()));
}
}
return OK;
}
/**
* 目前的 obkv 接口会扫描大于等于 start 取前 recordcount,可能对性能测试有影响
* @param table table
* @param startkey startkey
* @param recordcount recordcount
* @param fields fields
* @param result result
* @return ans
*/
@Override
public Status scan(String table, String startkey, int recordcount, Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
Scan s = new Scan(Bytes.toBytes(startkey));
//HBase has no record limit. Here, assume recordcount is small enough to bring back in one call.
//We get back recordcount records
s.setCaching(recordcount);
s.setMaxVersions(1);
//add specified fields or else all fields
if (fields == null) {
s.addFamily(columnFamilyBytes);
} else {
for (String field : fields) {
s.addColumn(columnFamilyBytes, Bytes.toBytes(field));
}
}
//get results
ResultScanner scanner = null;
try {
scanner = ohTable.getScanner(s);
int numResults = 0;
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
//get row key
String key = Bytes.toString(rr.getRow());
if (debug) {
System.out.println("Got scan result for key: " + key);
}
HashMap<String, ByteIterator> rowResult = new HashMap<String, ByteIterator>();
for (KeyValue kv : rr.raw()) {
rowResult.put(Bytes.toString(kv.getQualifier()),
new ByteArrayByteIterator(kv.getValue()));
}
//add rowResult to result vector
result.add(rowResult);
numResults++;
if (numResults >= recordcount) //if hit recordcount, bail out
{
break;
}
} //done with row
} catch (IOException e) {
if (debug) {
System.out.println("Error in getting/parsing scan result: " + e);// NOPMD
}
return SERVICE_UNAVAILABLE;
} finally {
scanner.close();
}
return OK;
}
/**
* 更新操作,目前不支持批量接口测试
* @param table table
* @param key key
* @param values values
* @return ans
*/
@Override
public Status update(String table, String key, HashMap<String, ByteIterator> values) {
if (debug) {
System.out.println("Setting up put for key: " + key);// NOPMD
}
Put p = new Put(Bytes.toBytes(key));
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
if (debug) {
System.out.println("Adding field/value " + entry.getKey() + "/" + entry.getValue()// NOPMD
+ " to put request");// NOPMD
}
p.add(columnFamilyBytes, Bytes.toBytes(entry.getKey()), entry.getValue().toArray());
}
try {
ohTable.put(p);
} catch (IOException e) {
if (debug) {
System.err.println("Error doing put: " + e);// NOPMD
}
return SERVICE_UNAVAILABLE;
} catch (ConcurrentModificationException e) {
//do nothing for now...hope this is rare
return SERVICE_UNAVAILABLE;
}
return OK;
}
@Override
public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
return update(table, key, values);
}
/**
* 删除接口,目前不支持批量删除测试
* @param table table
* @param key key
* @return ans
*/
@Override
public Status delete(String table, String key) {
Delete delete = new Delete(Bytes.toBytes(key));
delete.deleteFamily(columnFamilyBytes);
try {
ohTable.delete(delete);
return OK;
} catch (IOException e) {
if (debug) {
System.err.println("Error doing delete: " + e);// NOPMD
}
return ERROR;
}
}
}