金源娱乐

Kafka数。据迁移MaxCompute最佳实践

  • 金源娱乐
  • 金源娱乐
  • 金源娱乐
  • 金源娱乐app
  • 金源娱乐
  • 金源娱乐
  • 金源娱乐ע
  • 金源娱乐¼
  • 金源娱乐
  • 金源娱乐Ƹ
  • 金源娱乐淨
  • 金源娱乐
  • 金源娱乐ֱ
  • 金源娱乐ֻ
  • 金源娱乐԰
  • 金源娱乐׿
  • 金源娱乐Ƶ
  • 201906月17日

    Kafka数。据迁移MaxCompute最佳实践

    原标题:Kafka数。据迁移MaxCompute最佳实践

    搭建Kafka集群

    进走数。据迁移前,您必要保证本身的Kafka集群环境平常。本文操纵阿里云EMR服务自动化搭建Kafka集群,详细过程请参见:Kafka 迅速入门。

    本文操纵的EMR Kafka版本新闻如下:

    EMR版本: EMR-3.12.1

    集群类型: Kafka

    柔件新闻: Ganglia 3.7.2 ZooKeeper 3.4.12 Kafka 2.11-1.0.1 Kafka-Manager 1.3.3.16

    Kafka集群操纵特有网络,区域为华东1(杭州),主实例组ECS计算资源配置公网及内网IP,详细配置如下图所示。

    创建MaxCompute 项现在

    开通MaxCompute服务并创建益项现在,本文中在华东1(杭州)区域创建项现在bigdata_DOC,同。时启动DataWorks有关服务,如下图所示。细目请参见开通MaxCompute。

    Kafka是一款分布式发布与订阅的新闻中间件,具有高性能、高吞量的特点被普及操纵,每秒能处理上百万的新闻。Kafka适用于流式数。据处理,主要答用于用户走为跟踪、日志搜集等场景。

    一个典型的Kafka集群包含若干个创造者(Producer)、Broker、消,耗者(Consumer)以及一个Zookeeper集群。Kafka集群经过Zookeeper管理自身集群的配置并进走服务协同。。

    Topic是Kafka集群上最常用的新闻的荟萃,是一个新闻存储逻辑概念。物理磁盘不存储Topic,而是将Topic中详细的新闻按分区(Partition)存储在集群中各个节点的磁盘上。每个Topic能够有众个创造者向它发送新闻,也能够有众个消,耗者向它拉取(消,耗)新闻。

    每个新闻被增补到分区时,会分配一个offset(偏移量,从0开起编号),是新闻在一个分区中的唯一编号。

    操作步骤

    准备测试外与数。据

    Kafka集群创建测试数。据

    为保证您能够顺当登陆EMR集群Header主机及MaxCompute和DataWorks能够顺当和EMR集群Header主机通信,请您最先配置EMR集群Header主机坦然组,放通TCP 22及TCP 9092端口。

    登录EMR集群Header主机地址

    进入EMR Hadoop限制台集群管理 > 主机列外页面,确认EMR集群Header主机地址,并经过SSH连接长途登录。

    创建测试Topic

    操纵kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka --create命令创建测试所操纵的Topic testkafka。您能够操纵kafka-topics.sh --list --zookeeper emr-header-1:2181/kafka-1.0.1命令查望已创建的Topic。

    [root@emr-header-1 ~]# kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka --create

    Created topic "testkafka".

    [root@emr-header-1 ~]# kafka-topics.sh --list --zookeeper emr-header-1:2181/kafka-1.0.1

    __consumer_offsets

    _emr-client-metrics

    _schemas

    connect-configs

    connect-offsets

    connect-status

    testkafka

    写入测试数。据

    您能够操纵kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka命令模拟创造者向Topic testkafka中写入数。据。原由Kafka用于处理流式数。据,您能够赓续不息的向其中写入数。据。 为保证测试效果,提出您写入10条以上的数。据。

    [root@emr-header-1 ~]# kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka

    [root@emr-header-1 ~]# kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning

    123

    abc

    创建MaxCompute外

    为保证MaxCompute能够顺当授与Kafka数。据,请您最先在MaxCompute上创建外。本例中为测试便利,操纵非分区外。

    登陆DataWorks创建外,细目请参见外管理。

    您能够单击DDL模式进走建外,建外语句举例如下。

    CREATE TABLE testkafka (

    ) ;

    其中的每一列,对答于DataWorks数。据集成Kafka Reader的默认列,您能够自立命名。细目请参见配置Kafka Reader:

    __key__外示新闻的key。

    __value__外示新闻的完善内容 。

    __partition__外示现在新闻所在分区。

    __headers__外示现在新闻headers新闻。

    __offset__外示现在新闻的偏移量。

    __timestamp__外示现在新闻的时间戳。

    新建自定义资源组

    原由现在DataWorks的默认资源组无法完善赞许Kafka插件,您必要操纵自定义资源组完善数。据同。步。自定义资源组细目请参见新添义务资源。

    在本文中,为撙节资源,直接操纵EMR集群Header主机行为自定义资源组。完善后,请期待服务器状态变为可用。

    新建并运走同。步义务

    在您的营业流程中右键单击数。据集成,选择新建数。据集成节点 > 数。据同。步。

    新建数。据同。步节点后,您必要选择数。据来源的数。据源为Kafka,数。据往向的数。据源为ODPS,并且操纵默认数。据源odps_first。选择数。据往向外为您新建的testkafka。完善上述配置后,请点击下图框中的按钮,转换为脚本模式。

    脚本配置如下,代码释义请参见配置Kafka Reader。

    {

    }

    您能够经过在Header主机上操纵kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list命令查望group.id参数。,及消,耗者的Group名称。

    [root@emr-header-1 ~]# kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list

    Note: This will not show information about old Zookeeper-based consumers.

    _emr-client-metrics-handler-group

    console-consumer-69493

    console-consumer-83505

    console-consumer-21030

    console-consumer-45322

    console-consumer-14773

    以console-consumer-83505为例,您能够按照该参数。在Header主机上操纵kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505命令确认beginOffset及endOffset参数。。

    [root@emr-header-1 ~]# kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505

    Note: This will not show information about old Zookeeper-based consumers.

    Consumer group 'console-consumer-83505' has no active members.

    TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

    testkafka 6 0 0 0 - - -

    test 6 3 3 0 - - -

    testkafka 0 0 0 0 - - -

    testkafka 1 1 1 0 - - -

    testkafka 5 0 0 0 - - -

    完善脚本配置后,请最先切换义务资源组为您刚创建的资源组,然后点击运走。

    完善运走后,您能够在运走日志中查望运走效果,如下为成功运走的日志。

    您能够经过新建一个数。据开发义务运走SQL语句,查望现在外中是否已存在从Kafka同。步过来的数。据。本例中操纵select * from testkafka;语句,完善后点击运走即可。

    实走效果如下,本例中为保证效果,在testkafka Topic中输入了众条数。据,您能够查验是否和您输入的数。据整齐。

    --------------------------------------------

    本文作者:付帅

    原文链接:https://yq.aliyun.com/articles/704008?utm_content=g_1000061170

    本文为云栖社区原创内容,未经批准不得转载。返回搜狐,查望更众

    义务编辑: