`
chenjingbo
  • 浏览: 456433 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

对HBase的一些简单DAO操作

阅读更多
这个是我自己封装的一些java操作HBase的CURD.当然由于刚开始写,肯定还有一些是遗忘的.后面等项目结束了,再来完善吧.没什么其他可以说的,直接看代码
package com.koubei.hidb.dao;

import java.io.IOException;
import java.util.Map;

/**
 * @author zhenghui 
 * @version 1.0
 * @data 2010-12-20 下午02:19:53
 * 
 *       对hbase的DAO操作(接口)
 */
public interface IHBaseDao {
	/**
	 * 创建表
	 * 
	 * @param tableName
	 *            表名,
	 * @param columns
	 *            family name组成的数组
	 * @throws IOException
	 */
	void createHTable(String tableName, String[] columns) throws IOException;

	/**
	 * 创建表
	 * 
	 * @param tableName
	 *            表名
	 * @throws IOException
	 */
	void createHTable(String tableName) throws IOException;

	/**
	 * 插入数据或者修改数据.对应于shell操作中的 put
	 * 
	 * @param tableName
	 *            要插入的table name
	 * @param row
	 *            行
	 * @param family
	 *            列簇
	 * @param qualifier
	 *            列
	 * @param value
	 *            值
	 * @throws IOException
	 */
	void insertAndUpdate(String tableName, String row, String family,
			String qualifier, String value) throws IOException;

	/**
	 * 删除列名(也就是删除family.属于修改表结构的操作.调用的时候请慎重)
	 * 
	 * @param tableName
	 * @param colName
	 * @throws IOException
	 */
	void removeFamily(String tableName, String colName) throws IOException;

	/**
	 * 删除某个family下的某个列的某行
	 * 
	 * @param tableName
	 *            表名
	 * @param rowID
	 *            行名
	 * @param colName
	 *            family name(列簇)
	 * @param cluster
	 *            列名
	 * @throws IOException
	 */
	void deleteColumn(String tableName, String rowID, String colName,
			String cluster) throws IOException;

	/**
	 * 获取某一行,某一列簇中的某一列的值
	 * 
	 * @param tableName
	 *            表名
	 * @param rowID
	 *            行
	 * @param colName
	 *            列簇
	 * @param cluster
	 *            列
	 * @return
	 * @throws IOException
	 */
	String getValue(String tableName, String rowID, String colName,
			String cluster) throws IOException;

	/**
	 * 获取某一列的所有行值
	 * 
	 * @param tableName
	 *            表名
	 * @param colName
	 *            列簇名
	 * @param cluster
	 *            列名
	 * @return
	 * @throws IOException
	 */
	Map<String, String> getColumnValue(String tableName, String colName,
			String cluster) throws IOException;

}


下面是实现
package com.koubei.hidb.dao;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * @author zhenghui E-mail:zhenghui.cjb@taobao.com
 * @version 1.0
 * @data 2010-12-21 上午10:55:17
 * 
 */
public class HbaseDao implements IHBaseDao {
	public final static String COLENDCHAR = String
			.valueOf(KeyValue.COLUMN_FAMILY_DELIMITER);// ":" 列簇和列之间的分隔符

	HBaseConfiguration conf;
	HBaseAdmin admin;

	public HBaseConfiguration getConf() {
		return conf;
	}

	public void setConf(HBaseConfiguration conf) {
		this.conf = conf;
	}

	public HBaseAdmin getAdmin() {
		return admin;
	}

	public void setAdmin(HBaseAdmin admin) {
		this.admin = admin;
	}

	public void createHTable(String tableName, String[] columns)
			throws IOException {
		try {
			if (admin.tableExists(tableName))
				return;// 判断表是否已经存在
			HTableDescriptor htdesc = this.createHTDesc(tableName);
			for (int i = 0; i < columns.length; i++) {
				String colName = columns[i];
				this.addFamily(htdesc, colName, false);
			}
			admin.createTable(htdesc);
		} catch (IOException e) {
			throw e;
		}
	}

	public void createHTable(String tableName) throws IOException {
		try {
			if (admin.tableExists(tableName))
				return;// 判断表是否已经存在
			HTableDescriptor htdesc = this.createHTDesc(tableName);
			admin.createTable(htdesc);
		} catch (IOException e) {
			throw e;
		}
	}

	public void deleteColumn(String tableName, String rowID, String colName,
			String cluster) throws IOException {
		try {
			Delete del = new Delete(rowID.getBytes());
			if (cluster == null || "".equals(cluster))
				del.deleteColumn(colName.getBytes());
			else
				del.deleteColumn(colName.getBytes(), cluster.getBytes());
			HTable hTable = this.getHTable(tableName);
			hTable.delete(del);
		} catch (IOException e) {
			throw e;
		}
	}

	public Map<String, String> getColumnValue(String tableName, String colName,
			String cluster) throws IOException {
		ResultScanner scanner = null;
		try {
			HTable hTable = this.getHTable(tableName);
			scanner = hTable.getScanner(colName.getBytes(), cluster.getBytes());
			Result rowResult = scanner.next();
			Map<String, String> resultMap = new HashMap<String, String>();
			String row;
			while (rowResult != null) {
				row = new String(rowResult.getRow());
				resultMap.put(row, new String(rowResult.getValue(colName
						.getBytes(), cluster.getBytes())));
				rowResult = scanner.next();
			}
			return resultMap;
		} catch (IOException e) {
			throw e;
		} finally {
			if (scanner != null) {
				scanner.close();// 一定要关闭
			}
		}
	}

	public String getValue(String tableName, String rowID, String colName,
			String cluster) throws IOException {
		try {
			HTable hTable = this.getHTable(tableName);
			Get get = new Get(rowID.getBytes());
			Result result = hTable.get(get);
			byte[] b = result.getValue(colName.getBytes(), cluster.getBytes());
			if (b == null)
				return "";
			else
				return new String(b);
		} catch (IOException e) {
			throw e;
		}
	}

	public void insertAndUpdate(String tableName, String row, String family,
			String qualifier, String value) throws IOException {
		HTable table = this.getHTable(tableName);
		Put p = new Put(Bytes.toBytes(row));
		p.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes
				.toBytes(value));
		table.put(p);

	}

	public void removeFamily(String tableName, String colName)
			throws IOException {
		try {
			String tmp = this.fixColName(colName);
			if (admin.isTableAvailable(tableName))
				admin.disableTable(tableName);
			this.admin.deleteColumn(tableName, tmp);
			this.admin.enableTable(tableName);
		} catch (IOException e) {
			throw e;
		}
	}

	/**
	 * 给表添加列,此时不带列族
	 * 
	 * @param htdesc
	 * @param colName
	 * @param readonly
	 *            是否只读
	 * @throws Exception
	 */
	private void addFamily(HTableDescriptor htdesc, String colName,
			final boolean readonly) {
		htdesc.addFamily(this.createHCDesc(colName));
		htdesc.setReadOnly(readonly);
	}

	/**
	 * 创建列的描述,添加后,该列会有一个冒号的后缀,用于存储(列)族, 将来如果需要扩展,那么就在该列后加入(列)族
	 * 
	 * @param colName
	 * @return
	 */
	private HColumnDescriptor createHCDesc(String colName) {
		String tmp = this.fixColName(colName);
		byte[] colNameByte = Bytes.toBytes(tmp);
		return new HColumnDescriptor(colNameByte);
	}

	/**
	 * 针对hbase的列的特殊情况进行处理,列的情况: course: or course:math, 就是要么带列族,要么不带列族(以冒号结尾)
	 * 
	 * @param colName
	 *            列
	 * @param cluster
	 *            列族
	 * @return
	 */
	private String fixColName(String colName, String cluster) {
		if (cluster != null && cluster.trim().length() > 0
				&& colName.endsWith(cluster)) {
			return colName;
		}
		String tmp = colName;
		int index = colName.indexOf(COLENDCHAR);
		// int leng = colName.length();
		if (index == -1) {
			tmp += COLENDCHAR;
		}
		// 直接加入列族
		if (cluster != null && cluster.trim().length() > 0) {
			tmp += cluster;
		}
		return tmp;
	}

	private String fixColName(String colName) {
		return this.fixColName(colName, null);
	}

	/**
	 * 创建表的描述
	 * 
	 * @param tableName
	 * @return
	 * @throws Exception
	 */
	private HTableDescriptor createHTDesc(final String tableName) {
		return new HTableDescriptor(tableName);
	}

	/**
	 * 取得某个表
	 * 
	 * @param tableName
	 * @return
	 * @throws Exception
	 */
	private HTable getHTable(String tableName) throws IOException {
		try {
			return new HTable(conf, tableName);
		} catch (IOException e) {
			throw e;
		}
	}

}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics