jonvi 发表于 2016-12-8 10:48:10

hadoop pig入门总结

 
在这里贴一个pig源码的分析,做pig很长时间没做笔记,不包含任何细节,以后有机会再说吧
  http://blackproof.iyunv.com/blog/1769219
 
hadoop pig入门总结


[*]pig简介
[*]pig数据类型
[*]pig latin语法
[*]pig udf自定义
[*]pig derived衍生
[*]推荐书籍 programming pig
[*]推荐网站 http://pig.apache.org/docs/r0.10.0/basic.html

pig简介
pig是hadoop上层的衍生架构,与hive类似。对比hive(hive类似sql,是一种声明式的语言),pig是一种过程语言,类似于存储过程一步一步得进行数据转化。
 
pig数据类型


[*]double > float > long > int > bytearray
[*]tuple|bag|map|chararray > bytearray

double float long int chararray bytearray都相当于pig的基本类型
tuple相当于数组 ,但是可以类型不一,举例('dirkzhang','dallas',41)
Bag相当于tuple的一个集合,举例{('dirk',41),('kedde',2),('terre',31)},在group的时候会生成bag
Map相当于哈希表,key为chararray,value为任意类型,例如['name'#dirk,'age'#36,'num'#41
nulls 表示的不只是数据不存在,他更表示数据是unkown
 
pig latin语法
 

1:load

LOAD 'data' ;
       例如:
      load = LOAD 'sql://{SELECT MONTH_ID,DAY_ID,PROV_ID FROM zb_d_bidwmb05009_010}'    USING com.xxxx.dataplatform.bbdp.geniuspig.VerticaLoader('oracle','192.168.6.5','dev','1522','vbap','vbap','1') AS (MONTH_ID:chararray,DAY_ID:chararray,PROV_ID:chararray);
 
Table = load ‘url’ as (id,name…..);    //table和load之间除了等号外 还必须有个空格 不然会出错,url一定要带引号,且只能是单引号。
 

2:filter

       alias = FILTER alias BY expression;

       Table = filter Table1 by + A; //A可以是 id > 10;not name matches ‘’,is not null 等,可以用and  和or连接各条件
       例如:
       filter = filter load20 by ( MONTH_ID == '1210' and  DAY_ID == '18' and  PROV_ID == '010' );
       
 

3:group

alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] ;
          pig的分组,不仅是数据上的分组,在数据的schema形式上也进行分组为groupcolumn:bag
         Table3 = group Table2 by id;也可以Table3 = group Table2 by (id,name);括号必须加
         可以使用ALL实现对所有字段的分组
 

4:foreach

alias = FOREACH alias GENERATE expression ….];
 
alias = FOREACH nested_alias {
alias = {nested_op | nested_exp}; [{alias = {nested_op | nested_exp}; …]
GENERATE expression ….]
};
 
一般跟generate一块使用
         Table = foreach Table generate (id,name);括号可加可不加。
avg = foreach Table generate group, AVG(age);  MAX ,MIN..
 
在进行数据过滤时,建议尽早使用foreach generate将多余的数据过滤掉,减少数据交换
 

5:join

Inner  join Syntax



alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) ;



Outer join Syntax



alias = JOIN left-alias BY left-alias-column , right-alias BY right-alias-column ;



 
join/left join / right join
daily = load 'A' as (id,name, sex);
divs  = load 'B' as (id,name, sex);
 
join
jnd   = join daily by (id, name), divs by (id, name);       
 
left join
jnd   = join daily by (id, name) left outer, divs by (id, name);
也可以同时多个变量,但只用于inner join
A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;
 

6: union

alias = UNION alias, alias [, alias …];
 
union 相当与sql中的union,但与sql不通的是pig中的union可以针对两个不同模式的变量:如果两个变量模式相同,那么union后的变量模式与 变量的模式一样;如果一个变量的模式可以由另一各变量的模式强制类型转换,那么union后的变量模式与转换后的变量模式相同;否则,union后的变量 没有模式。
 
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:float);
C = union A, B;
describe C;
 
C: {x: int,y: float}
 
A = load 'input1' as (x:double, y:float);
B = load 'input2' as (x:int, y:double);
C = union A, B;
describe C;
C: {x: double,y: double}
 
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:chararray);
C = union A, B;
describe C;
Schema for C unknown.
 
