`
wangmengbk
  • 浏览: 288706 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Spark 使用Java 写入 HBase

阅读更多

实例代码:

package com.bigdata.spark.hbase;

 

import java.io.IOException;

import java.util.List;

import java.util.regex.Pattern;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.HColumnDescriptor;

import org.apache.hadoop.hbase.HTableDescriptor;

import org.apache.hadoop.hbase.MasterNotRunningException;

import org.apache.hadoop.hbase.ZooKeeperConnectionException;

import org.apache.hadoop.hbase.client.HBaseAdmin;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

import org.apache.spark.broadcast.Broadcast;

import org.apache.spark.sql.DataFrame;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SQLContext;

 

import scala.Tuple2;

 

import com.bigdata.spark.SparkManager;

import com..bigdata.spark.pojo.Customer;

import com.bigdata.spark.sql.DataFrameManger;

 

public class SparkHbaseTest {

private  static final Pattern SPACE = Pattern.compile("\\|");

/**

* @param args

*/

public static void main(String[] args) {

SparkConf sparkConf = new SparkConf();

sparkConf.setAppName("sparkHbase");

sparkConf.setMaster("local");

try {

String tableName="customer";

//获取 JavaSparkContext

JavaSparkContext jsc = SparkManager.getInstance().getJavaSparkContxt(sparkConf);

//使用HBaseConfiguration.create()生成Configuration

Configuration conf = HBaseConfiguration.create();

conf.set("hbase.zookeeper.property.clientPort", "4180");  

conf.set("hbase.zookeeper.quorum", "192.168.68.84");  

//conf.set("hbase.master", "192.168.68.84:60000");

conf.set(TableInputFormat.INPUT_TABLE, tableName);

 

createTable(conf,tableName);

 

final Broadcast<String> broadcastTableName = jsc.broadcast(tableName);

 

SQLContext sqlContext = SparkManager.getInstance().getSQLContext(jsc);

   

   System.out.println("=== Data source: RDD ===");

   //将文件内容信息转换为 java bean

   JavaRDD<String> jrdd = SparkManager.getInstance().getJavaRDD(jsc,"hdfs://192.168.68.84:9000/storm/app_hdfsBolt-5-1-1434442141499.log");

   JavaRDD<Customer> customerRdd = jrdd.map(

     new Function<String, Customer>() {

       @Override

       public Customer call(String line) {

         String[] parts = line.split("\\|");

         Customer customer = new Customer();

         customer.setId(parts[0]);

         customer.setCustomcode(parts[1]);

         customer.setCode(parts[2]);

         customer.setPrice(Double.parseDouble(parts[3]));

         return customer;

       }

     });

   //将 JavaRDD<Customer> 对象 序列化一个  tcustomer 表

   DataFrame schemaCustomer = DataFrameManger.getInstance().getDataFrameByFile(sqlContext, customerRdd, Customer.class);

   schemaCustomer.registerTempTable("tcustomer");

   

// 从表 tcustomer获取  符合条件的数据

   DataFrame teenagers = sqlContext.sql("SELECT id,customcode,code,price FROM tcustomer  WHERE price >= 110 AND price <= 500");

 //从结果做获取数据,  

  List<Object> clist = teenagers.toJavaRDD().map(new Function<Row, Object>() {

 

@Override

public Object call(Row row) throws Exception {

Customer cust = new Customer();

cust.setId(row.getString(0));

cust.setCustomcode(row.getString(1));

cust.setCode(row.getString(2));

cust.setPrice(row.getDouble(3));

return cust;

}

   

}).collect();

  

  //将数据转换为 javapairRDD 对象

  JavaPairRDD<String, Customer> jpairCust = teenagers.toJavaRDD().mapToPair(new PairFunction<Row, String, Customer>() {

@Override

public Tuple2<String, Customer> call(Row row) throws Exception {

Customer cust = new Customer();

cust.setId(row.getString(0));

cust.setCustomcode(row.getString(1));

cust.setCode(row.getString(2));

cust.setPrice(row.getDouble(3));

return new Tuple2<String, Customer>(row.getString(0), cust);

}

});

  

  jpairCust.foreach(new VoidFunction<Tuple2<String,Customer>>() {

 

@Override

public void call(Tuple2<String, Customer> t) throws Exception {

insertData(broadcastTableName.value(), t._2());

System.out.println("key:"+t._1()+"==code:"+t._2().getCustomcode());

}

});

  

   for (Object obj: clist) {

   Customer ct = (Customer) obj;

     System.out.println(ct.getId()+" "+ct.getCustomcode()+" "+ct.getCode()+" "+ct.getPrice());

   }

   jsc.stop();

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

 

}

 

/**

* 创建表

* @param conf

* @param tableName

*/

private static void createTable(Configuration conf,String tableName){

System.out.println("start create table ......");  

       try {  

           HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);  

           if (hBaseAdmin.tableExists(tableName)) {// 如果存在要创建的表,那么先删除,再创建  

               hBaseAdmin.disableTable(tableName);  

               hBaseAdmin.deleteTable(tableName);  

               System.out.println(tableName + " is exist,detele....");  

           }  

           HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);  

           tableDescriptor.addFamily(new HColumnDescriptor("column1"));  

           tableDescriptor.addFamily(new HColumnDescriptor("column2"));  

           tableDescriptor.addFamily(new HColumnDescriptor("column3"));

           tableDescriptor.addFamily(new HColumnDescriptor("column4"));

           hBaseAdmin.createTable(tableDescriptor);  

       } catch (MasterNotRunningException e) {  

           e.printStackTrace();  

       } catch (ZooKeeperConnectionException e) {  

           e.printStackTrace();  

       } catch (IOException e) {  

           e.printStackTrace();  

       }  

       System.out.println("end create table ......");  

}

 

/** 

     * 插入数据 

     * @param tableName 

* @throws IOException 

     */  

    public static void insertData(String tableName,Customer cust) throws IOException {  

        System.out.println("start insert data ......");

        Configuration conf = HBaseConfiguration.create();

conf.set("hbase.zookeeper.property.clientPort", "4180");  

conf.set("hbase.zookeeper.quorum", "192.168.68.84");  

conf.set(TableInputFormat.INPUT_TABLE, tableName);

        HTable table = new HTable(conf,tableName);  

        Put put = new Put("Customer".getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值  

        put.add("column1".getBytes(), null, cust.getId().getBytes());// 本行数据的第一列  

        put.add("column2".getBytes(), null, cust.getCustomcode().getBytes());// 本行数据的第二列  

        put.add("column3".getBytes(), null, cust.getCode().getBytes());// 本行数据的第三列

        put.add("column4".getBytes(), null, Double.toString(cust.getPrice()).getBytes());// 本行数据的第四列

        try {  

            table.put(put);  

        } catch (IOException e) {  

            e.printStackTrace();  

        }  

        System.out.println("end insert data ......");  

    } 

 

}

 

分享到:
评论
1 楼 jjjssh 2017-12-04  
SparkManager 代码都没全,你从来拷贝来的

相关推荐

    node-v6.3.1-linux-ppc64.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    计算机专业词汇+英语+计算机能不学英语吗?

    计算机专业英语词汇非常丰富,涉及计算机硬件、软件、网络、程序设计语言等多个方面。由于篇幅限制,我无法直接列出完整的1000个计算机专业英语词汇,但我可以为您提供一些常见的计算机专业英语词汇作为示例: file - 文件 command - 命令,指令 use - 使用,用途 program - 程序 line - (数据,程序)行,线路 if - 如果(连词) display - 显示,显示器 set - 设置(动词),集合(名词) key - 键,关键字,关键码 list - 列表,显示(名词),打印(动词) by - 凭,靠,沿(介词) press - 按,压(动词) with - 用,与,随着(介词) format - 格式 change - 更换,改变,变动(动词)

    35-35.渗透测试SQL注入之SQL注入防御

    35-35.渗透测试SQL注入之SQL注入防御

    node-v6.3.1-linux-armv7l.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    奇怪的数列是指具有非传统或非常规特征的数列

    奇怪的数列 奇怪的数列是指具有非传统或非常规特征的数列

    65-65.渗透测试-反序列化漏洞的出现.mp4

    65-65.渗透测试-反序列化漏洞的出现.mp4

    华为OD机试D卷 - 在字符串中找出连续最长的数字串(含“+-”号) - 免费看解析和代码.html

    私信博主免费获取真题解析以及代码

    企业数字化转型大数据湖一体化平台项目建设方案qy.pptx

    企业数字化转型大数据湖一体化平台项目建设方案qy.pptx

    泰尔指数模型(数据+stata代码).txt

    详细介绍:https://blog.csdn.net/li514006030/article/details/138682636

    《软件性能测试、分析与调优实践之路-第二版》ppt 课件总结

    《软件性能测试、分析与调优实践之路-第二版》ppt 课件总结

    基于 SpringCloud 和 Vue3 的OA系统源码+数据库.zip

    基于 SpringCloud 和 Vue3 的OA系统源码+数据库.zip 基于 SpringCloud 和 Vue3 的OA系统源码+数据库.zip基于 SpringCloud 和 Vue3 的OA系统源码+数据库.zip基于 SpringCloud 和 Vue3 的OA系统源码+数据库.zip基于 SpringCloud 和 Vue3 的OA系统源码+数据库.zip基于 SpringCloud 和 Vue3 的OA系统源码+数据库.zip基于 SpringCloud 和 Vue3 的OA系统源码+数据库.zip基于 SpringCloud 和 Vue3 的OA系统源码+数据库.zip基于 SpringCloud 和 Vue3 的OA系统源码+数据库.zip基于 SpringCloud 和 Vue3 的OA系统源码+数据库.zip基于 SpringCloud 和 Vue3 的OA系统源码+数据库.zip

    node-v6.8.1-linux-armv6l.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    node-v4.8.0-sunos-x86.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    专业Visio模板:卷积神经网络(CNN)结构图设计资源包下载

    Visio是一款功能强大的图表和矢量图形应用程序,它被广泛用于创建各种类型的图表,包括复杂的卷积神经网络(CNN)结构图。使用Visio绘制的CNN结构图模板,可以帮助研究人员、学生和专业人士更高效地设计和展示他们的神经网络模型。 该模板通常包含了一系列预定义的形状和符号,如卷积层、池化层、全连接层、激活函数等,这些元素可以直接拖拽到画布上使用。用户可以通过调整这些元素的大小、颜色和连接方式来定制自己的网络结构图。此外,模板可能还提供了一些辅助功能,比如自动布局、数据流方向指示和层次结构的清晰展示。 通过使用Visio的卷积神经网络结构图模板,用户可以节省大量手动绘制的时间,并确保图表的专业性和一致性。这不仅适用于学术报告和论文,也适用于项目演示和技术文档。然而,请注意,我不能提供实际的下载链接,但用户可以根据描述在网络上搜索并找到相应的Visio模板资源。

    node-v6.12.2-linux-armv7l.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    node-v6.12.0-linux-armv6l.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    基于单片机AT89S52的智能岩体声发射监测仪的设计.docx

    基于单片机AT89S52的智能岩体声发射监测仪的设计.docx

    node-v0.10.48-x64.msi

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    40-40.XSS跨站脚本攻击植入方式.mp4

    40-40.XSS跨站脚本攻击植入方式.mp4

    node-v5.10.0-linux-x86.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

Global site tag (gtag.js) - Google Analytics