
YCSB Testing

miyuan-ljr edited this page Nov 25, 2024 · 21 revisions

Before You Start

The HBase client is still under development, so you may run into problems while following this document. Feel free to report them in Issues.

How to Run the YCSB Test

Overview

What do you need?

  1. The YCSB core jar
  2. The YCSB OBKV client provided in this document
  3. The scripts from the official YCSB repository

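Preparation

Getting core-0.12.0.jar

To run YCSB tests, the OBKV HBase client has to be wrapped in a form YCSB understands, that is, adapted to the binding interface YCSB defines (don't worry, this part is already done for you; the code is in the Appendix). Building that adapter requires YCSB's interface classes, so we download the YCSB source code and build core-0.12.0.jar from it.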
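To make "adapting the client to YCSB's interface" concrete, here is a simplified, self-contained stand-in for the binding contract. It is a sketch under assumptions: `MiniDb`, `MiniStatus` and `InMemoryDb` are hypothetical names, not the real `com.yahoo.ycsb.DB` and `Status` classes; only the five operations (read, scan, update, insert, delete) mirror the methods YcsbBench overrides in the Appendix, and a `TreeMap` plays the role of the table.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.Vector;

// Hypothetical, simplified stand-in for YCSB's binding contract (NOT com.yahoo.ycsb.DB).
enum MiniStatus { OK, NOT_FOUND }

abstract class MiniDb {
    abstract MiniStatus read(String table, String key, Set<String> fields, Map<String, String> result);

    abstract MiniStatus scan(String table, String startKey, int recordCount, Set<String> fields,
                             Vector<Map<String, String>> result);

    abstract MiniStatus update(String table, String key, Map<String, String> values);

    abstract MiniStatus insert(String table, String key, Map<String, String> values);

    abstract MiniStatus delete(String table, String key);
}

// In-memory "binding": sorted row key -> (field -> value); the table name is ignored.
class InMemoryDb extends MiniDb {
    private final NavigableMap<String, Map<String, String>> rows = new TreeMap<>();

    // Copies the requested fields (all fields when fields == null) into 'out'.
    private static void project(Map<String, String> row, Set<String> fields, Map<String, String> out) {
        for (Map.Entry<String, String> e : row.entrySet()) {
            if (fields == null || fields.contains(e.getKey())) {
                out.put(e.getKey(), e.getValue());
            }
        }
    }

    @Override
    MiniStatus read(String table, String key, Set<String> fields, Map<String, String> result) {
        Map<String, String> row = rows.get(key);
        if (row == null) {
            return MiniStatus.NOT_FOUND;
        }
        project(row, fields, result);
        return MiniStatus.OK;
    }

    @Override
    MiniStatus scan(String table, String startKey, int recordCount, Set<String> fields,
                    Vector<Map<String, String>> result) {
        // Rows with key >= startKey, in key order, at most recordCount of them.
        int added = 0;
        for (Map<String, String> row : rows.tailMap(startKey, true).values()) {
            if (added >= recordCount) {
                break;
            }
            Map<String, String> copy = new HashMap<>();
            project(row, fields, copy);
            result.add(copy);
            added++;
        }
        return MiniStatus.OK;
    }

    @Override
    MiniStatus update(String table, String key, Map<String, String> values) {
        // Like an HBase Put: creates the row if missing and overwrites the given fields.
        rows.computeIfAbsent(key, k -> new HashMap<>()).putAll(values);
        return MiniStatus.OK;
    }

    @Override
    MiniStatus insert(String table, String key, Map<String, String> values) {
        // Same choice as YcsbBench in the Appendix: insert delegates to update.
        return update(table, key, values);
    }

    @Override
    MiniStatus delete(String table, String key) {
        return rows.remove(key) != null ? MiniStatus.OK : MiniStatus.NOT_FOUND;
    }
}
```

YCSB itself only ever talks to the abstract methods; everything store-specific (here a map, in YcsbBench an OHTable) stays inside the subclass.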

YCSB OBKV Client

See the Appendix.

YCSB Scripts

These come with the YCSB source code; they are located under bin/.

Preparing to Run

1. Build core-0.12.0.jar

After obtaining the YCSB source, check out version 0.12.0:

git checkout 0.12.0

Build the jar with the following command:

mvn -pl com.yahoo.ycsb:core -am clean package -Dmaven.test.skip=true 

When the build finishes, core-0.12.0.jar is located at YCSB/core/target/core-0.12.0.jar.
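Before moving on, you may want to confirm that the jar really contains the YCSB core classes. The sketch below uses only `java.util.jar`; the helper `JarCheck.containsClass` is hypothetical, and the class it looks for, `com.yahoo.ycsb.DB`, is the base class that YcsbBench in the Appendix extends. The default path assumes you run it from the YCSB root.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper: checks whether a jar contains a given class.
final class JarCheck {
    private JarCheck() {}

    // Turns a binary class name such as "com.yahoo.ycsb.DB" into the jar entry
    // path "com/yahoo/ycsb/DB.class" and looks that entry up.
    static boolean containsClass(String jarPath, String className) throws IOException {
        String entry = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entry) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        String jar = args.length > 0 ? args[0] : "core/target/core-0.12.0.jar";
        System.out.println(jar + " contains com.yahoo.ycsb.DB: "
            + containsClass(jar, "com.yahoo.ycsb.DB"));
    }
}
```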

2. Build the YCSB OBKV client

Add the YCSB core dependency

Put the core-0.12.0.jar you just built in the root directory of obkv-hbase-client-java, and add the com.yahoo.ycsb dependency to pom.xml:

<!-- ... -->
    <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-module-junit4</artifactId>
            <version>${powermock.version}</version>
            <scope>test</scope>
    </dependency>
    <dependency>
            <groupId>com.yahoo.ycsb</groupId>
            <artifactId>core</artifactId>
            <version>0.12.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/core-0.12.0.jar</systemPath>
    </dependency>
</dependencies>

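Write the OBKV YCSB client

The client is already written: copy YcsbBench.java from the Appendix into src/main/java/com/alipay/oceanbase/hbase/util/.

Next, package the OBKV YCSB client. First make sure pom.xml includes the assembly plugin; if it does not, add the following under <build><plugins> in pom.xml: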

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>

Then run the following command from the project root:

# -Drelease is optional
mvn assembly:assembly -Dmaven.test.skip=true -Drelease

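If the command above fails, if you are on Windows, or if you run into other problems, try this command instead:

# Newer versions of the plugin require the single goal; on Windows, some arguments must be quoted to be parsed correctly
mvn clean package assembly:single "-Dmaven.test.skip=true" "-Dcheckstyle.skip=true" -Drelease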

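If all goes well, the packaged OBKV YCSB client is at target/obkv-hbase-client-0.1.3-SNAPSHOT-jar-with-dependencies.jar (the version in the file name follows the <version> in your pom.xml). We will use this jar for the YCSB test.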

3. Modify the YCSB scripts

Get the YCSB scripts

After obtaining the YCSB source, check out version 0.12.0:

git checkout 0.12.0

You can reuse the directory from step 1 here.

Edit the YCSB script

First, YCSB has to recognize our client. Edit bin/ycsb and add an ob-hbase entry to the mapping of binding names to client classes:

# ...
    "tarantool"    : "com.yahoo.ycsb.db.TarantoolClient",
    "voldemort"    : "com.yahoo.ycsb.db.VoldemortClient",
    "ob-hbase"     : "com.alipay.oceanbase.hbase.util.YcsbBench",
}

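Add the OBKV YCSB client

Create the directory ob-binding/lib/ under the YCSB root and put the client jar in it. The directory name follows from the binding name registered in the previous step: bin/ycsb takes the part before the first "-" (ob-hbase → ob) and looks for jars in <prefix>-binding/lib. The layout should look like this: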

YCSB/ob-binding/lib/obkv-hbase-client-0.1.3-SNAPSHOT-jar-with-dependencies.jar

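Changes inside the YCSB directory

  1. Rename pom.xml in the root directory to pom.xml.backup. Without a top-level pom.xml, bin/ycsb treats the directory as a binary distribution and loads jars from lib/ and ob-binding/lib/ instead of resolving the classpath through Maven.
  2. Copy the core-0.12.0.jar built above into the lib directory under the root (create the directory first).
  3. In lib/, download the remaining dependencies: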
sudo wget https://repo1.maven.org/maven2/org/apache/htrace/htrace-core4/4.1.0-incubating/htrace-core4-4.1.0-incubating.jar
sudo wget https://repo1.maven.org/maven2/org/hdrhistogram/HdrHistogram/2.1.4/HdrHistogram-2.1.4.jar
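A small preflight check can catch a missing jar or a forgotten pom.xml before the first run. This is a sketch under assumptions: `YcsbLayoutCheck` is a hypothetical helper, the binding directory is derived from the part of the binding name before the first `-` (so `ob-hbase` maps to `ob-binding`), and the expected jar names are exactly the ones used in this guide.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical preflight check for the YCSB directory prepared above.
final class YcsbLayoutCheck {
    private YcsbLayoutCheck() {}

    // "ob-hbase" -> "ob-binding": prefix before the first '-' plus "-binding".
    static String bindingDir(String bindingName) {
        int dash = bindingName.indexOf('-');
        String prefix = dash < 0 ? bindingName : bindingName.substring(0, dash);
        return prefix + "-binding";
    }

    // Returns a list of problems; an empty list means the layout matches this guide.
    static List<String> check(Path ycsbHome, String bindingName) throws IOException {
        List<String> problems = new ArrayList<>();
        if (Files.exists(ycsbHome.resolve("pom.xml"))) {
            problems.add("pom.xml still present (rename it to pom.xml.backup)");
        }
        Path lib = ycsbHome.resolve("lib");
        String[] required = {
            "core-0.12.0.jar", "htrace-core4-4.1.0-incubating.jar", "HdrHistogram-2.1.4.jar"
        };
        for (String jar : required) {
            if (!Files.isRegularFile(lib.resolve(jar))) {
                problems.add("missing lib/" + jar);
            }
        }
        // The binding directory must contain at least one jar (the OBKV YCSB client).
        Path bindingLib = ycsbHome.resolve(bindingDir(bindingName)).resolve("lib");
        boolean hasJar = false;
        if (Files.isDirectory(bindingLib)) {
            try (DirectoryStream<Path> jars = Files.newDirectoryStream(bindingLib, "*.jar")) {
                hasJar = jars.iterator().hasNext();
            }
        }
        if (!hasJar) {
            problems.add("no client jar in " + bindingDir(bindingName) + "/lib");
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        Path home = Paths.get(args.length > 0 ? args[0] : ".");
        List<String> problems = check(home, "ob-hbase");
        System.out.println(problems.isEmpty() ? "layout OK" : String.join("\n", problems));
    }
}
```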

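All preparation is done; you can now run the tests.

Running YCSB

OBKV HBase parameters

Before running the test, it helps to know the OBKV HBase client parameters. They are used to connect to the target OceanBase server, and some of them also tune client-side performance.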

// Direct-connect mode requires:
"hbase.oceanbase.paramURL"     -> config URL
"hbase.oceanbase.fullUserName" -> full user name like userName@tenantName#clusterName
"hbase.oceanbase.password"     -> password of user
"hbase.oceanbase.sysUserName"  -> SYS Tenant user name
"hbase.oceanbase.sysPassword"  -> password of sys user

// ODP mode requires:
"hbase.oceanbase.odpMode"      -> whether to use ODP
"hbase.oceanbase.odpAddr"      -> ip addr of ODP
"hbase.oceanbase.odpPort"      -> port of ODP
"hbase.oceanbase.fullUserName" -> full user name like userName@tenantName#clusterName
"hbase.oceanbase.password"     -> password of user
"hbase.oceanbase.database"     -> database name

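For more parameters, see OHConstants in com.alipay.oceanbase.hbase.constants; only the parameters that affect login are described here. YcsbBench (see the Appendix) additionally requires hbase.oceanbase.table and hbase.oceanbase.columnFamily, and forwards any other OBKV client property (com.alipay.oceanbase.rpc.property.Property) passed with -p. We strongly recommend direct-connect mode :D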
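The parameter lists above can be turned into a small check that reports missing keys before YCSB starts. This is a sketch: `ObkvParamCheck` is a hypothetical helper; the keys are the ones listed above plus `hbase.oceanbase.table` and `hbase.oceanbase.columnFamily`, which YcsbBench requires. As in YcsbBench, ODP mode is selected when `hbase.oceanbase.odpMode` parses as `true`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical preflight check for the OBKV HBase connection parameters listed above.
final class ObkvParamCheck {
    private ObkvParamCheck() {}

    static final List<String> COMMON = Arrays.asList(
        "hbase.oceanbase.fullUserName", "hbase.oceanbase.password",
        // Required by YcsbBench in the Appendix, not by the OBKV client itself.
        "hbase.oceanbase.table", "hbase.oceanbase.columnFamily");
    static final List<String> DIRECT = Arrays.asList(
        "hbase.oceanbase.paramURL", "hbase.oceanbase.sysUserName", "hbase.oceanbase.sysPassword");
    static final List<String> ODP = Arrays.asList(
        "hbase.oceanbase.odpAddr", "hbase.oceanbase.odpPort", "hbase.oceanbase.database");

    // Returns the required keys that are absent. Only presence is checked,
    // because a password may legitimately be an empty string.
    static List<String> missingKeys(Properties props) {
        // Boolean.parseBoolean(null) is false, so direct mode is the default.
        boolean odpMode = Boolean.parseBoolean(props.getProperty("hbase.oceanbase.odpMode"));
        List<String> required = new ArrayList<>(COMMON);
        required.addAll(odpMode ? ODP : DIRECT);
        List<String> missing = new ArrayList<>();
        for (String key : required) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```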

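Workload

Workload configuration is part of YCSB itself; see the YCSB core-workload documentation for details.

The workload determines which operations are executed and how much data is involved; you can also pass parameters in other ways, for example one by one on the command line with -p.

Note:

  1. The workload's requestdistribution (e.g. uniform vs. zipfian) can affect resource utilization on the server side.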
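To see why requestdistribution matters, the sketch below measures what fraction of requests hit the 10 hottest of 1,000 keys under uniform and Zipf-like access. It is an illustration only: `KeySkewDemo` is hypothetical, and its sampler is a plain inverse-CDF Zipf sampler with exponent 0.99 (the default constant of YCSB's ZipfianGenerator), not YCSB's own generator; it also ignores the key scrambling YCSB applies so that hot keys are spread across the key space.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical demo: share of requests hitting the hottest keys, uniform vs. Zipf-like access.
final class KeySkewDemo {
    private KeySkewDemo() {}

    // Cumulative distribution for P(rank i) proportional to 1 / i^theta, i = 1..n.
    static double[] zipfCdf(int n, double theta) {
        double[] cdf = new double[n];
        double sum = 0;
        for (int i = 0; i < n; i++) {
            sum += 1.0 / Math.pow(i + 1, theta);
            cdf[i] = sum;
        }
        for (int i = 0; i < n; i++) {
            cdf[i] /= sum; // last entry becomes exactly 1.0
        }
        return cdf;
    }

    // Inverse-CDF sampling: index of the first cdf entry >= u, with u in [0, 1).
    static int sampleZipf(double[] cdf, Random rnd) {
        double u = rnd.nextDouble();
        int idx = Arrays.binarySearch(cdf, u);
        return idx >= 0 ? idx : -idx - 1;
    }

    // Fraction of 'samples' requests that hit keys 0..hot-1 (the hottest ranks).
    static double hotShare(boolean zipf, int n, int hot, int samples, long seed) {
        Random rnd = new Random(seed);
        double[] cdf = zipfCdf(n, 0.99);
        int hits = 0;
        for (int s = 0; s < samples; s++) {
            int key = zipf ? sampleZipf(cdf, rnd) : rnd.nextInt(n);
            if (key < hot) {
                hits++;
            }
        }
        return (double) hits / samples;
    }

    public static void main(String[] args) {
        System.out.printf("uniform: %.3f of requests hit the 10 hottest of 1000 keys%n",
            hotShare(false, 1000, 10, 100_000, 42));
        System.out.printf("zipfian: %.3f of requests hit the 10 hottest of 1000 keys%n",
            hotShare(true, 1000, 10, 100_000, 42));
    }
}
```

Under the Zipf-like distribution a small set of rows receives a large share of the requests, which can concentrate load on the servers hosting those rows, while uniform access spreads requests over the whole key range.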

How to run

The run script is in the YCSB/bin directory, so you just invoke it. Note that the OBKV YCSB client parameters must be passed on the command line. For example:

./bin/ycsb run ob-hbase -P workloads/your_workload -s -threads 10 -p hbase.oceanbase.paramURL="$config_server_url" \
    -p hbase.oceanbase.fullUserName="$userName@tenantName#clusterName" -p hbase.oceanbase.password="$your_pwd" \
    -p hbase.oceanbase.sysUserName="sys" -p hbase.oceanbase.sysPassword="$your_sys_pwd" \
    -p hbase.oceanbase.table="$tableName" -p hbase.oceanbase.columnFamily="$cf"
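If you drive YCSB from a Java program, or want to avoid shell quoting issues with characters such as `$` and `#` in user names and passwords, the arguments can be passed as a list through `ProcessBuilder`, so no shell interprets them. A sketch: `YcsbLauncher` and its method names are hypothetical; the command-line options are the ones from the example above, and the working directory is assumed to be the YCSB root on a Unix-like system.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical launcher that builds the same command line as the shell example above.
final class YcsbLauncher {
    private YcsbLauncher() {}

    static List<String> buildCommand(String phase, String workloadFile, int threads,
                                     Map<String, String> obkvParams) {
        List<String> cmd = new ArrayList<>();
        cmd.add("./bin/ycsb");
        cmd.add(phase);       // "load" or "run"
        cmd.add("ob-hbase");  // binding name registered in bin/ycsb
        cmd.add("-P");
        cmd.add(workloadFile);
        cmd.add("-s");
        cmd.add("-threads");
        cmd.add(Integer.toString(threads));
        for (Map.Entry<String, String> e : obkvParams.entrySet()) {
            cmd.add("-p");
            // Passed verbatim: no shell expands '$' or treats '#' specially.
            cmd.add(e.getKey() + "=" + e.getValue());
        }
        return cmd;
    }

    // Starts YCSB in the given YCSB root directory and waits for its exit code.
    static int run(File ycsbHome, List<String> cmd) throws IOException, InterruptedException {
        Process p = new ProcessBuilder(cmd).directory(ycsbHome).inheritIO().start();
        return p.waitFor();
    }
}
```

Using a `LinkedHashMap` for the parameters keeps the `-p` options in insertion order, which makes the generated command easy to compare with the shell version.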

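Before the run phase, load the initial data set with the same command, replacing run with load. For more about the script, see Running-a-Workload.

YCSB should now be running.

Everything must be studied before it can be understood. If you have questions, open an Issue or get in touch with us :x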

Appendix

YcsbBench.java

/*-
 * #%L
 * OBKV HBase Client Framework
 * %%
 * Copyright (C) 2022 OceanBase Group
 * %%
 * OBKV HBase Client Framework is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * #L%
 */
package com.alipay.oceanbase.hbase.util;

import com.alipay.oceanbase.hbase.OHTable;
import com.alipay.oceanbase.rpc.property.Property;
import com.yahoo.ycsb.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.*;
import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
import static com.yahoo.ycsb.Status.*;
import static org.apache.commons.lang.StringUtils.isNotBlank;

public class YcsbBench extends DB {
    public static final String COLUMN_FAMILY = "hbase.oceanbase.columnFamily";
    public static final String TABLE         = "hbase.oceanbase.table";
    private String             columnFamily;
    private byte[]             columnFamilyBytes;
    private String             table;
    public boolean             debug         = false;
    private OHTable            ohTable;

    /**
     * Initialization; parameters can be passed in as startup properties (e.g. -p on the ycsb command line).
     * @throws DBException exception
     */
    public void init() throws DBException {
        Properties props = getProperties();
        columnFamily = props.getProperty(COLUMN_FAMILY);
        table = props.getProperty(TABLE);
        boolean odpMode = false;
        Configuration conf = new Configuration();

        Preconditions.checkArgument(isNotBlank(props.getProperty(COLUMN_FAMILY)),
            "columnFamily is blank!");
        Preconditions.checkArgument(isNotBlank(props.getProperty(TABLE)), "table is blank!");
        Preconditions.checkArgument(isNotBlank(props.getProperty(HBASE_OCEANBASE_FULL_USER_NAME)),
            "full user name is blank!");

        if (props.getProperty(HBASE_OCEANBASE_ODP_MODE) != null) {
            odpMode = Boolean.parseBoolean(props.getProperty(HBASE_OCEANBASE_ODP_MODE));
        }
        if (odpMode) {
            conf.setBoolean(HBASE_OCEANBASE_ODP_MODE, true);
            conf.set(HBASE_OCEANBASE_FULL_USER_NAME,
                props.getProperty(HBASE_OCEANBASE_FULL_USER_NAME));
            conf.set(HBASE_OCEANBASE_PASSWORD, props.getProperty(HBASE_OCEANBASE_PASSWORD));
            Preconditions.checkArgument(isNotBlank(props.getProperty(HBASE_OCEANBASE_ODP_ADDR)),
                "odp addr is blank!");
            conf.set(HBASE_OCEANBASE_ODP_ADDR, props.getProperty(HBASE_OCEANBASE_ODP_ADDR));
            Preconditions.checkArgument(isNotBlank(props.getProperty(HBASE_OCEANBASE_ODP_PORT)),
                "odp port is blank!");
            conf.setInt(HBASE_OCEANBASE_ODP_PORT,
                Integer.parseInt(props.getProperty(HBASE_OCEANBASE_ODP_PORT)));
            Preconditions.checkArgument(isNotBlank(props.getProperty(HBASE_OCEANBASE_DATABASE)),
                "database name is blank!");
            conf.set(HBASE_OCEANBASE_DATABASE, props.getProperty(HBASE_OCEANBASE_DATABASE));
        } else {
            Preconditions.checkArgument(isNotBlank(props.getProperty(HBASE_OCEANBASE_PARAM_URL)),
                "param url is blank!");
            conf.set(HBASE_OCEANBASE_PARAM_URL, props.getProperty(HBASE_OCEANBASE_PARAM_URL));
            Preconditions.checkArgument(
                isNotBlank(props.getProperty(HBASE_OCEANBASE_SYS_USER_NAME)), "sys name is blank!");
            conf.set(HBASE_OCEANBASE_SYS_USER_NAME,
                props.getProperty(HBASE_OCEANBASE_SYS_USER_NAME));
            conf.set(HBASE_OCEANBASE_SYS_PASSWORD, props.getProperty(HBASE_OCEANBASE_SYS_PASSWORD));
            conf.set(HBASE_OCEANBASE_FULL_USER_NAME,
                props.getProperty(HBASE_OCEANBASE_FULL_USER_NAME));
            conf.set(HBASE_OCEANBASE_PASSWORD, props.getProperty(HBASE_OCEANBASE_PASSWORD));
        }
        // Some other useful property
        for (Property property : Property.values()) {
            String value = props.getProperty(property.getKey());
            if (value != null) {
                conf.set(property.getKey(), value);
            }
        }

        try {
            ohTable = new OHTable(conf, table);
        } catch (Exception e) {
            throw new DBException(e);
        }
        if ((getProperties().getProperty("debug") != null)
            && (getProperties().getProperty("debug").compareTo("true") == 0)) {
            debug = true;
        }
        columnFamilyBytes = Bytes.toBytes(columnFamily);
    }

    /**
     * Read test; batch reads cannot be tested yet.
     * @param table table
     * @param key key
     * @param fields fields
     * @param result result
     * @return ans
     */
    @Override
    public Status read(String table, String key, Set<String> fields,
                       HashMap<String, ByteIterator> result) {
        Result r = null;
        try {
            if (debug) {
                System.out.println("Doing read from HBase columnfamily " + columnFamily);
                System.out.println("Doing read for key: " + key);
            }
            Get g = new Get(Bytes.toBytes(key));
            if (fields == null) {
                g.addFamily(columnFamilyBytes);
            } else {
                for (String field : fields) {
                    g.addColumn(columnFamilyBytes, Bytes.toBytes(field));
                }
            }
            r = ohTable.get(g);
        } catch (IOException e) {
            System.err.println("Error doing get: " + e);
            return SERVICE_UNAVAILABLE;
        } catch (ConcurrentModificationException e) {
            return SERVICE_UNAVAILABLE;
        }
        for (KeyValue kv : r.raw()) {
            result.put(Bytes.toString(kv.getQualifier()), new ByteArrayByteIterator(kv.getValue()));
            if (debug) {
                System.out.println("Result for field: " + Bytes.toString(kv.getQualifier())
                                   + " is: " + Bytes.toString(kv.getValue()));
            }
        }
        return OK;
    }

    /**
     * The current OBKV interface scans rows with key >= startkey and returns the first recordcount of them, which may affect performance results.
     * @param table table
     * @param startkey startkey
     * @param recordcount recordcount
     * @param fields fields
     * @param result result
     * @return ans
     */
    @Override
    public Status scan(String table, String startkey, int recordcount, Set<String> fields,
                       Vector<HashMap<String, ByteIterator>> result) {
        Scan s = new Scan(Bytes.toBytes(startkey));
        //HBase has no record limit.  Here, assume recordcount is small enough to bring back in one call.
        //We get back recordcount records
        s.setCaching(recordcount);
        s.setMaxVersions(1);
        //add specified fields or else all fields
        if (fields == null) {
            s.addFamily(columnFamilyBytes);
        } else {
            for (String field : fields) {
                s.addColumn(columnFamilyBytes, Bytes.toBytes(field));
            }
        }
        //get results
        ResultScanner scanner = null;
        try {
            scanner = ohTable.getScanner(s);
            int numResults = 0;
            for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
                //get row key
                String key = Bytes.toString(rr.getRow());
                if (debug) {
                    System.out.println("Got scan result for key: " + key);
                }
                HashMap<String, ByteIterator> rowResult = new HashMap<String, ByteIterator>();
                for (KeyValue kv : rr.raw()) {
                    rowResult.put(Bytes.toString(kv.getQualifier()),
                        new ByteArrayByteIterator(kv.getValue()));
                }
                //add rowResult to result vector
                result.add(rowResult);
                numResults++;
                if (numResults >= recordcount) //if hit recordcount, bail out
                {
                    break;
                }
            } //done with row
        } catch (IOException e) {
            if (debug) {
                System.out.println("Error in getting/parsing scan result: " + e);// NOPMD
            }
            return SERVICE_UNAVAILABLE;
        } finally {
            // getScanner may throw before scanner is assigned
            if (scanner != null) {
                scanner.close();
            }
        }
        return OK;
    }

    /**
     * Update operation; batch updates are not supported in this test yet.
     * @param table table
     * @param key key
     * @param values values
     * @return ans
     */
    @Override
    public Status update(String table, String key, HashMap<String, ByteIterator> values) {
        if (debug) {
            System.out.println("Setting up put for key: " + key);// NOPMD
        }
        Put p = new Put(Bytes.toBytes(key));
        for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
            if (debug) {
                System.out.println("Adding field/value " + entry.getKey() + "/" + entry.getValue()// NOPMD
                                   + " to put request");// NOPMD
            }
            p.add(columnFamilyBytes, Bytes.toBytes(entry.getKey()), entry.getValue().toArray());
        }
        try {
            ohTable.put(p);
        } catch (IOException e) {
            if (debug) {
                System.err.println("Error doing put: " + e);// NOPMD
            }
            return SERVICE_UNAVAILABLE;
        } catch (ConcurrentModificationException e) {
            //do nothing for now...hope this is rare
            return SERVICE_UNAVAILABLE;
        }
        return OK;
    }

    @Override
    public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
        return update(table, key, values);
    }

    /**
     * Delete operation; batch deletes are not supported in this test yet.
     * @param table table
     * @param key key
     * @return ans
     */
    @Override
    public Status delete(String table, String key) {
        Delete delete = new Delete(Bytes.toBytes(key));
        delete.deleteFamily(columnFamilyBytes);
        try {
            ohTable.delete(delete);
            return OK;
        } catch (IOException e) {
            if (debug) {
                System.err.println("Error doing delete: " + e);// NOPMD
            }
            return ERROR;
        }
    }
}