🎯 Project Overview
SpringDataPlatform is a full-featured, enterprise-grade big data platform built around Apache Flink job management. This full-stack project combines a modern web technology stack with an intuitive user interface for managing and monitoring distributed data-processing workflows.
🏗️ System Architecture
graph TB
A[Vue.js Frontend] --> B[Nginx Reverse Proxy]
B --> C[Spring Boot Backend]
C --> D[Apache Flink Cluster]
C --> E[Apache Zeppelin]
D --> F[Job Execution Engine]
E --> G[Interactive Notebooks]
subgraph "Core Features"
H[JAR Job Submission]
I[SQL Job Submission]
J[Job Status Monitoring]
K[Cluster Status Monitoring]
end
C --> H
C --> I
C --> J
C --> K
🛠️ Technology Stack
Frontend
- Framework: Vue.js 2.x
- Routing: Vue Router
- HTTP client: Axios
- UI enhancements: SweetAlert2
- Syntax highlighting: Highlight.js
- Build tooling: Vue CLI
Backend
- Core framework: Spring Boot
- Data processing: Apache Flink
- Interactive analytics: Apache Zeppelin
- Containerization: Docker + Docker Compose
- Reverse proxy: Nginx
🚀 Core Features
1️⃣ Multiple Job Submission Modes
JAR File Submission

# Upload and run a JAR file through the REST API
POST /api/flink/jobs/submit-jar
Content-Type: multipart/form-data

file=your-flink-job.jar
mainClass=com.example.FlinkJob
args=--input /data/input --output /data/output
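From a browser or Node client, this multipart request can be assembled with the standard FormData API. A minimal sketch (the helper name is ours; the field names `file`, `mainClass`, and `args` mirror the backend controller's `@RequestParam` names shown later):

```javascript
// Sketch: build the multipart form body for POST /api/flink/jobs/submit-jar.
// Field names mirror the backend controller's @RequestParam names.
function buildJarSubmitForm(jarBlob, mainClass, programArgs) {
  const form = new FormData();
  form.append('file', jarBlob, 'your-flink-job.jar');
  form.append('mainClass', mainClass);
  if (programArgs) {
    form.append('args', programArgs); // optional on the backend
  }
  return form;
}
```

The returned FormData can be passed directly as the body of a fetch or Axios POST; the multipart boundary header is set automatically by the client.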
SQL Job Submission

-- Submit Flink SQL directly through the platform
CREATE TABLE source_table (
  id INT,
  name STRING,
  timestamp_col TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'input-topic',
  'properties.bootstrap.servers' = 'kafka:9092'
);

INSERT INTO sink_table
SELECT id, UPPER(name), timestamp_col
FROM source_table
WHERE id > 1000;
2️⃣ Real-time Monitoring Dashboard

// Core logic of the Vue.js monitoring component
export default {
  data() {
    return {
      jobs: [],
      clusterStatus: {},
      monitoring: true
    }
  },
  methods: {
    async fetchJobStatus() {
      try {
        const response = await this.$axios.get('/api/flink/jobs');
        this.jobs = response.data;
        this.updateJobMetrics();
      } catch (error) {
        this.$swal('Error', 'Failed to fetch job status', 'error');
      }
    },

    updateJobMetrics() {
      this.jobs.forEach(job => {
        // Refresh per-job metrics
        this.fetchJobMetrics(job.id);
      });
    },

    startMonitoring() {
      this.monitoringInterval = setInterval(() => {
        this.fetchJobStatus();
      }, 5000); // refresh every 5 seconds
    },

    stopMonitoring() {
      clearInterval(this.monitoringInterval); // avoid leaking the timer
    }
  }
}
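The interval bookkeeping above generalizes into a tiny helper that starts a poller and hands back a stop function, which makes cleanup on component teardown hard to forget (a sketch, not from the project source):

```javascript
// Sketch: run fn immediately, then every intervalMs; the returned
// function stops the poller (e.g. call it in beforeDestroy).
function createPoller(fn, intervalMs) {
  fn(); // fire once right away so the UI is not empty for the first interval
  const id = setInterval(fn, intervalMs);
  return function stop() {
    clearInterval(id);
  };
}
```

A component would call `this.stopPoller = createPoller(this.fetchJobStatus, 5000)` on mount and `this.stopPoller()` on destroy.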
3️⃣ Interactive Data Exploration
Integrates Apache Zeppelin to provide a Jupyter-like interactive data-analysis environment:

// Example Flink (Scala) code in a Zeppelin notebook
%flink

// Build a real-time data stream
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = env
  .socketTextStream("localhost", 9999)
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .keyBy(0)
  .timeWindow(Time.seconds(10))
  .sum(1)

dataStream.print()
env.execute("Real-time Word Count")
🔧 Deployment and Configuration
Docker-based Deployment

# Core docker-compose.yml configuration
version: '3.8'

services:
  # Frontend Vue.js application
  frontend:
    build:
      context: ./frontend/data-platform-ui
    ports:
      - "8080:8080"
    depends_on:
      - backend
    networks:
      - data-platform

  # Backend Spring Boot API
  backend:
    build:
      context: ./backend/DataPlatform
    ports:
      - "9999:9999"
    environment:
      - FLINK_JOBMANAGER_URL=http://flink-jobmanager:8081
      - ZEPPELIN_URL=http://zeppelin:8082
    depends_on:
      - flink-jobmanager
      - zeppelin
    networks:
      - data-platform

  # Apache Flink JobManager
  flink-jobmanager:
    image: flink:1.15.2-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        parallelism.default: 2
    networks:
      - data-platform

  # Apache Flink TaskManager
  flink-taskmanager:
    image: flink:1.15.2-scala_2.12
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 2
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
        parallelism.default: 2
    networks:
      - data-platform

  # Apache Zeppelin
  zeppelin:
    image: apache/zeppelin:0.10.1
    ports:
      - "8082:8080"
    volumes:
      - ./zeppelin/notebook:/opt/zeppelin/notebook
      - ./zeppelin/conf:/opt/zeppelin/conf
    networks:
      - data-platform

networks:
  data-platform:
    driver: bridge
Quick Start Commands

# Standard deployment
docker-compose up -d

# Rebuild and start
docker-compose up --build

# Apple Silicon (M1) variant
docker-compose -f docker-compose-m1.yml up -d

# Check service status
docker-compose ps

# Tail logs
docker-compose logs -f backend
📊 REST API Design
Job Management API

@RestController
@RequestMapping("/api/flink")
public class FlinkJobController {

    @PostMapping("/jobs/submit-jar")
    public ResponseEntity<JobSubmitResult> submitJarJob(
            @RequestParam("file") MultipartFile jarFile,
            @RequestParam("mainClass") String mainClass,
            @RequestParam(value = "args", required = false) String programArgs) {

        try {
            // Persist the uploaded JAR file
            String jarPath = saveUploadedFile(jarFile);

            // Submit the job to the Flink cluster
            JobSubmitResult result = flinkService.submitJarJob(
                jarPath, mainClass, programArgs
            );

            return ResponseEntity.ok(result);
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(new JobSubmitResult(false, e.getMessage()));
        }
    }

    @PostMapping("/jobs/submit-sql")
    public ResponseEntity<JobSubmitResult> submitSqlJob(
            @RequestBody SqlJobRequest request) {

        try {
            JobSubmitResult result = flinkService.submitSqlJob(request.getSql());
            return ResponseEntity.ok(result);
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(new JobSubmitResult(false, e.getMessage()));
        }
    }

    @GetMapping("/jobs")
    public ResponseEntity<List<JobInfo>> getAllJobs() {
        List<JobInfo> jobs = flinkService.getAllJobs();
        return ResponseEntity.ok(jobs);
    }

    @GetMapping("/jobs/{jobId}/status")
    public ResponseEntity<JobStatus> getJobStatus(@PathVariable String jobId) {
        JobStatus status = flinkService.getJobStatus(jobId);
        return ResponseEntity.ok(status);
    }

    @DeleteMapping("/jobs/{jobId}")
    public ResponseEntity<Void> cancelJob(@PathVariable String jobId) {
        flinkService.cancelJob(jobId);
        return ResponseEntity.ok().build();
    }
}
Cluster Monitoring API

@RestController
@RequestMapping("/api/cluster")
public class ClusterController {

    @GetMapping("/status")
    public ResponseEntity<ClusterStatus> getClusterStatus() {
        ClusterStatus status = clusterService.getClusterStatus();
        return ResponseEntity.ok(status);
    }

    @GetMapping("/metrics")
    public ResponseEntity<ClusterMetrics> getClusterMetrics() {
        ClusterMetrics metrics = clusterService.getClusterMetrics();
        return ResponseEntity.ok(metrics);
    }

    @GetMapping("/taskmanagers")
    public ResponseEntity<List<TaskManager>> getTaskManagers() {
        List<TaskManager> taskManagers = clusterService.getTaskManagers();
        return ResponseEntity.ok(taskManagers);
    }
}
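Flink's own /taskmanagers REST response reports, per TaskManager, a total slot count (`slotsNumber`) and a free slot count (`freeSlots`), so a dashboard fed by the endpoint above can derive cluster-wide slot utilization. A sketch with a hypothetical helper name, assuming those field names are passed through:

```javascript
// Sketch: fraction of task slots in use across the cluster.
// Each entry is expected to carry slotsNumber (total) and freeSlots,
// as in Flink's /taskmanagers REST response.
function slotUtilization(taskManagers) {
  const total = taskManagers.reduce((sum, tm) => sum + tm.slotsNumber, 0);
  const free = taskManagers.reduce((sum, tm) => sum + tm.freeSlots, 0);
  return total === 0 ? 0 : (total - free) / total;
}
```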
🎨 Frontend Interface Design
Vue.js Component Architecture

// JobManagement.vue - main job-management component
<template>
  <div class="job-management">
    <div class="toolbar">
      <el-button type="primary" @click="showSubmitDialog">
        <i class="el-icon-plus"></i>
        Submit New Job
      </el-button>
      <el-button @click="refreshJobs">
        <i class="el-icon-refresh"></i>
        Refresh
      </el-button>
    </div>

    <el-table :data="jobs" v-loading="loading">
      <el-table-column prop="id" label="Job ID" width="200"/>
      <el-table-column prop="name" label="Job Name"/>
      <el-table-column prop="status" label="Status">
        <template slot-scope="scope">
          <el-tag :type="getStatusType(scope.row.status)">
            {{ scope.row.status }}
          </el-tag>
        </template>
      </el-table-column>
      <el-table-column prop="startTime" label="Start Time"/>
      <el-table-column label="Actions" width="200">
        <template slot-scope="scope">
          <el-button size="mini" @click="viewJob(scope.row)">
            View
          </el-button>
          <el-button
            size="mini"
            type="danger"
            @click="cancelJob(scope.row.id)">
            Cancel
          </el-button>
        </template>
      </el-table-column>
    </el-table>
  </div>
</template>

<script>
export default {
  name: 'JobManagement',
  data() {
    return {
      jobs: [],
      loading: false
    }
  },

  mounted() {
    this.fetchJobs();
    this.startPolling();
  },

  beforeDestroy() {
    clearInterval(this.pollingInterval); // stop polling when the component unmounts
  },

  methods: {
    async fetchJobs() {
      this.loading = true;
      try {
        const response = await this.$http.get('/api/flink/jobs');
        this.jobs = response.data;
      } catch (error) {
        this.$message.error('Failed to fetch job list');
      } finally {
        this.loading = false;
      }
    },

    startPolling() {
      this.pollingInterval = setInterval(() => {
        this.fetchJobs();
      }, 5000);
    },

    getStatusType(status) {
      const statusMap = {
        'RUNNING': 'success',
        'FINISHED': 'info',
        'CANCELED': 'warning',
        'FAILED': 'danger'
      };
      return statusMap[status] || 'info';
    }
  }
}
</script>
🔍 Core Business Logic
Flink Job Service Layer

@Service
@Slf4j
public class FlinkJobService {

    private final RestTemplate restTemplate;
    private final FileStorageService fileStorageService;

    @Value("${flink.jobmanager.url}")
    private String flinkJobManagerUrl;

    public JobSubmitResult submitJarJob(String jarPath, String mainClass, String args) {
        try {
            // Step 1: upload the JAR. Flink's /jars/upload endpoint returns
            // the server-side filename, not a job ID.
            MultiValueMap<String, Object> uploadBody = new LinkedMultiValueMap<>();
            uploadBody.add("jarfile", new FileSystemResource(jarPath));

            HttpHeaders uploadHeaders = new HttpHeaders();
            uploadHeaders.setContentType(MediaType.MULTIPART_FORM_DATA);

            // JarUploadResponse is a DTO mapping the "filename" field of the response
            ResponseEntity<JarUploadResponse> uploadResponse = restTemplate.exchange(
                flinkJobManagerUrl + "/jars/upload",
                HttpMethod.POST,
                new HttpEntity<>(uploadBody, uploadHeaders),
                JarUploadResponse.class
            );

            // The filename field is a full path; the jar id is its last segment
            String filename = uploadResponse.getBody().getFilename();
            String jarId = filename.substring(filename.lastIndexOf('/') + 1);

            // Step 2: run the uploaded JAR via /jars/{jarId}/run
            UriComponentsBuilder runUri = UriComponentsBuilder
                .fromHttpUrl(flinkJobManagerUrl + "/jars/" + jarId + "/run")
                .queryParam("entry-class", mainClass);
            if (StringUtils.hasText(args)) {
                runUri.queryParam("program-args", args);
            }

            // JarRunResponse is a DTO mapping the "jobid" field of the response
            ResponseEntity<JarRunResponse> runResponse = restTemplate.postForEntity(
                runUri.toUriString(), null, JarRunResponse.class
            );

            String jobId = runResponse.getBody().getJobid();
            log.info("Successfully submitted job: {}", jobId);
            return new JobSubmitResult(true, jobId, "Job submitted successfully");

        } catch (Exception e) {
            log.error("Error submitting jar job", e);
            return new JobSubmitResult(false, null, "Job submission failed: " + e.getMessage());
        }
    }

    public JobSubmitResult submitSqlJob(String sql) {
        try {
            // Note: a vanilla Flink JobManager does not expose a SQL REST
            // endpoint; this assumes a SQL-gateway-style service is reachable
            // behind the configured URL.
            Map<String, Object> sqlRequest = new HashMap<>();
            sqlRequest.put("statement", sql);
            sqlRequest.put("execution_type", "sync");

            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);

            HttpEntity<Map<String, Object>> requestEntity =
                new HttpEntity<>(sqlRequest, headers);

            ResponseEntity<FlinkSqlResponse> response = restTemplate.exchange(
                flinkJobManagerUrl + "/sql/execute",
                HttpMethod.POST,
                requestEntity,
                FlinkSqlResponse.class
            );

            if (response.getStatusCode() == HttpStatus.OK) {
                String sessionId = response.getBody().getSessionId();
                log.info("Successfully submitted SQL job: {}", sessionId);
                return new JobSubmitResult(true, sessionId, "SQL job submitted successfully");
            } else {
                return new JobSubmitResult(false, null, "SQL job submission failed");
            }

        } catch (Exception e) {
            log.error("Error submitting SQL job", e);
            return new JobSubmitResult(false, null, "SQL job submission error: " + e.getMessage());
        }
    }
}
🚀 Production Deployment
Nginx Configuration

# nginx.conf
upstream backend {
    server backend:9999;
}

upstream frontend {
    server frontend:8080;
}

server {
    listen 80;
    server_name localhost;

    # Frontend static assets
    location / {
        proxy_pass http://frontend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }

    # Backend API
    location /api/ {
        proxy_pass http://backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # WebSocket support
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }

    # Flink Web UI proxy
    location /flink/ {
        proxy_pass http://flink-jobmanager:8081/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }

    # Zeppelin proxy
    location /zeppelin/ {
        proxy_pass http://zeppelin:8080/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}
Environment Variables

# .env file
# Flink configuration
FLINK_JOBMANAGER_URL=http://flink-jobmanager:8081
FLINK_TASKMANAGER_SLOTS=2
FLINK_PARALLELISM_DEFAULT=2

# Zeppelin configuration
ZEPPELIN_NOTEBOOK_DIR=/opt/zeppelin/notebook
ZEPPELIN_INTERPRETER_DIR=/opt/zeppelin/interpreter

# Spring Boot configuration
SPRING_PROFILES_ACTIVE=production
SERVER_PORT=9999
LOGGING_LEVEL_ROOT=INFO

# Vue.js configuration
VUE_APP_API_BASE_URL=http://localhost/api
VUE_APP_FLINK_WEB_URL=http://localhost/flink
VUE_APP_ZEPPELIN_URL=http://localhost/zeppelin
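On the frontend, Vue CLI inlines the `VUE_APP_*` variables above into the bundle at build time via `process.env`. A small normalization helper (the helper itself is ours, not from the project) keeps request URLs free of trailing-slash duplication:

```javascript
// Sketch: read the API base URL from build-time env, defaulting to the
// relative /api path served by the Nginx proxy, and strip trailing slashes.
function resolveApiBaseUrl(env) {
  const base = (env && env.VUE_APP_API_BASE_URL) || '/api';
  return base.replace(/\/+$/, '');
}
```

Typical use: `axios.create({ baseURL: resolveApiBaseUrl(process.env) })`.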
🔗 Related Links
- Project GitHub: SpringDataPlatform
- Apache Flink documentation: https://flink.apache.org/
- Apache Zeppelin documentation: https://zeppelin.apache.org/
- Vue.js documentation: https://vuejs.org/
- Spring Boot documentation: https://spring.io/projects/spring-boot
🎯 Summary
SpringDataPlatform is a feature-rich, enterprise-grade big data platform that integrates modern web technologies with a distributed computing framework. Through its intuitive user interface, data engineers and analysts can easily manage Flink jobs, monitor execution status, and perform real-time data analysis.
Project highlights:
✅ Flexible job submission: supports both JAR and SQL submission modes
✅ Real-time monitoring: comprehensive job and cluster status monitoring
✅ Interactive analytics: Zeppelin integration for a Jupyter-like experience
✅ Containerized deployment: one-command startup with Docker Compose
✅ Enterprise-grade architecture: modular design that is easy to extend and maintain
This project demonstrates how a modern technology stack can be used to build a scalable big-data processing platform, providing solid technical support for enterprise digital transformation. 🚀✨