今天看啥  ›  专栏  ›  marchpure_312

Apache CarbonData 2.0 RC2:用50行代码构建全场景数据湖

marchpure_312  · CSDN  ·  · 2019-01-01 00:00

​​ ​ ​ ​ ​

​​ ​ ​ ​ ​ 今夏最火综艺《青春有你2》里面刘雨昕从默默无闻一直冲到第二名,让众人感叹,只有在唱、跳、舞蹈、Rap等多方面能力俱佳,才是真偶像担当。在看了5月份初发布的CarbonData 2.0 RC2之后,笔者不禁感觉CarbonData在新版本中所体现的全方面能力,和刘雨昕一样具有"C位出道"的巨大潜力。CarbonData作为目前为数不多由中国公司贡献的Apache顶级项目,从16年正式"出道"以来,一直努力践行着实践和探索的开源精神。笔者在看到CarbonData的成长、 ASF中华人力量的崛起之后 ,也相信国内的开源力量可以继续"不忘初心、砥砺前行",借用青春有你2里面的一句歌词:“OpenSource Made in China,出发,多远都可以到达!”
​​ ​ ​ ​ ​ 从CarbonData社区发布的版本信息看,CarbonData 2.0提供了一种新的融合数据存储方案,以一份数据同时支持多种应用场景:明细查询、交互分析、数据实时同步和更新、ETL、AI、时序聚合、空间检索等,实现EB级别数据规模,查询性能秒级响应,并通过计算存储分离优化,大大降低了数据湖成本。本文笔者将着重介绍如何快速构建一个明细查询、交互分析、ETL的数据湖,后续将会大家带来CarbonData在数据实时同步和更新、AI、时序时空聚合等场景中的应用,敬请关注。

前言

​​ ​ ​ ​ ​ 随着数据量的增大,如果基于传统的数据存储解决方案,即使用不同的数据库产品来分别处理不同业务类型,势必会带来成本和运维的巨大压力。以互联网行业用户行为数据为例,随着5G+AI的到来,数据量正呈现快速增长趋势,目前对于一个日活1000万的APP应用来说,平均每天约产生500亿条用户行为数据,一年的数据存储量约10PB。PB级别数据可以满足不同类型业务的需求,如明细查询类业务:用户行为查询、订单查询、交易查询;聚合分析类业务:A/B Test、聚合分析;ELT类业务:热销榜、PV/UV等。下表中,我们分别给出了明细查询、聚合分析、ETL业务的典型查询、可选数据存储方案和方案痛点。

类型 典型查询 可选数据库 痛点
明细查询 select * from fact where userid IS ‘1323’ HBase、ES等 成本高、运维难
聚合分析 select * from fact GROUPBY abtestid ClickHouse、Oracle等 不支持横向扩展
ETL select * from facttable JOIN dimensiontable Spark on Parquet、Hive on ORC等 无索引,速度慢

​​ ​ ​ ​ ​ 具体来说,(1) 成本高:以HBase为例,单台RegionServer可维护不超过10TB的数据,面对10PB的数据存储时,需要100台计算节点部署RegionServer,每台计算节点500元/月(4U16G),计算成本共计50万/月,每PB存储的云硬盘成本为70万/月,总成本=120万/月;(2) 计算慢:Spark on Parquet可以将数据存储在对象存储中,成本大大降低,每PB存储的对象存储成本为8万/月,上层的100台计算节点假设每天开机8小时,计算成本15万/月,总成本约23万/月,成本可以降低5倍。但是由于无索引,只能通过暴力扫描的方式进行查询和计算,在暴力计算时系统往往受限于对象存储带宽,假设对象存储带宽为20GB/s,查询需要14个小时。

​​ ​ ​ ​ ​ 由上可见,Nosql数据库虽然具有较好的数据索引机制,但是“太贵”,传统的Hadoop生态数据仓库将数据放在对象存储上,但是“太慢”,这两者各自的局限性,使得我们进行EB级别数据仓库选型时,面临着这一个鱼与熊掌不可兼得的选择题。

可以像NoSQL数据库一样,构建高效索引,又可以和Spark/Hive一样,享受高度可扩展性的数据并行处理,并能利用近乎无限的低成本的对象存储资源,满足这种“又方便又快又便宜”的任性就是CarbonData的使命。

