Spring Cloud Data Flow 入门
What
Microservice based Streaming and Batch data processing for Cloud Foundry and Kubernetes
简单解释一下Streaming和Batch处理的区别
Streaming: 应用始终处于启动状态,有数据就处理。通过消息传递中间件消耗或生成无限量的数据。Batch(Task): 需要处理数据时启动应用,处理完成后关闭应用,只处理有限量的数据。
思考一个问题。如何动态启动一个未启动的Spring项目?
答案是配置了一个maven jar包地址,或者是一个docker image地址,在任务触发时,下载对应的程序(镜像)并启动。

Concept
- Application: 指向某一个jar或docker的
Stream或Batch程序 - Task: 通过DSL组装多个
Application,定义处理任务 - Stream: 通过DSL组装多个
Application,按照source->processor->sink定义数据处理流 - Job: 每个
Task的执行记录,包括入参及结果记录
组件

- Data Flow Server: 定义、校验并执行
Stream和Batch,监听应用状态,记录执行记录。- 基于 DSL 定义
Stream和Batch - 通过 jar包 或者 docker镜像 注册应用
- 执行
Stream和Batch并记录执行记录,监听应用状态
- 基于 DSL 定义
- Skipper Server: 负责流处理发布
- 部署
Streams到一或多个平台 - 基于蓝绿更新策略升级或回滚
Stream - 持久化
Stream的描述信息,包括历史版本
- 部署
- Database: Data Flow Server和Skipper Server依赖关系数据库。
- 默认通过内嵌的
H2数据库启动,支持多种关系型数据库 - 在服务启动时自动创建表结构
- Task可以通过配置共享外部数据库作为持久化
- 默认通过内嵌的
- Messaging Middleware: 用于消息驱动模型的消息中间件。
- 支持
Kafka、RabbitMQ Stream必须通过消息中间件驱动
- 支持
- Monitor: 监控系统运行指标。
- 通过
Prometheus或InfluxDB存储 - 通过
Grafana展示
- 通过

使用
上面的概念和组件有点复杂?我们先快速启动一个应用,通过界面看一下如何使用
安装
可以通过一句docker命令启动
1 | docker run --rm -it -p 9393:9393 springcloud/spring-cloud-dataflow-server:2.6.3 |
通过 http://localhost:9393/dashboard/#/about 访问管理界面

注: Schedules未开启的原因是单机版的安装方式不支持定时任务。
创建 Application
提供了三种方式,但是要注意,不支持直接上传jar包或者发现运行中的应用,只能指定maven仓库地址(或者docker地址)进行拉取

选用第二种(import from an HTTP URI location),并选择官方Task Apps Maven示例,本质上是从官方拉取下列配置,在 第三种(import from properties) 中输入下列信息也是同样的效果。
1 | task.timestamp=maven://org.springframework.cloud.task.app:timestamp-task:2.1.1.RELEASE |
每个application需要指定应用地址和metadata信息地址。metadata会在使用过程中明确参数输入表单。如果未指定metadata地址,会尝试通过应用进行提取。
导入的应用可以在应用列表中查看,可以通过show details查看应用的metadata

metadata通过KV形式展示

注意: 这里有一个坑。如果你按照官方示例 https://github.com/spring-cloud/spring-cloud-task/tree/master/spring-cloud-task-samples/timestamp 新建了一个Spring Cloud Task, 会发现没办法查看metadata信息,原因是因为官方项目中缺少配置文件
1 | classpath*:/META-INF/dataflow-configuration-metadata.properties |
内容为configuration-properties.classes执行配置类
1 | configuration-properties.classes=wang.yuheng.SyncPrestoProperties |
Spring Cloud Data Flow 对metadata解析过程如下:


另一个坑是,如果是maven地址,会先从**localRepository(.m2)**获取jar文件解析metadata。而如果你设置的是docker,每次都会通过网络进行docker鉴权并下载,无法直接判断本地是否存在,所以速度会比maven慢很多。
Task

创建 Task
可以通过页面或者DSL定义Task

创建好的Task会存在于Tasks列表中,可以在列表页查看Task详情或者执行任务(生成job)

如果通过Cloud方式进行部署,可以指定定时任务,相比普通任务,可以通过cron expression设置定时


运行 Task
可以在Task列表中选择要执行的Task,并通过KV List的方式指定入参及properties。明确一下这2个概念
- properties: 配置参数,比如数据库配置
- Arguments: 方法入参,即
String[] args

每次执行会生成一个Job记录,可以在Task或者Job列表中查看执行状态、日志等信息

审计留痕
Audit Records页面会看到触发数据及执行记录等信息,包括不同方式(Restful、Shell、WebUI)触发的行为。开启登录验证后可以查看操作人信息。

Stream

使用Stream需要依赖skipper-server和消息中间件,通过docker-compose启动
1 | version: '3' |
创建 Stream
同样可以通过页面或者DSL按照source->processor->sink定义Stream。每个source->processor->sink是一个Stream,可以同时创建多个独立的Stream


发布 Stream
Stream 创建成功后,需要通过skipper-server发布至Runtime,基于消息队列进行驱动并执行数据处理
注意: 如果要使用流,需要找一个支持流的持久化,如 Redis
在列表页可以查看Stream详情,或者发布Stream。

发布时需要指定每个Application的properties,并且指定资源限制。如果是发布至k8s环境,会根据集群配置进行分配。

细节: 会按照label在kafka中创建对应的topic

发布后可以在Runtime查看对应的Stream状态及详情


通过docker-compose启动,所以会在skipper-server这台机器上运行相关Jar包作为消息Consumer

Shell
上述操作均可通过命令行 or Restful进行调用,并且配置都可以导出为对应的配置文件(DSL、Task、Stream等)

基于此就可配合当前使用的CICD完成devops。
想象一个场景,在你提交代码后,自动发布至某个环境等待运行。
1 | 1. push code |
观点
现实骨感,未来部分期待,如果你正在 All in Spring Cloud。
整体基于Spring Cloud,但是又拥抱docker。如果只是docker就可以去掉很多限制,就像是k8s和 Spring Cloud 组件本身就存在诸多重复
优点
- 基于Spring微服务,无切换成本,可独立开发、测试
- 完整的闭环,提供了从服务定制、管理、运行、监控全生命周期解决方案
- 拖拽式UI操作界面,配合DSL,配置简单,页面看起来很现代(你知道我在讽刺谁)
中立
- 未提供特定的计算引擎集群,类似 Flink、Spark 等
- 不能覆盖工作流场景
- 稳定性,目测现阶段上生产可以很快成为contributors
- 资源占用(看到有吐槽,但是未测试,不发表评论)
- 前端使用页面angularjs编写
缺点
- 仅基于Spring微服务,比如一行命令 or 一句sql 必须通过Spring Cloud Task(or Stream) 编写。通过Java编写job,你需要一个高版本的JDK
- 依赖maven repo (可能提供了http、ftp等其他方式,但是笔者没找到。。)
- 如果涉及到大数据处理,还是要依靠Hadoop中的模块。那么为什么混用,而不是直接使用全家桶呢?