Spark Basic RDD Operation Examples

import org.junit.Ignore;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.apache.kudu.spark.kudu.KuduContext;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @ClassName: KuduUtil
 * @Description: Example code for operating Kudu
 * @author jason.li
 * @date 2018-01-11 15:45:06
 */
@Ignore
public class KuduUtil {
    private static final String KUDU_MASTER = "10.1.0.20:7051";
    private static String tableName = "TestKudu";

    @Test
    public void kuduCreateTableTest(){
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            List<ColumnSchema> columns = new ArrayList<>(2);
            columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING)
                    .key(true)
                    .build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING)
                    .build());
            List<String> rangeKeys = new ArrayList<>();
            rangeKeys.add("key");
            Schema schema = new Schema(columns);
            client.createTable(tableName, schema,
                    new CreateTableOptions().setRangePartitionColumns(rangeKeys));
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Test
    public void kuduSaveTest(){
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try{
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            System.out.println("-------start--------"+System.currentTimeMillis());
            for (int i = 30000; i < 31000; i++) {
                Insert insert = table.newInsert();
                PartialRow row = insert.getRow();
                row.addString(0, i+"");
                row.addString(1, "aaa");
                OperationResponse operationResponse =  session.apply(insert);
            }
            System.out.println("-------end--------"+System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Test
    public void kuduUpdateTest(){

        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            Update update = table.newUpdate();
            PartialRow row = update.getRow();
            row.addString("key", 4 + "");
            row.addString("value", "value " + 10);
            OperationResponse operationResponse = session.apply(update);

           System.out.print(operationResponse.getRowError());

        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    @Test
    public void kuduSearchTest(){
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();

        try {
            KuduTable table = client.openTable(tableName);
            List<String> projectColumns = new ArrayList<>(1);
            projectColumns.add("value");
            KuduScanner scanner = client.newScannerBuilder(table)
                    .setProjectedColumnNames(projectColumns)
                    .build();
            while (scanner.hasMoreRows()) {
                RowResultIterator results = scanner.nextRows();
                while (results.hasNext()) {
                    RowResult result = results.next();
                    System.out.println(result.getString(0));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Test
    public void kuduDelTabletest(){
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            client.deleteTable(tableName);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Test
    public void searchBysparkSql(){
        SparkSession sparkSession = getSparkSession();
        List<StructField> fields = Arrays.asList(
                DataTypes.createStructField("key", DataTypes.StringType, true),
                DataTypes.createStructField("value", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> ds = sparkSession.read().format("org.apache.kudu.spark.kudu")
                .schema(schema)
                .option("kudu.master", KUDU_MASTER)
                .option("kudu.table", tableName)
                .load();
        ds.createOrReplaceTempView("abc");
        sparkSession.sql("select * from abc").show();
    }

    @Test
    public void checkTableExistByKuduContext(){
        SparkSession sparkSession = getSparkSession();
        KuduContext context = new KuduContext("10.1.0.20:7051",sparkSession.sparkContext());
        System.out.println(tableName + " exists = " + context.tableExists(tableName));
    }

    public SparkSession getSparkSession(){
        SparkConf conf = new SparkConf().setAppName("test")
                .setMaster("local[*]")
                .set("spark.driver.userClassPathFirst", "true");

        conf.set("spark.sql.crossJoin.enabled", "true");
        SparkContext sparkContext = new SparkContext(conf);
        SparkSession sparkSession = SparkSession.builder().sparkContext(sparkContext).getOrCreate();
        return sparkSession;
    }
}

Transformations on two RDDs

Suppose we have two RDDs containing {1, 2, 3} and {3, 4, 5} respectively (a Java sketch of these operations follows the table):

Function | Purpose | Example | Result
union() | Union: an RDD containing the elements of both RDDs | rdd.union(other) | {1, 2, 3, 3, 4, 5}
intersection() | Intersection: an RDD containing only the elements found in both RDDs | rdd.intersection(other) | {3}
subtract() | Remove the contents of one RDD from the other | rdd.subtract(other) | {1, 2}
cartesian() | Cartesian product with the other RDD | rdd.cartesian(other) | {(1, 3), (1, 4), (1, 5), (2, 3), (2, 4), ..., (3, 5)}
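
A minimal Java sketch of these two-RDD operations (my own addition, not part of the original article; the class name, app name, and local master are placeholders), using the JavaRDD API, with expected results in the comments:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TwoRddOpsDemo {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setAppName("two-rdd-ops").setMaster("local[*]"));

        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3));
        JavaRDD<Integer> other = jsc.parallelize(Arrays.asList(3, 4, 5));

        System.out.println(rdd.union(other).collect());        // [1, 2, 3, 3, 4, 5]
        System.out.println(rdd.intersection(other).collect()); // [3]
        System.out.println(rdd.subtract(other).collect());     // elements 1 and 2 (order may vary)
        System.out.println(rdd.cartesian(other).collect());    // all (x, y) pairs: (1,3), (1,4), ..., (3,5)

        jsc.stop();
    }
}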

For reference, here is the (decompiled) signature of the Kudu connector's DefaultSource class, which is what Spark SQL resolves when format("org.apache.kudu.spark.kudu") is used:

package org.apache.kudu.spark.kudu
@org.apache.yetus.audience.InterfaceStability.Unstable
class DefaultSource() extends scala.AnyRef with org.apache.spark.sql.sources.RelationProvider with org.apache.spark.sql.sources.CreatableRelationProvider with org.apache.spark.sql.sources.SchemaRelationProvider {
  val TABLE_KEY : java.lang.String = { /* compiled code */ }
  val KUDU_MASTER : java.lang.String = { /* compiled code */ }
  val OPERATION : java.lang.String = { /* compiled code */ }
  val FAULT_TOLERANT_SCANNER : java.lang.String = { /* compiled code */ }
  val SCAN_LOCALITY : java.lang.String = { /* compiled code */ }
  def defaultMasterAddrs : scala.Predef.String = { /* compiled code */ }
  override def createRelation(sqlContext : org.apache.spark.sql.SQLContext, parameters : scala.Predef.Map[scala.Predef.String, scala.Predef.String]) : org.apache.spark.sql.sources.BaseRelation = { /* compiled code */ }
  override def createRelation(sqlContext : org.apache.spark.sql.SQLContext, mode : org.apache.spark.sql.SaveMode, parameters : scala.Predef.Map[scala.Predef.String, scala.Predef.String], data : org.apache.spark.sql.DataFrame) : org.apache.spark.sql.sources.BaseRelation = { /* compiled code */ }
  override def createRelation(sqlContext : org.apache.spark.sql.SQLContext, parameters : scala.Predef.Map[scala.Predef.String, scala.Predef.String], schema : org.apache.spark.sql.types.StructType) : org.apache.spark.sql.sources.BaseRelation = { /* compiled code */ }
}

Basic RDD transformations

Suppose we have an RDD containing the elements {1, 2, 3, 3} (a Java sketch of these operations follows the table):

Function | Purpose | Example | Result
map() | Apply a function to each element of the RDD and return an RDD of the results | rdd.map(x => x + 1) | {2, 3, 4, 4}
flatMap() | Apply a function to each element and return an RDD of the contents of the returned iterators; often used to split input into words | rdd.flatMap(x => x.to(3)) | {1, 2, 3, 2, 3, 3, 3}
filter() | Return an RDD containing only the elements that pass the filter() predicate | rdd.filter(x => x != 1) | {2, 3, 3}
distinct() | Remove duplicates | rdd.distinct() | {1, 2, 3}
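
A minimal Java sketch of these transformations (my own addition, not part of the original article; it assumes Spark 2.x, where the Java flatMap function returns an Iterator), with expected results in the comments:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class BasicTransformationDemo {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setAppName("basic-transformations").setMaster("local[*]"));

        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 3));

        // map: add 1 to every element -> [2, 3, 4, 4]
        System.out.println(rdd.map(x -> x + 1).collect());

        // flatMap: expand each element x into the range x..3 -> [1, 2, 3, 2, 3, 3, 3]
        System.out.println(rdd.flatMap(x -> {
            List<Integer> out = new ArrayList<>();
            for (int i = x; i <= 3; i++) {
                out.add(i);
            }
            return out.iterator();
        }).collect());

        // filter: keep the elements that are not equal to 1 -> [2, 3, 3]
        System.out.println(rdd.filter(x -> x != 1).collect());

        // distinct: remove duplicates -> 1, 2, 3 (order may vary)
        System.out.println(rdd.distinct().collect());

        jsc.stop();
    }
}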

As mentioned earlier, columnar stores such as Kudu write data in an update (upsert) fashion (a small upsert sketch follows this paragraph).
Below are the related demos for operating Kudu from Java. Java demos for the Kudu client can be found on Git, but there are none for operating Kudu from Spark in Java, and Cloudera's official documentation only covers the Scala version. This article therefore collects complete Java examples for operating Kudu, intended only as a starting reference for beginners. (Unfortunately there is no official example of querying Kudu with Spark SQL from Java, and Google is not much help either.)
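
As a minimal sketch of that upsert-style write (my own addition, written as if it were one more test method in the KuduUtil class listed at the top of this article, reusing its KUDU_MASTER and tableName constants; Upsert is part of the Kudu Java client API):

    @Test
    public void kuduUpsertTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            // Upsert: inserts the row, or updates it if a row with this key already exists
            Upsert upsert = table.newUpsert();
            PartialRow row = upsert.getRow();
            row.addString("key", "4");
            row.addString("value", "value 10");
            OperationResponse response = session.apply(upsert);
            System.out.println(response.hasRowError() ? response.getRowError() : "upsert ok");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }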

The pom dependencies used by these examples:

        <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-client -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.5.0-cdh5.13.1</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-client-tools -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client-tools</artifactId>
            <version>1.5.0-cdh5.13.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_2.11</artifactId>
            <version>1.6.0</version>
        </dependency>

Action

Suppose we have an RDD containing the elements {1, 2, 3, 3} (a Java sketch of these actions follows the table):

Function | Purpose | Example | Result
collect() | Return all elements of the RDD | rdd.collect() | {1, 2, 3, 3}
count() | Number of elements in the RDD | rdd.count() | 4
countByValue() | Number of times each element occurs in the RDD | rdd.countByValue() | {(1, 1), (2, 1), (3, 2)}
take(num) | Return num elements from the RDD | rdd.take(2) | {1, 2}
top(num) | Return the top num elements of the RDD | rdd.top(2) | {3, 3}
takeOrdered(num)(ordering) | Return num elements based on the given ordering | rdd.takeOrdered(2)(myOrdering) | {3, 3}
takeSample(withReplacement, num, [seed]) | Return num elements at random | rdd.takeSample(false, 1) | nondeterministic
reduce(func) | Combine the elements of the RDD in parallel (e.g. sum) | rdd.reduce((x, y) => x + y) | 9
fold(zero)(func) | Same as reduce(), but a zero (identity) value must be provided | rdd.fold(0)((x, y) => x + y) | 9
aggregate(zeroValue)(seqOp, combOp) | Similar to reduce(), but used to return a result of a different type | rdd.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) | (9, 4)
foreach(func) | Apply func to each element of the RDD | rdd.foreach(func) | (none)
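
A minimal Java sketch of these actions (my own addition, not part of the original article; the aggregate() example uses an int[] pair in place of the Scala tuple), with expected results in the comments:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class BasicActionDemo {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setAppName("basic-actions").setMaster("local[*]"));

        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 3));

        System.out.println(rdd.collect());                // [1, 2, 3, 3]
        System.out.println(rdd.count());                  // 4
        System.out.println(rdd.countByValue());           // {1=1, 2=1, 3=2} (order may vary)
        System.out.println(rdd.take(2));                  // [1, 2]
        System.out.println(rdd.top(2));                   // [3, 3]
        System.out.println(rdd.reduce((x, y) -> x + y));  // 9
        System.out.println(rdd.fold(0, (x, y) -> x + y)); // 9

        // aggregate: compute (sum, count) in a single pass -> sum = 9, count = 4
        int[] sumCount = rdd.aggregate(
                new int[]{0, 0},
                (acc, x) -> new int[]{acc[0] + x, acc[1] + 1},
                (a, b) -> new int[]{a[0] + b[0], a[1] + b[1]});
        System.out.println(sumCount[0] + ", " + sumCount[1]); // 9, 4

        // foreach: applies the function on the executors (in local mode it prints to this console)
        rdd.foreach(x -> System.out.println(x));

        jsc.stop();
    }
}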

The tables above are adapted from "Learning Spark"; the accompanying code examples can be found on GitHub in the learning-spark repository.

This article uses the Cloudera build, so the Cloudera repository has to be added (see the <repositories> snippet below).

2) Feature list:
Create a table with KuduClient
Insert data with KuduClient
Update data with KuduClient
Query data with KuduClient
Delete a table with KuduClient
Query data with Spark SQL
Check whether a table exists with the Spark KuduContext

1) pom dependencies: the <dependency> entries listed earlier in this article, plus the Cloudera repository:

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

3) Code examples: the KuduUtil class listed at the top of this article.

P.S.: For querying Kudu with Spark SQL, Cloudera's official site only provides a Scala version, and a concrete Java version is hard to find even with Google. Looking at the source, format() simply specifies a package path; as long as that package contains a DefaultSource class implementing the Spark SQL data source interfaces, it can be loaded. Since the org.apache.kudu.spark.kudu package contains such a DefaultSource class, it is recognized by Spark SQL.
By the same token, other libraries can be accessed this way, and if you want to integrate a new library so that Spark SQL can query it, the same mechanism applies.
