Flink入門(五)——DataSet Api編程指南

来源:https://www.cnblogs.com/tree1123/archive/2020/01/06/12155955.html
-Advertisement-
Play Games

Apache Flink Apache Flink 是一個兼顧高吞吐、低延遲、高性能的分散式處理框架。在實時計算崛起的今天,Flink正在飛速發展。由於性能的優勢和兼顧批處理,流處理的特性,Flink可能正在顛覆整個大數據的生態。 DataSet API 首先要想運行Flink,我們需要下載並解壓F ...


file

Apache Flink 是一個兼顧高吞吐、低延遲、高性能的分散式處理框架。在實時計算崛起的今天,Flink正在飛速發展。由於性能的優勢和兼顧批處理,流處理的特性,Flink可能正在顛覆整個大數據的生態。

file

DataSet API

首先要想運行Flink,我們需要下載並解壓Flink的二進位包,下載地址如下:https://flink.apache.org/downloads.html

我們可以選擇Flink與Scala結合版本,這裡我們選擇最新的1.9版本Apache Flink 1.9.0 for Scala 2.12進行下載。

下載成功後,在windows系統中可以通過Windows的bat文件或者Cygwin來運行Flink。

在linux系統中分為單機,集群和Hadoop等多種情況。

請參考:Flink入門(三)——環境與部署

Flink的編程模型,Flink提供了不同的抽象級別以開發流式或者批處理應用,本文我們來介紹DataSet API ,Flink最常用的批處理編程模型。

file

Flink中的DataSet程式是實現數據集轉換的常規程式(例如,Filter,映射,連接,分組)。數據集最初是從某些來源創建的(例如,通過讀取文件或從本地集合創建)。結果通過接收器返回,接收器可以例如將數據寫入(分散式)文件或標準輸出(例如命令行終端)。Flink程式可以在各種環境中運行,獨立運行或嵌入其他程式中。執行可以在本地JVM中執行,也可以在許多電腦的集群上執行。

示常式序

以下程式是WordCount的完整工作示例。您可以複製並粘貼代碼以在本地運行它。

Java

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

Scala

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}

數據集轉換

數據轉換將一個或多個DataSet轉換為新的DataSet。程式可以將多個轉換組合到複雜的程式集中。

DataSet API 中最重要的就是這些運算元,我們將數據接入後,通過這些運算元對數據進行處理,得到我們想要的結果。

Java版運算元如下:

轉換 描述
Map 採用一個數據元並生成一個數據元。data.map(new MapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } });
FlatMap 採用一個數據元並生成零個,一個或多個數據元。data.flatMap(new FlatMapFunction<String, String>() { public void flatMap(String value, Collector<String> out) { for (String s : value.split(" ")) { out.collect(s); } } });
MapPartition 在單個函數調用中轉換並行分區。該函數將分區作為Iterable流來獲取,並且可以生成任意數量的結果值。每個分區中的數據元數量取決於並行度和先前的 運算元操作。data.mapPartition(new MapPartitionFunction<String, Long>() { public void mapPartition(Iterable<String> values, Collector<Long> out) { long c = 0; for (String s : values) { c++; } out.collect(c); } });
Filter 計算每個數據元的布爾函數,並保存函數返回true的數據元。 重要信息:系統假定該函數不會修改應用謂詞的數據元。違反此假設可能會導致錯誤的結果。data.filter(new FilterFunction<Integer>() { public boolean filter(Integer value) { return value > 1000; } });
Reduce 通過將兩個數據元重覆組合成一個數據元,將一組數據元組合成一個數據元。Reduce可以應用於完整數據集或分組數據集。data.reduce(new ReduceFunction<Integer> { public Integer reduce(Integer a, Integer b) { return a + b; } });如果將reduce應用於分組數據集,則可以通過提供CombineHintto 來指定運行時執行reduce的組合階段的方式 setCombineHint。在大多數情況下,基於散列的策略應該更快,特別是如果不同鍵的數量與輸入數據元的數量相比較小(例如1/10)。
ReduceGroup 將一組數據元組合成一個或多個數據元。ReduceGroup可以應用於完整數據集或分組數據集。data.reduceGroup(new GroupReduceFunction<Integer, Integer> { public void reduce(Iterable<Integer> values, Collector<Integer> out) { int prefixSum = 0; for (Integer i : values) { prefixSum += i; out.collect(prefixSum); } } });
Aggregate 將一組值聚合為單個值。聚合函數可以被認為是內置的reduce函數。聚合可以應用於完整數據集或分組數據集。Dataset<Tuple3<Integer, String, Double>> input = // [...] DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);您還可以使用簡寫語法進行最小,最大和總和聚合。Dataset<Tuple3<Integer, String, Double>> input = // [...] DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
Distinct 返回數據集的不同數據元。它相對於數據元的所有欄位或欄位子集從輸入DataSet中刪除重覆條目。data.distinct();使用reduce函數實現Distinct。您可以通過提供CombineHintto 來指定運行時執行reduce的組合階段的方式 setCombineHint。在大多數情況下,基於散列的策略應該更快,特別是如果不同鍵的數量與輸入數據元的數量相比較小(例如1/10)。
Join 通過創建在其鍵上相等的所有數據元對來連接兩個數據集。可選地使用JoinFunction將數據元對轉換為單個數據元,或使用FlatJoinFunction將數據元對轉換為任意多個(包括無)數據元。請參閱鍵部分以瞭解如何定義連接鍵。result = input1.join(input2) .where(0) // key of the first input (tuple field 0) .equalTo(1); // key of the second input (tuple field 1)您可以通過Join Hints指定運行時執行連接的方式。提示描述了通過分區或廣播進行連接,以及它是使用基於排序還是基於散列的演算法。有關可能的提示和示例的列表,請參閱“ 轉換指南”。 如果未指定提示,系統將嘗試估算輸入大小,並根據這些估計選擇最佳策略。// This executes a join by broadcasting the first data set // using a hash table for the broadcast data result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST) .where(0).equalTo(1);請註意,連接轉換僅適用於等連接。其他連接類型需要使用OuterJoin或CoGroup表示。
OuterJoin 在兩個數據集上執行左,右或全外連接。外連接類似於常規(內部)連接,並創建在其鍵上相等的所有數據元對。此外,如果在另一側沒有找到匹配的Keys,則保存“外部”側(左側,右側或兩者都滿)的記錄。匹配數據元對(或一個數據元和null另一個輸入的值)被賦予JoinFunction以將數據元對轉換為單個數據元,或者轉換為FlatJoinFunction以將數據元對轉換為任意多個(包括無)數據元。請參閱鍵部分以瞭解如何定義連接鍵。input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins .where(0) // key of the first input (tuple field 0) .equalTo(1) // key of the second input (tuple field 1) .with(new JoinFunction<String, String, String>() { public String join(String v1, String v2) { // NOTE: // - v2 might be null for leftOuterJoin // - v1 might be null for rightOuterJoin // - v1 OR v2 might be null for fullOuterJoin } });
CoGroup reduce 運算元操作的二維變體。將一個或多個欄位上的每個輸入分組,然後關聯組。每對組調用轉換函數。請參閱keys部分以瞭解如何定義coGroup鍵。data1.coGroup(data2) .where(0) .equalTo(1) .with(new CoGroupFunction<String, String, String>() { public void coGroup(Iterable<String> in1, Iterable<String> in2, Collector<String> out) { out.collect(...); } });
Cross 構建兩個輸入的笛卡爾積(交叉乘積),創建所有數據元對。可選擇使用CrossFunction將數據元對轉換為單個數據元DataSet<Integer> data1 = // [...] DataSet<String> data2 = // [...] DataSet<Tuple2<Integer, String>> result = data1.cross(data2);註:交叉是一個潛在的非常計算密集型 運算元操作它甚至可以挑戰大的計算集群!建議使用crossWithTiny()crossWithHuge()來提示系統的DataSet大小。
Union 生成兩個數據集的並集。DataSet<String> data1 = // [...] DataSet<String> data2 = // [...] DataSet<String> result = data1.union(data2);
Rebalance 均勻地Rebalance 數據集的並行分區以消除數據偏差。只有類似Map的轉換可能會遵循Rebalance 轉換。DataSet<String> in = // [...] DataSet<String> result = in.rebalance() .map(new Mapper());
Hash-Partition 散列分區給定鍵上的數據集。鍵可以指定為位置鍵,表達鍵和鍵選擇器函數。DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.partitionByHash(0) .mapPartition(new PartitionMapper());
Range-Partition Range-Partition給定鍵上的數據集。鍵可以指定為位置鍵,表達鍵和鍵選擇器函數。DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.partitionByRange(0) .mapPartition(new PartitionMapper());
Custom Partitioning 手動指定數據分區。 註意:此方法僅適用於單個欄位鍵。DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.partitionCustom(Partitioner<K> partitioner, key)
Sort Partition 本地按指定順序對指定欄位上的數據集的所有分區進行排序。可以將欄位指定為元組位置或欄位表達式。通過鏈接sortPartition()調用來完成對多個欄位的排序。DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING) .mapPartition(new PartitionMapper());
First-n 返回數據集的前n個(任意)數據元。First-n可以應用於常規數據集,分組數據集或分組排序數據集。分組鍵可以指定為鍵選擇器函數或欄位位置鍵。DataSet<Tuple2<String,Integer>> in = // [...] // regular data set DataSet<Tuple2<String,Integer>> result1 = in.first(3); // grouped data set DataSet<Tuple2<String,Integer>> result2 = in.groupBy(0) .first(3); // grouped-sorted data set DataSet<Tuple2<String,Integer>> result3 = in.groupBy(0) .sortGroup(1, Order.ASCENDING) .first(3);