注意:在pig 1.0中 执行不了最后一种union。
 
如果需要对两个具有不通列名的变量union的话,可以使用onschema关键字
A = load 'input1' as (w: chararray, x:int, y:float);
B = load 'input2' as (x:int, y:double, z:chararray);
C = union onschema A, B;
describe C;
C: {w: chararray,x: int,y: double,z: chararray}
 
join和union之后alias的别名会变
 

7:Dump

     dump alias
用于在屏幕上显示数据。
 

8:Order by

alias = ORDER alias BY { * | field_alias [, field_alias …] } ;
         A = order Table by id desc;
 

9:distinct

         A = distinct alias;
 

10:limit

         A = limit alias 10;
 

11:sample

SAMPLE alias size;
 
随机抽取指定比例(0到1)的数据。
some = sample divs 0.1;
 

13:cross

alias = CROSS alias, alias [, alias …] ;
 
将多个数据集中的数据按照字段名进行同值组合,形成笛卡尔积。
--cross.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,date:chararray, open:float, high:float, low:float,
close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,date:chararray, dividends:float);
tonsodata = cross daily, divs parallel 10;
 
 

15:split

Syntax
SPLIT alias INTO alias IF expression, alias IF expression [, alias IF expression …] [, alias OTHERWISE];
 
A = LOAD 'data' AS (f1:int,f2:int,f3:int);
DUMP A;
(1,2,3)
(4,5,6)
(7,8,9)
SPLIT A INTO X IF f1<7, Y IF f2==5, Z IF (f3<6 OR f3>6);
 
DUMP X;
(1,2,3)
(4,5,6)
 
DUMP Y;
(4,5,6)
 
DUMP Z;
(1,2,3)
(7,8,9)
 

16:store

         Store  … into … Using…
 
 
pig在别名维护上:
1、join
如e = join d by name,b by name;
    g = foreach e generate $0 as one:chararray, $1 as two:int, $2 as      three:chararray,$3 asfour:int;
    他生成的schemal:
 
        e: {d::name: chararray,d::position: int,b::name: chararray,b::age: int}
 
g: {one: chararray,two: int,three: chararray,four: int}
2、group
   B = GROUP A BY age;
 

----------------------------------------------------------------------
| B   | group: int | A: bag({name: chararray,age: int,gpa: float}) |
----------------------------------------------------------------------
|       | 18         | {(John, 18, 4.0), (Joe, 18, 3.8)}             |
|       | 20         | {(Bill, 20, 3.9)}                           |
----------------------------------------------------------------------
 (18,{(John,18,4.0F),(Joe,18,3.8F)})
 
 
pig udf自定义
pig支持嵌入user defined function,一个简单的udf 继承于evalFunc,通常用在filter,foreach中

Java代码 




[*]public class MyUDF extends EvalFunc<String> {  
[*]  
[*]    @Override  
[*]    public String exec(Tuple input) throws IOException {  
[*]        if(input == null || input.size() ==0)  
[*]            return null;  
[*]        try {  
[*]            String val = (String) input.get(0);  
[*]            return new StringBuffer(val).append(" pig").toString();  
[*]        } catch (Exception e) {  
[*]            throw new IOException(e.getMessage());  
[*]        }  
[*]    }  
[*]  
[*]}  



 
pig支持udf in loader and store
udf loader 需要继承于LoadFunc
udf storer 需要继承于StoreFunc
这类似于hadoop中写inputformat和outputformat
其中vertica就是写了一个DB版本的
 
这里贴一个简单的loader的例子:

Java代码 




