🚀 SpringDataPlatform: An Enterprise-Grade Apache Flink Job Management Platform

🎯 Project Overview

SpringDataPlatform is a full-featured, enterprise-grade big-data platform designed for Apache Flink job management. This full-stack project integrates a modern web technology stack and provides an intuitive user interface for managing and monitoring distributed data-processing workflows.

🏗️ System Architecture

```mermaid
graph TB
    A[Vue.js Frontend] --> B[Nginx Reverse Proxy]
    B --> C[Spring Boot Backend]
    C --> D[Apache Flink Cluster]
    C --> E[Apache Zeppelin]
    D --> F[Job Execution Engine]
    E --> G[Interactive Notebooks]

    subgraph "Core Features"
    H[JAR Job Submission]
    I[SQL Job Submission]
    J[Job Status Monitoring]
    K[Cluster Status Monitoring]
    end

    C --> H
    C --> I
    C --> J
    C --> K
```

🛠️ Technology Stack

Frontend

  • Framework: Vue.js 2.x
  • Routing: Vue Router
  • HTTP client: Axios
  • UI enhancements: SweetAlert2
  • Syntax highlighting: Highlight.js
  • Build tool: Vue CLI

Backend

  • Core framework: Spring Boot
  • Data processing: Apache Flink
  • Interactive analytics: Apache Zeppelin
  • Containerization: Docker + Docker Compose
  • Reverse proxy: Nginx

🚀 Core Features

1️⃣ Multiple Job Submission Modes

JAR File Submission

```http
# Upload and run a JAR file through the platform's REST API.
# The payload is a multipart form (not JSON); field names match the
# controller's @RequestParam names shown later in this post.
POST /api/flink/jobs/submit-jar
Content-Type: multipart/form-data

# Form fields:
#   file      = your-flink-job.jar
#   mainClass = com.example.FlinkJob
#   args      = --input /data/input --output /data/output
```

SQL Job Submission

```sql
-- Flink SQL submitted directly through the platform
CREATE TABLE source_table (
    id INT,
    name STRING,
    timestamp_col TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'input-topic',
    'properties.bootstrap.servers' = 'kafka:9092'
);

INSERT INTO sink_table
SELECT id, UPPER(name), timestamp_col
FROM source_table
WHERE id > 1000;
```

2️⃣ Real-Time Monitoring Dashboard

```javascript
// Core logic of the Vue.js monitoring component
export default {
  data() {
    return {
      jobs: [],
      clusterStatus: {},
      monitoring: true
    }
  },
  methods: {
    async fetchJobStatus() {
      try {
        const response = await this.$axios.get('/api/flink/jobs');
        this.jobs = response.data;
        this.updateJobMetrics();
      } catch (error) {
        this.$swal('Error', 'Failed to fetch job status', 'error');
      }
    },

    updateJobMetrics() {
      this.jobs.forEach(job => {
        // Refresh per-job metrics in place
        this.fetchJobMetrics(job.id);
      });
    },

    startMonitoring() {
      this.monitoringInterval = setInterval(() => {
        this.fetchJobStatus();
      }, 5000); // refresh every 5 seconds
    },

    stopMonitoring() {
      // Clear the timer (call this from beforeDestroy to avoid leaks)
      clearInterval(this.monitoringInterval);
    }
  }
}
```

3️⃣ Interactive Data Exploration

Integrating Apache Zeppelin provides a Jupyter-like interactive data-analysis environment:

```scala
%flink

// Flink code example in a Zeppelin notebook: a real-time word count
// over a socket stream. Zeppelin's Flink interpreter provides the
// Scala streaming environment as `senv`; note that `keyBy(0)` and
// `timeWindow` were removed in recent Flink versions.
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val dataStream = senv
  .socketTextStream("localhost", 9999)
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .keyBy(_._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .sum(1)

dataStream.print()
senv.execute("Real-time Word Count")
```
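The tokenize → filter → count pipeline above is independent of Flink itself; as a minimal illustration of what one window of that aggregation computes, here is the same logic in plain Java (the class and method names are ours, not part of the project):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    // Mirrors the Zeppelin example: lowercase, split on non-word
    // characters, drop empty tokens, and count occurrences —
    // effectively one tumbling window's worth of input.
    static Map<String, Integer> wordCount(String text) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String token : text.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount("To be, or not to be"));
        // → {to=2, be=2, or=1, not=1}
    }
}
```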

🔧 Deployment and Configuration

Containerized Deployment with Docker

```yaml
# docker-compose.yml — core configuration
version: '3.8'

services:
  # Frontend Vue.js application
  frontend:
    build:
      context: ./frontend/data-platform-ui
    ports:
      - "8080:8080"
    depends_on:
      - backend
    networks:
      - data-platform

  # Backend Spring Boot API
  backend:
    build:
      context: ./backend/DataPlatform
    ports:
      - "9999:9999"
    environment:
      - FLINK_JOBMANAGER_URL=http://flink-jobmanager:8081
      # Container-to-container traffic uses Zeppelin's container port
      # (8080), not the 8082 host mapping below.
      - ZEPPELIN_URL=http://zeppelin:8080
    depends_on:
      - flink-jobmanager
      - zeppelin
    networks:
      - data-platform

  # Apache Flink JobManager
  flink-jobmanager:
    image: flink:1.15.2-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        parallelism.default: 2
    networks:
      - data-platform

  # Apache Flink TaskManager
  flink-taskmanager:
    image: flink:1.15.2-scala_2.12
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 2
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
        parallelism.default: 2
    networks:
      - data-platform

  # Apache Zeppelin
  zeppelin:
    image: apache/zeppelin:0.10.1
    ports:
      - "8082:8080"
    volumes:
      - ./zeppelin/notebook:/opt/zeppelin/notebook
      - ./zeppelin/conf:/opt/zeppelin/conf
    networks:
      - data-platform

networks:
  data-platform:
    driver: bridge
```

Quick Start Commands

```bash
# Standard deployment
docker-compose up -d

# Rebuild and start
docker-compose up --build

# Apple-silicon (M1) variant
docker-compose -f docker-compose-m1.yml up -d

# Check service status
docker-compose ps

# Tail logs
docker-compose logs -f backend
```

📊 REST API Design

Job Management API

```java
@RestController
@RequestMapping("/api/flink")
public class FlinkJobController {

    private final FlinkJobService flinkService;

    public FlinkJobController(FlinkJobService flinkService) {
        this.flinkService = flinkService;
    }

    @PostMapping("/jobs/submit-jar")
    public ResponseEntity<JobSubmitResult> submitJarJob(
            @RequestParam("file") MultipartFile jarFile,
            @RequestParam("mainClass") String mainClass,
            @RequestParam(value = "args", required = false) String programArgs) {

        try {
            // Save the uploaded JAR file
            String jarPath = saveUploadedFile(jarFile);

            // Submit the job to the Flink cluster
            JobSubmitResult result = flinkService.submitJarJob(
                jarPath, mainClass, programArgs
            );

            return ResponseEntity.ok(result);
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(new JobSubmitResult(false, e.getMessage()));
        }
    }

    @PostMapping("/jobs/submit-sql")
    public ResponseEntity<JobSubmitResult> submitSqlJob(
            @RequestBody SqlJobRequest request) {

        try {
            JobSubmitResult result = flinkService.submitSqlJob(request.getSql());
            return ResponseEntity.ok(result);
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(new JobSubmitResult(false, e.getMessage()));
        }
    }

    @GetMapping("/jobs")
    public ResponseEntity<List<JobInfo>> getAllJobs() {
        List<JobInfo> jobs = flinkService.getAllJobs();
        return ResponseEntity.ok(jobs);
    }

    @GetMapping("/jobs/{jobId}/status")
    public ResponseEntity<JobStatus> getJobStatus(@PathVariable String jobId) {
        JobStatus status = flinkService.getJobStatus(jobId);
        return ResponseEntity.ok(status);
    }

    @DeleteMapping("/jobs/{jobId}")
    public ResponseEntity<Void> cancelJob(@PathVariable String jobId) {
        flinkService.cancelJob(jobId);
        return ResponseEntity.ok().build();
    }
}
```
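The controller above returns a `JobSubmitResult` that this post never defines. A minimal sketch of what such a DTO might look like, with the two constructor shapes inferred from how it is called (the field names are our assumption, not confirmed by the source):

```java
public class JobSubmitResult {
    private final boolean success;
    private final String jobId;
    private final String message;

    // Two-arg form used by the controller's error paths.
    public JobSubmitResult(boolean success, String message) {
        this(success, null, message);
    }

    // Three-arg form used by FlinkJobService after submission.
    public JobSubmitResult(boolean success, String jobId, String message) {
        this.success = success;
        this.jobId = jobId;
        this.message = message;
    }

    public boolean isSuccess() { return success; }
    public String getJobId()   { return jobId; }
    public String getMessage() { return message; }
}
```

In the actual project this would more likely be a Lombok `@Value`/`@AllArgsConstructor` class; the explicit form keeps the sketch dependency-free.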

Cluster Monitoring API

```java
@RestController
@RequestMapping("/api/cluster")
public class ClusterController {

    private final ClusterService clusterService;

    public ClusterController(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    @GetMapping("/status")
    public ResponseEntity<ClusterStatus> getClusterStatus() {
        ClusterStatus status = clusterService.getClusterStatus();
        return ResponseEntity.ok(status);
    }

    @GetMapping("/metrics")
    public ResponseEntity<ClusterMetrics> getClusterMetrics() {
        ClusterMetrics metrics = clusterService.getClusterMetrics();
        return ResponseEntity.ok(metrics);
    }

    @GetMapping("/taskmanagers")
    public ResponseEntity<List<TaskManager>> getTaskManagers() {
        List<TaskManager> taskManagers = clusterService.getTaskManagers();
        return ResponseEntity.ok(taskManagers);
    }
}
```
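Under the hood, `clusterService` can obtain these numbers from Flink's REST endpoint `GET /overview`, which reports cluster-wide counts. A hedged sketch of the mapping (the `ClusterStatus` shape and `fromOverview` helper are our illustration; the `taskmanagers`, `slots-total`, and `jobs-running` keys come from Flink's REST API):

```java
import java.util.Map;

public class ClusterStatus {
    private final int taskManagers;
    private final int totalSlots;
    private final int runningJobs;

    public ClusterStatus(int taskManagers, int totalSlots, int runningJobs) {
        this.taskManagers = taskManagers;
        this.totalSlots = totalSlots;
        this.runningJobs = runningJobs;
    }

    // Map the parsed JSON body of Flink's GET /overview onto the DTO.
    public static ClusterStatus fromOverview(Map<String, Object> overview) {
        return new ClusterStatus(
            ((Number) overview.getOrDefault("taskmanagers", 0)).intValue(),
            ((Number) overview.getOrDefault("slots-total", 0)).intValue(),
            ((Number) overview.getOrDefault("jobs-running", 0)).intValue());
    }

    public int getTaskManagers() { return taskManagers; }
    public int getTotalSlots()   { return totalSlots; }
    public int getRunningJobs()  { return runningJobs; }
}
```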

🎨 Frontend UI Design

Vue.js Component Architecture

```vue
<!-- JobManagement.vue — main job management component -->
<template>
  <div class="job-management">
    <div class="toolbar">
      <el-button type="primary" @click="showSubmitDialog">
        <i class="el-icon-plus"></i>
        Submit New Job
      </el-button>
      <el-button @click="refreshJobs">
        <i class="el-icon-refresh"></i>
        Refresh
      </el-button>
    </div>

    <el-table :data="jobs" v-loading="loading">
      <el-table-column prop="id" label="Job ID" width="200"/>
      <el-table-column prop="name" label="Job Name"/>
      <el-table-column prop="status" label="Status">
        <template slot-scope="scope">
          <el-tag :type="getStatusType(scope.row.status)">
            {{ scope.row.status }}
          </el-tag>
        </template>
      </el-table-column>
      <el-table-column prop="startTime" label="Start Time"/>
      <el-table-column label="Actions" width="200">
        <template slot-scope="scope">
          <el-button size="mini" @click="viewJob(scope.row)">
            View
          </el-button>
          <el-button
            size="mini"
            type="danger"
            @click="cancelJob(scope.row.id)">
            Cancel
          </el-button>
        </template>
      </el-table-column>
    </el-table>
  </div>
</template>

<script>
export default {
  name: 'JobManagement',
  data() {
    return {
      jobs: [],
      loading: false,
      pollingInterval: null
    }
  },

  mounted() {
    this.fetchJobs();
    this.startPolling();
  },

  beforeDestroy() {
    // Stop polling when the component is torn down
    clearInterval(this.pollingInterval);
  },

  methods: {
    async fetchJobs() {
      this.loading = true;
      try {
        const response = await this.$http.get('/api/flink/jobs');
        this.jobs = response.data;
      } catch (error) {
        this.$message.error('Failed to fetch job list');
      } finally {
        this.loading = false;
      }
    },

    startPolling() {
      this.pollingInterval = setInterval(() => {
        this.fetchJobs();
      }, 5000);
    },

    getStatusType(status) {
      const statusMap = {
        'RUNNING': 'success',
        'FINISHED': 'info',
        'CANCELED': 'warning',
        'FAILED': 'danger'
      };
      return statusMap[status] || 'info';
    }
  }
}
</script>
```

🔍 Core Business Logic

```java
@Service
@Slf4j
@RequiredArgsConstructor
public class FlinkJobService {

    private final RestTemplate restTemplate;
    private final FileStorageService fileStorageService;

    @Value("${flink.jobmanager.url}")
    private String flinkJobManagerUrl;

    public JobSubmitResult submitJarJob(String jarPath, String mainClass, String args) {
        try {
            // Step 1: upload the JAR. Flink's /jars/upload responds with
            // the server-side file name, not a job id.
            MultiValueMap<String, Object> uploadBody = new LinkedMultiValueMap<>();
            uploadBody.add("jarfile", new FileSystemResource(jarPath));

            HttpHeaders uploadHeaders = new HttpHeaders();
            uploadHeaders.setContentType(MediaType.MULTIPART_FORM_DATA);

            ResponseEntity<FlinkJarUploadResponse> uploadResponse = restTemplate.exchange(
                flinkJobManagerUrl + "/jars/upload",
                HttpMethod.POST,
                new HttpEntity<>(uploadBody, uploadHeaders),
                FlinkJarUploadResponse.class
            );

            String jarId = uploadResponse.getBody().getJarId();

            // Step 2: run the uploaded JAR via /jars/{jarId}/run. The entry
            // class and program arguments go in this call's JSON body.
            Map<String, Object> runRequest = new HashMap<>();
            runRequest.put("entryClass", mainClass);
            if (StringUtils.hasText(args)) {
                runRequest.put("programArgs", args);
            }

            HttpHeaders runHeaders = new HttpHeaders();
            runHeaders.setContentType(MediaType.APPLICATION_JSON);

            ResponseEntity<FlinkJobSubmitResponse> runResponse = restTemplate.exchange(
                flinkJobManagerUrl + "/jars/" + jarId + "/run",
                HttpMethod.POST,
                new HttpEntity<>(runRequest, runHeaders),
                FlinkJobSubmitResponse.class
            );

            if (runResponse.getStatusCode() == HttpStatus.OK) {
                String jobId = runResponse.getBody().getJobId();
                log.info("Successfully submitted job: {}", jobId);
                return new JobSubmitResult(true, jobId, "Job submitted successfully");
            }
            return new JobSubmitResult(false, null, "Job submission failed");

        } catch (Exception e) {
            log.error("Error submitting jar job", e);
            return new JobSubmitResult(false, null, "Job submission error: " + e.getMessage());
        }
    }

    public JobSubmitResult submitSqlJob(String sql) {
        try {
            // Build the SQL job request. Note: the JobManager itself does
            // not expose a SQL endpoint; this assumes a SQL-gateway-style
            // service is reachable at the configured base URL.
            Map<String, Object> sqlRequest = new HashMap<>();
            sqlRequest.put("statement", sql);
            sqlRequest.put("execution_type", "sync");

            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);

            HttpEntity<Map<String, Object>> requestEntity =
                new HttpEntity<>(sqlRequest, headers);

            ResponseEntity<FlinkSqlResponse> response = restTemplate.exchange(
                flinkJobManagerUrl + "/sql/execute",
                HttpMethod.POST,
                requestEntity,
                FlinkSqlResponse.class
            );

            if (response.getStatusCode() == HttpStatus.OK) {
                String sessionId = response.getBody().getSessionId();
                log.info("Successfully submitted SQL job: {}", sessionId);
                return new JobSubmitResult(true, sessionId, "SQL job submitted successfully");
            }
            return new JobSubmitResult(false, null, "SQL job submission failed");

        } catch (Exception e) {
            log.error("Error submitting SQL job", e);
            return new JobSubmitResult(false, null, "SQL job submission error: " + e.getMessage());
        }
    }
}
```
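A detail worth knowing about Flink's upload flow: `/jars/upload` reports a server-side `filename` such as `/tmp/flink-web-upload/<uuid>_job.jar`, and the jar id expected by the follow-up `/jars/{jarId}/run` call is the last path segment of that name. A small helper for the extraction (the class and method names are ours):

```java
public class JarIdExtractor {
    // Flink's /jars/upload returns a server-side path in its "filename"
    // field; the id expected by /jars/{jarId}/run is the final segment.
    static String extractJarId(String filename) {
        int slash = filename.lastIndexOf('/');
        return slash >= 0 ? filename.substring(slash + 1) : filename;
    }

    public static void main(String[] args) {
        System.out.println(extractJarId("/tmp/flink-web-upload/abc123_job.jar"));
        // → abc123_job.jar
    }
}
```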

🚀 Production Deployment

Nginx Configuration

```nginx
# nginx.conf
upstream backend {
    server backend:9999;
}

upstream frontend {
    server frontend:8080;
}

server {
    listen 80;
    server_name localhost;

    # Frontend static assets
    location / {
        proxy_pass http://frontend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }

    # Backend API
    location /api/ {
        proxy_pass http://backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # WebSocket support
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }

    # Flink Web UI proxy
    location /flink/ {
        proxy_pass http://flink-jobmanager:8081/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }

    # Zeppelin proxy
    location /zeppelin/ {
        proxy_pass http://zeppelin:8080/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}
```

Environment Variables

```bash
# .env file
# Flink configuration
FLINK_JOBMANAGER_URL=http://flink-jobmanager:8081
FLINK_TASKMANAGER_SLOTS=2
FLINK_PARALLELISM_DEFAULT=2

# Zeppelin configuration
ZEPPELIN_NOTEBOOK_DIR=/opt/zeppelin/notebook
ZEPPELIN_INTERPRETER_DIR=/opt/zeppelin/interpreter

# Spring Boot configuration
SPRING_PROFILES_ACTIVE=production
SERVER_PORT=9999
LOGGING_LEVEL_ROOT=INFO

# Vue.js configuration
VUE_APP_API_BASE_URL=http://localhost/api
VUE_APP_FLINK_WEB_URL=http://localhost/flink
VUE_APP_ZEPPELIN_URL=http://localhost/zeppelin
```

🎯 Summary

SpringDataPlatform is a feature-rich, enterprise-grade big-data platform that successfully integrates modern web technologies with a distributed computing framework. Through an intuitive user interface, data engineers and analysts can easily manage Flink jobs, monitor execution status, and perform real-time data analysis.

Project Highlights

  • Flexible job submission: both JAR-file and SQL submission modes
  • Real-time monitoring: complete job and cluster status monitoring
  • Interactive analytics: integrated Zeppelin for a Jupyter-like experience
  • Containerized deployment: one-command deployment with Docker Compose
  • Enterprise architecture: modular design that is easy to extend and maintain

This project demonstrates how a modern technology stack can be used to build a scalable big-data processing platform, providing strong technical support for enterprise digital transformation. 🚀✨