數據源

數據源創建初始數據集,例如來自文件或Java集合。創建數據集的一般機制是在InputFormat後面抽象的 。Flink附帶了幾種內置格式,可以從通用文件格式創建數據集。他們中的許多人在ExecutionEnvironment上都有快捷方法。

基於文件的:

  • readTextFile(path)/ TextInputFormat- 按行讀取文件並將其作為字元串返回。
  • readTextFileWithValue(path)/ TextValueInputFormat- 按行讀取文件並將它們作為StringValues返回。StringValues是可變字元串。
  • readCsvFile(path)/ CsvInputFormat- 解析逗號(或其他字元)分隔欄位的文件。返回元組或POJO的DataSet。支持基本java類型及其Value對應作為欄位類型。
  • readFileOfPrimitives(path, Class)/ PrimitiveInputFormat- 解析新行(或其他字元序列)分隔的原始數據類型(如String或)的文件Integer
  • readFileOfPrimitives(path, delimiter, Class)/ PrimitiveInputFormat- 解析新行(或其他字元序列)分隔的原始數據類型的文件,例如StringInteger使用給定的分隔符。
  • readSequenceFile(Key, Value, path)/ SequenceFileInputFormat- 創建一個JobConf並從類型為SequenceFileInputFormat,Key class和Value類的指定路徑中讀取文件,並將它們作為Tuple2 <Key,Value>返回。

基於集合:

  • fromCollection(Collection) - 從Java Java.util.Collection創建數據集。集合中的所有數據元必須屬於同一類型。
  • fromCollection(Iterator, Class) - 從迭代器創建數據集。該類指定迭代器返回的數據元的數據類型。
  • fromElements(T ...) - 根據給定的對象序列創建數據集。所有對象必須屬於同一類型。
  • fromParallelCollection(SplittableIterator, Class) - 並行地從迭代器創建數據集。該類指定迭代器返回的數據元的數據類型。
  • generateSequence(from, to) - 並行生成給定間隔中的數字序列。

通用:

  • readFile(inputFormat, path)/ FileInputFormat- 接受文件輸入格式。
  • createInput(inputFormat)/ InputFormat- 接受通用輸入格式。

例子

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");

// read text file from a HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                           .types(Integer.class, String.class, Double.class);

// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                               .includeFields("10010")  // take the first and the fourth field
                           .types(String.class, Double.class);

// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                         .pojoType(Person.class, "name", "age", "zipcode");

// read a file from the specified path of type SequenceFileInputFormat
DataSet<Tuple2<IntWritable, Text>> tuples =
 env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");

// creates a set from some given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");

// generate a number sequence
DataSet<Long> numbers = env.generateSequence(1, 10000000);

// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer> dbData =
    env.createInput(
      JDBCInputFormat.buildJDBCInputFormat()
                     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                     .setDBUrl("jdbc:derby:memory:persons")
                     .setQuery("select name, age from persons")
                     .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                     .finish()
    );

// Note: Flink's program compiler needs to infer the data types of the data items which are returned
// by an InputFormat. If this information cannot be automatically inferred, it is necessary to
// manually provide the type information as shown in the examples above.

收集數據源和接收器