​​ ​ ​ ​ ​ 接下来的内容,我们通过演示介绍CarbonData 2.0 RC2中的数据湖构建方法。。

一、CarbonData数据湖架构

​​ ​ ​ ​ ​ 过去的几个月,Apache CarbonData 2.0一直在努力实现一种低成本全场景数据湖,力求做到"一份数据到处使用"的愿景。下图给出了一个基于CarbonData数据湖系统架构图,数据首先由Kafka完成数据收集,并由Flink完成数据清洗、预处理 。其次,Flink将数据直接写入对象存储。最后,CarbonData对对象存储的数据构建索引,并供上层多种计算引擎计算和查询,用户可使用Spark或者Presto对CarbonData数据进行明细查询和聚合分析类业务,使用Spark或者Hive对CarbonData数据进行聚合分析和ETL类业务。
在这里插入图片描述

​​ ​ ​ ​ ​ 从数据存储的角度看,数据主要包含三要素:元数据、数据、索引,这三要素同样也是构建数据湖的关键。下面,我们主要演示基于CarbonData的数据湖如何解决这几个问题:

  • 如何构建元数据?

  • 如何写入数据?

  • 如何建索引?

​​ ​ ​ ​ ​ 下面,我们首先演示如何一键式搭建Kafka+Flink+CarbonData的开发环境,其次介绍CarbonData数据湖中构建元数据、写入数据、建索引的方法。

二、一键式搭建Kafka+Flink+CarbonData演示环境

  • 准备1台Linux弹性云服务器
  • 下载一键式安装脚本
curl -k -O http://carbondata-publish.obs.myhuaweicloud.com/quick_start_kafka_flink_carbondata.sh
  • 1
  • 快速安装Kafka、Flink、Spark、CarbonData
source quick_start_kafka_flink_carbondata.sh
  • 1
  • 进入Kafka工作目录,新建Topic,并写入数据
创建Topic
kafka_2.12-2.5.0/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

写入数据
kafka_2.12-2.5.0/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>c001,23,china,2016-02-23 09:01:30,china sz,computer design
>c003,23,japan,2016-02-23 08:01:30,japan to,sport speech
>c002,23,india,2016-02-23 07:01:30,india mm,draw write
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

​​ ​ ​ ​ ​ 接下来,我们演示如果使用Flink消费Kafka中的数据,并将数据写入CarbonData数据湖。

三、如何构建元数据?

​​ ​ ​ ​ ​ Spark中创建CarbonData表可以快速构建元数据,元数据存储路径为${table location}/Metadata/schema。

  • 建表,并构建排序索引,语法举例如下:
CREATE TABLE person(id STRING, age INT, country STRING, timestamp timestamp, address STRING, skill STRING) 
STORED AS carbondata
TBLPROPERTIES('sort_scope'='GLOBAL_SORT','sort_columns'='id, age');
  • 1
  • 2
  • 3

​​ ​ ​ ​ ​ 这里,TBLPROPERTIES(‘sort_scope’=‘GLOBAL_SORT’,‘sort_columns’=‘id, age’)代表着数据将按照(id, age)有序存储,当在处理id详单查询或者id、age联合查询时,可以通过二分查找的方式进行数据的快速定位和筛选,相比无排序时,数据查询的时间复杂度从O(N)下降为O(logN)。

四、如何写入数据?

​​ ​ ​ ​ ​ 下面给出了基于Flink消费Kafka的数据,并将CarbonData数据落盘的代码示例。 下载代码