[*]public class MyLoader extends LoadFunc{  
[*]  
[*]    protected RecordReader recordReader = null;  
[*]      
[*]    private PreparedStatement ps;  
[*]    private Connection conn;  
[*]    private final String jdbcURL;  
[*]    private final String user;  
[*]    private final String pwd;  
[*]    private final String querySql;  
[*]    private ResultSet rs;  
[*]      
[*]    public MyLoader(String driver,String jdbcURL,String user,String pwd,String querySql){  
[*]        try {  
[*]            Class.forName(driver);  
[*]        } catch (Exception e) {  
[*]            // TODO: handle exception  
[*]        }  
[*]        this.jdbcURL = jdbcURL;  
[*]        this.user = user;  
[*]        this.pwd = pwd;  
[*]        this.querySql = querySql;  
[*]    }  
[*]      
[*]    @Override  
[*]    public InputFormat getInputFormat() throws IOException {  
[*]        return new PigTextInputFormat();  
[*]    }  
[*]  
[*]    @Override  
[*]    public Tuple getNext() throws IOException {  
[*]        // TODO 重要的读取过程  
[*]        Text val = null;  
[*]        boolean next = false;  
[*]        try {  
[*]            next = rs.next();  
[*]        } catch (Exception e) {  
[*]            // TODO: handle exception  
[*]        }  
[*]        if(!next)  
[*]            return null;  
[*]        ResultSetMetaData rsmd;  
[*]        try {  
[*]//          rsmd = result  
[*]        } catch (Exception e) {  
[*]            // TODO: handle exception  
[*]        }  
[*]          
[*]        return null;  
[*]    }  
[*]  
[*]    @Override  
[*]    public void prepareToRead(RecordReader arg0, PigSplit arg1)  
[*]            throws IOException {  
[*]        this.recordReader = arg0;  
[*]    }  
[*]  
[*]    @Override  
[*]    public void setLocation(String arg0, Job arg1) throws IOException {  
[*]        //no idea  
[*]    }  
[*]      
[*]    public ResourceSchema getSchema(String location,Job job) throws IOException{  
[*]        Configuration conf = job.getConfiguration();  
[*]        Schema schema = new Schema();  
[*]        try {  
[*]            //TODO:reader from database table  
[*]//          Connection conn = DriverManager.getConnection(this.jdbcURL, this.user, this.pwd);  
[*]            FieldSchema fieldName = new FieldSchema("name", DataType.CHARARRAY);  
[*]            FieldSchema fieldPosition = new FieldSchema("position", DataType.INTEGER);  
[*]            schema.add(fieldName);  
[*]            schema.add(fieldPosition);  
[*]        } catch (Exception e) {  
[*]            //TODO log exception  
[*]        }  
[*]          
[*]        return null;  
[*]    }  
[*]      
[*]    public void prepareToRead(){  
[*]          
[*]    }  
[*]  
[*]}  



 其中getNext方法就是如何处理reader读取出的数据
        getSchema可以固定读取数据的schema
        setLocation可以处理输入的数据源
        prepareToRead是读取数据之前,可以在此做标识,等等
        
 
pig 衍生
1.penny:
1. Penny的描述
Penny是pig的贡献项目,是pig的调试和监控工具,而且支持根据API自定义penny的监视器和协作器,已实现不同的功能;
2. Penny的总架构
Penny将监视器插入到pig的工作操作中,主要用于监视pig数据流的变化,监视器可以调用协作器,完成各种功能。
3. Penny的总类图关系
ParsePigScript负责根据用户监视器生成新计划newPlan,在ToolsPigServer中根据以前的脚本执行新计划。在执行新计划时,当监视器监视对象数据发生变化,出发监视器,运行自定义的业务,也可以将数据流变化传回协作器里处理,总类图如下: 
4. Penny的使用
Penny的使用需要自定义两个类,一个类继承于监视器基类MonitorAgent,另一个继承于协作器基类Coordinator。然后根据上边类图,就可以使用PennyServer和ParsePigScript进行监控和调试
 5.在pig中就可以找到penny这个贡献的源码
 
Vertica:
   vertica是pig loader和storer的udf
   附件里是vertica,来自github,和vertica的介绍使用文档
   贴一篇将vertica的帖子 http://blackproof.iyunv.com/blog/1791995
 
推荐书籍
    programming pig
 
推荐网址
   http://pig.apache.org/docs/r0.10.0/basic.html 官网
 
  pig pen开发工具,这个我现在玩得还不熟,就不介绍了,有兴趣的可以去搜搜玩玩
 
我在工作中pig的使用,主要是数据的ETL,所以比较适合。在选择pig hive还是其他非hadoop架构,如redis,这还是一个需要继续尝试探索的问题。
页: [1]
查看完整版本: hadoop pig入门总结