通過創建輸入文件和讀取輸出文件來完成分析程式的輸入並檢查其輸出是很麻煩的。Flink具有特殊的數據源和接收器,由Java集合支持以簡化測試。一旦程式經過測試,源和接收器可以很容易地被讀取/寫入外部數據存儲(如HDFS)的源和接收器替換。

在開發中,我們經常直接使用接收器對數據源進行接收。

final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

// Create a DataSet from a list of elements
DataSet<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataSet from any Java collection
List<Tuple2<String, Integer>> data = ...
DataSet<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataSet from an Iterator
Iterator<Long> longIt = ...
DataSet<Long> myLongs = env.fromCollection(longIt, Long.class);

廣播變數

除了常規的 運算元操作輸入之外,廣播變數還允許您為 運算元操作的所有並行實例提供數據集。這對於輔助數據集或與數據相關的參數化非常有用。然後,運算元可以將數據集作為集合訪問。

// 1. The DataSet to be broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
      // 3. Access the broadcast DataSet as a Collection
      Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }


    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet

分散式緩存

Flink提供了一個分散式緩存,類似於Apache Hadoop,可以在本地訪問用戶函數的並行實例。此函數可用於共用包含靜態外部數據的文件,如字典或機器學習的回歸模型。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

以上就是DataSet API 的使用,其實和spark非常的相似,我們將數據接入後,可以利用各種運算元對數據進行處理。

Flink Demo代碼

Flink系列文章:

Flink入門(一)——Apache Flink介紹
Flink入門(二)——Flink架構介紹

Flink入門(三)——環境與部署

Flink入門(四)——編程模型

更多實時計算,Flink,Kafka等相關技術博文,歡迎關註實時流式計算

file


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 故障現象: 修改nginx配置參數後,使用nginx -t檢查配置,出現告警提示 nginx: [warn] conflicting server name "aaa.7yule.cn" on 0.0.0.0:80, ignored 故障原因: 自己新增配置中的功能變數名稱在其他配置中存在,一個功能變數名稱做了兩個 ...
  • 一、下載Oracle 11g R2 for Windows。 官方網站: 二、解壓兩個壓縮包到同一個目錄下,即‘database’,然後點擊‘setup.exe’文件開始安裝。 三、執行安裝程式後,選“是”。 四、稍微等待一會,就會出現如下圖所示的安裝畫面,取消下圖所示的選中,然後單擊"下一步"繼續 ...
  • 1.1 Hive簡介 1.1.1 什麼是Hive Hive是基於Hadoop的一個數據倉庫工具,可以將結構化的數據文件映射為一張資料庫表,並提供類SQL查詢功能。 1.1.2 為什麼使用Hive 直接使用hadoop所面臨的問題 人員學習成本太高 項目周期要求太短 MapReduce實現複雜查詢邏輯 ...
  • 概述: 事務是由一系列語句構成的邏輯工作單元。事務和存儲過程等批處理有一定程度上的相似之處, 通常都是為了完成一定業務邏輯而將一條或者多條語句“封裝”起來,使它們與其他語句之間出現一個邏輯上的邊界,並形成相對獨立的一個工作單元。 當使用事務修改多個數據表時,如果在處理的過程中出現了某種錯誤,例如系統 ...
  • 一、Profile目的: Oracle系統中的profile可以用來對用戶所能使用的資料庫資源進行限制,使用Create Profile命令創建一個Profile,用它來實現對資料庫資源的限制使用,如果把該profile分配給用戶,則該用戶所能使用的資料庫資源都在該profile的限制之內。具體管理 ...
  • binlog相關 MySQL 的二進位日誌 binlog 可以說是 MySQL 最重要的日誌,它記錄了所有的 DDL 和 DML 語句(除了數據查詢語句select、show等),以事件形式記錄,還包含語句所執行的消耗的時間,MySQL的二進位日誌是事務安全型的。binlog 的主要目的是複製和恢復 ...
  • ©Copyright 蕃薯耀 2020-01-07 https://www.cnblogs.com/fanshuyao/ 第一步: 下載 instantclient(instantclient-basic-win32-11.2.0.1.0.zip) 下載地址: https://pan.baidu.c ...
  • MAXSETSIZE跟MAXPIECESIZE用法 區別:maxpiecesize設置的是備份完成後的備份片大小,對備份整體的大小沒有影響,比如一個G的備份完成文件,maxpiecesize設置為100M,最後就會變成10個100M的備份文件。Maxsetsize設置限定的是整體大小,個人認為這個功 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...