object FlinkToCarbonDemo {
  def main(args: Array[String]): Unit = {
    // 1. kafka地址和Topic名称
    val parameter = ParameterTool.fromArgs(args)
    val kafkaTopic = parameter.get("TOPIC")
    val kafkaBootstrapServices = parameter.get("KAFKASERVICES")
    val kafkaProperties = new Properties();
    kafkaProperties.put("bootstrap.servers", kafkaBootstrapServices)
    kafkaProperties.put("auto.commit.interval.ms", "3000")

    // 2. 设置表明database名称、表名、表存储位置、临时文件本地存储目录
    val tableName = parameter.get("TABLENAME")
    val tablePath = parameter.get("TABLEPATH")
    val tempPath = parameter.get("TEMPPATH")

    //  writerProperties代表写操作鉴权信息,当写数据到S3时,需要在writerProperties中配置桶名、AKSK
    val writerProperties = newWriterProperties(tempPath)
    //  carbonProperties代表CarbonDataSDK属性信息,支持配置sdk内存、数据自定义格式等
    val carbonProperties = newCarbonProperties()

    //  Flink定时将数据上传到CarbonData表空间目录是,时间周期阈值为Flink Checkpoint时间
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.enableCheckpointing(180000)

    // 3. 配置source stream. 这里主要是基于自定义DeserializeSchema完成Kafka数据解析。
    //    Flink中集成的CarbonDataSDK默认输入数据为字符数组,因此需要将Record解析为Array[AnyRef]或者Array[String]
    val stream: DataStream[Array[AnyRef]] = environment.addSource(new FlinkKafkaConsumer011[Array[AnyRef]](
      kafkaTopic, new DeserializeSchema(), kafkaProperties
    ))

    // 4. 配置stream sink,这里首先构建carbondatasdk,可选类型为Local和S3.
    val factory = CarbonWriterFactory.builder("Local").build(
      "default", tableName, tablePath, new Properties, writerProperties, carbonProperties)
    val streamSink = StreamingFileSink.forBulkFormat(
      new Path(ProxyFileSystem.DEFAULT_URI),
      factory
    ).build()
    stream.addSink(streamSink)

    // Execute the environment
    environment.execute()
    streamSink.close()
  }

  private def newWriterProperties(dataTempPath: String) = {
    val properties = new Properties
    properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
    properties
  }

  private def newCarbonProperties() = {
    val properties = new Properties
    properties
  }
}

// 基于自定义DeserializeSchema完成Kafka数据解析示例。例如Kafka中Record为"c001,23,china,2016-02-23 09:01:30,china sz,computer design"时,解析形式为['c001','23','china']的字符数组
class DeserializeSchema extends KafkaDeserializationSchema[Array[AnyRef]] {
  override def isEndOfStream(t: Array[AnyRef]): Boolean = {
    false
  }

  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): Array[AnyRef] = {
    new String(consumerRecord.value()).split(",").map(_.trim).asInstanceOf[Array[AnyRef]]
  }

  override def getProducedType: TypeInformation[Array[AnyRef]] = TypeInformation.of(new TypeHint[Array[AnyRef]] {})
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

​​ ​ ​ ​ ​ 当Flink成功消费Kafka数据之后,在{$tablePath}/stage_data/,可以看到成功落盘的CarbonData数据。

​​ ​ ​ ​ ​ 在Flink目录中中放置了示例JAR文件,可以直接启动作业尝试Flink入库CarbonData表数据,作业启动示例如下所示:

bin/flink run -d -p 1 -c org.apache.carbon.flink.FlinkToCarbonDemo examples/flinktocarbondemo-1.0.jar \
 --TOPIC test \
 --KAFKASERVICES localhost:9092 \
 --TABLENAME person \
 --TABLEPATH "/user/hive/warehouse/person" \
 --TEMPPATH "/tmp"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

五、如何构建索引?

  • Spark中执行"INSERT INTO STAGE"命令触发构建索引。
INSERT INTO person STAGE; 
  • 1
  • Spark中尝试查询数据:
SELECT * FROM person;
  • 1

​​ ​ ​ ​ ​ 就此,我们完成了Kafka到Flink到CarbonData的端到端数据湖的简单构建。

结语

​​ ​ ​ ​ ​ 本文主要介绍了如何快速构建Kafka到Flink到CarbonData的端到端数据湖DataPipe。后续,我们将继续介绍:(1)如何在数据湖中使用Spark、Hive、Presto访问同一份数据;(2)如何将Mysql等关系型数据库数据同步到CarbonData数据湖中;(3)如何在TensorFlow等AI计算引擎中使用CarbonData。敬请关注。

​​ ​ ​ ​ ​ 欢迎大家添加微信ID:xu601450868,加入CarbonData技术交流群。




原文地址:访问原文地址
快照地址: 访问文章快照