微服務技術棧:流量整形算法,服務熔斷與降級

本文源碼:GitHub·點這裏 || GitEE·點這裏

一、流量控制

1、基本概念

流量控制的核心作用是限制流出某一網絡的某一連接的流量與突發,使這類報文以比較均勻的速度流動發送,達到保護系統相對穩定的目的。通常是將請求放入緩衝區或隊列內,然後基於特定策略處理請求,勻速或者批量處理,該過程也稱流量整形。

流量控制的核心算法有以下兩種:漏桶算法和令牌桶算法。

2、漏桶算法

基礎描述

漏桶算法是流量整形或速率限制時經常使用的一種算法,它的主要目的是控制數據注入到網絡的速率,平滑網絡上的突發流量。漏桶算法提供了一種機制,通過它,突發流量可以被整形以便為網絡提供一個穩定的流量。

漏桶算法基本思路:請求(水流)先進入到容器(漏桶)里,漏桶以一定的速度出水,這裏就是指流量流出的策略,當流量流入速度過大容器無法承接就會直接溢出,通過該過程限制數據的傳輸速率。

核心要素

通過上述流程,不難發現漏桶算法涉及下面幾個要素:

容器容量

容器的大小直接決定能承接流量的多少,容器一但接近飽和,要麼溢出,要麼加快流速;

流出速度

流量流出的速度取決於服務的請求處理能力,接口支撐的併發越高,流速就可以越大;

時間控制

基於時間記錄,判斷流量流出速度,控制勻速模式,

注意:需要一個基本的判定策略,漏桶算法在系統能承接當前併發流量時,不需要啟用。

3、令牌桶算法

基礎描述

令牌桶可自行以恆定的速率源源不斷地產生令牌。如果令牌不被消耗,或者被消耗的速度小於產生的速度,令牌就會不斷地增多,直到把桶填滿。後面再產生的令牌就會從桶中溢出。

令牌桶算法雖然根本目的也是控制流量速度,但是當令牌桶內的令牌足夠多時,則允許流量階段性的併發。傳送到令牌桶的數據包需要消耗令牌。不同大小的數據包,消耗的令牌數量不一樣。

核心要素

令牌桶

存放按照特定的速率生成的令牌,以此控制流量速度。

匹配規則

這裏的匹配規則更多是服務於分佈式系統,例如服務A是系統的核心交易,當出現併發時,基於令牌桶最匹配規則,只允許交易請求通過,例如:常見雙十一期間,各大電商平台提示,為保證核心交易,邊緣服務的數據延遲或暫停等。

注意:令牌桶算法和漏桶算法的目的雖然相同,但是實現策略是相反的,不過都存在一個問題,為保證大部分請求流量成功,會犧牲小部分請求。

二、限流組件

1、Nginx代理組件

Nginx反向代理實際運行方式是指以代理服務器來接收客戶端連接請求,然後將請求轉發給內部網絡上的服務器,並將從服務器上得到的結果返回給客戶端,此時代理服務器對外就表現為一個服務器。

流量限制是Nginx作為代理服務中一個非常實用的功能,通過配置方式來限制用戶在給定時間內HTTP請求的數量,兩個主要的配置指令limit_req_zonelimit_req,以此保護高併發下系統的穩定。

2、CDN邊緣節點

CDN邊緣節點,準確的說並不是用來處理流量限制的,而是存放靜態頁面。內容緩存為CDN網絡節點,位於用戶接入點,是面向最終用戶的內容提供設備,可緩存靜態Web內容和流媒體內容,實現內容的邊緣傳播和存儲,以便用戶的就近訪問,這樣避免用戶大量刷新數據服務器,節省骨幹網帶寬,減少帶寬需求量。

在高併發場景下,尤其是倒計時搶購類似業務,在活動開始前後用戶會產生大量刷新頁面的操作,基於CDN節點,這些請求不會下沉到數據的服務接口上。也可以基於頁面做一些請求攔截,比如點擊頁面單位時間內只放行一定量的請求,以此也可以實現一個限流控制。

三、熔斷器組件

所謂熔斷器機制,即類似電流的保險器,當然電壓過高會自動跳閘,從而保護電路系統。微服務架構中服務保護也是這個策略,當服務被判斷異常,會從服務列表斷開,等待恢復在重新連接。服務熔斷降級的策略實現有如下幾個常用的組件。

1、Hystrix組件

基礎簡介

Hystrix當前處於維護模式,即不再更新,作為SpringCloud微服務組件中,最原生的一個熔斷組件,很多思路還是有必要了解一下。例如:服務熔斷,阻止故障的連鎖反應,快速失敗並迅速恢復,服務降級等。

某個微服務發生故障時,要快速切斷服務,提示用戶,後續請求,不調用該服務,直接返回,釋放資源,這就是服務熔斷。

熔斷器策略

服務器高併發下,壓力劇增的時候,根據當業務情況以及流量,對一些服務和頁面有策略的降級(可以理解為關閉不必要的服務),以此緩解服務器資源的壓力以保障核心任務的正常運行。熔斷生效后,會在指定的時間后調用請求來測試依賴是否恢復,依賴的應用恢復后關閉熔斷。

基本流程:

首先判斷服務熔斷器開關狀態,服務如果未熔斷則放行請求;如果服務處於熔斷中則直接返回。

每次調用都執行兩個函數markSuccess(duration)和markFailure(duration) 來統計在一定的時間段內的調用是成功和失敗次數。

基於上述的成功和失敗次數的計算策略,來判斷是否應該打開熔斷器,如果錯誤率高於一定的閾值,就會觸發熔斷機制。

熔斷器有一個生命周期,周期過後熔斷器器進入半開狀態,允許放行一個試探請求;否則,不允許放行。

2、Sentinel組件

基礎簡介

基於微服務的模式,服務和服務之間的穩定性變得越來越重要。Sentinel以流量為切入點,從流量控制、熔斷降級、系統負載保護等多個維度保護服務的穩定性。

Sentinel可以針對不同的調用關係,以不同的運行指標(如QPS、併發調用數、系統負載等)為基準,收集資源的路徑,並將這些資源的調用路徑以樹狀結構存儲起來,用於根據調用路徑對資源進行流量控制。

流量整形策略

直接拒絕模式是默認的流量控制方式,即請求超出任意規則的閾值后,新的請求就會被立即拒絕。

啟動預熱模式:當流量激增的時候,控制流量通過的速率,讓通過的流量緩慢增加,在一定時間內逐漸增加到閾值上限,給冷系統一個預熱的時間,避免冷系統被壓垮。

勻速排隊方式會嚴格控制請求通過的間隔時間,也即是讓請求以均勻的速度通過,對應的是漏桶算法。

熔斷策略

Sentinel本質上是基於熔斷器模式,支持基於異常比率的熔斷降級,在調用達到一定量級並且失敗比率達到設定的閾值時自動進行熔斷,此時所有對該資源的調用都會被阻塞,直到過了指定的時間窗口后才啟發性地恢復。

四、源代碼地址

GitHub·地址
https://github.com/cicadasmile/husky-spring-cloud
GitEE·地址
https://gitee.com/cicadasmile/husky-spring-cloud

推薦閱讀:微服務架構系列

序號 標題
01 微服務架構:項目技術選型簡介,架構圖解說明
02 微服務架構:業務架構設計,系統分層管理
03 微服務架構:數據庫選型簡介,業務數據規劃設計
04 微服務架構:中間件集成,公共服務封裝
05 微服務架構:SpringCloud 基礎組件應用設計
06 微服務架構:通過業務、應用、技術、存儲,聊聊架構
07 微服務技術棧:常見註冊中心組件,對比分析

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

Linux系統如何使用Fuser命令

本文不再更新,可能存在內容過時的情況,實時更新請訪問原地址:Linux系統如何使用Fuser命令;

什麼是Fuser命令?

fuser命令是一個非常聰明的unix實用程序,用於查找正在使用某個文件、目錄或socket的進程。 它還提供有關擁有該進程的用戶和訪問類型的信息。。fuser工具显示了使用指定文件或文件系統的每個進程的進程ID(PID)。

安裝

如果你的精簡版運行fuser提示如下信息:

-bash: fuser: command not found

請執行如下命令安裝:

[winbert@winbert-server ~]$ sudo yum -y install psmisc

如何使用fuser命令?

man命令可用於查看任何命令的幫助手冊,但是學習新知識(尤其是linux命令)的最佳方法是通過閱讀真實的示例,並且不斷地在終端中鍵入命令。 在終端中運行以下命令,以獲取有關fuser實用程序的使用選項的信息。

[winbert@winbert-server ~]$ fuser
No process specification given
Usage: fuser [-fMuvw] [-a|-s] [-4|-6] [-c|-m|-n SPACE] [-k [-i] [-SIGNAL]] NAME...
       fuser -l
       fuser -V
Show which processes use the named files, sockets, or filesystems.

  -a,--all              display unused files too
  -i,--interactive      ask before killing (ignored without -k)
  -k,--kill             kill processes accessing the named file
  -l,--list-signals     list available signal names
  -m,--mount            show all processes using the named filesystems or block device
  -M,--ismountpoint     fulfill request only if NAME is a mount point
  -n,--namespace SPACE  search in this name space (file, udp, or tcp)
  -s,--silent           silent operation
  -SIGNAL               send this signal instead of SIGKILL
  -u,--user             display user IDs
  -v,--verbose          verbose output
  -w,--writeonly        kill only processes with write access
  -V,--version          display version information
  -4,--ipv4             search IPv4 sockets only
  -6,--ipv6             search IPv6 sockets only
  -                     reset options

  udp/tcp names: [local_port][,[rmt_host][,[rmt_port]]]

如何查看使用某個目錄的進程

fuser序可以與-v選項一起使用,該選項以詳細模式運行該工具。 verbose選項用於在計算機屏幕上生成詳細輸出,因此用戶可以實時查看實用程序正在執行的操作。

[winbert@winbert-server ~]$ fuser -v .
                     USER        PID ACCESS COMMAND
/home/winbert:       winbert    1435 ..c.. bash

上面的輸出显示,以詳細模式運行時,fuser會提供有關USERPIDACCESSCOMMAND的信息。 ACCESS下的c字符表示訪問類型,表示“當前目錄”。 訪問類型很多,例如e(正在運行的可執行文件),r(根目錄),f(打開文件。在默認显示模式下省略f),F(用於寫入的打開文件,在默認显示模式下省略F)和 m(mmap文件或共享庫)。

查看使用你tcp或udp套接字的進程?

有時您需要使用TCP和UDP套接字查找進程。 為了查找這些進程,需要使用-n選項。 -n選項用於選擇相應的名稱空間。

[root@huidukongjian-h4 docker]# fuser -v -n tcp 80
                     USER        PID ACCESS COMMAND
80/tcp:              root      27411 F.... docker-proxy

默認情況下,fuser將同時在IPv6和IPv4套接字中查找,但是可以使用-4-6選項更改默認選項。 -4選項代表IPv4-6選項代表IPv6。 請注意,fuser僅將PID輸出到stdout,其他所有內容都發送到stderr。

fuser -v -n tcp 80命令的結果显示,使用docker的進程的進程ID為27411,而用於啟動該進程的命令為docker-proxy。 進程ID(PID)可以以多種方式使用,其中之一是進程終止。 與PID一起使用時,kill命令根據該進程ID終止進程。 fuser還可用於終止訪問特定文件的進程。 在以下命令中,-k選項用於終止正在使用在端口123上運行的tcp偵聽器的進程。為確保用戶不會殺死錯誤的進程,使用-i選項詢問用戶是否 在終止進程之前進行確認。

fuser -k  123/tcp

使用帶有-i選項的fuser -k命令在終止進程之前要求用戶進行確認。 用戶可以用y回答“是”,或者用N回答不殺死進程。

fuser -i -k 123/tcp
123/tcp:             12216
Kill process 12216 ? (y/N)
Use The -6 Option To Look For IPv6 Sockets.

以下命令以詳細模式使用fuser,並嘗試查找在端口123上運行的IPv6套接字。

fuser -v -n tcp -6 123

查找佔用某個文件系統的進程

-m選項可與fuser命令一起使用,以查找訪問文件文件系統上文件的進程。 此選項需要文件名作為輸入參數。 -m選項非常有用,尤其是當用於發現正在訪問文件系統的進程並標識要殺死的進程時。

以下命令显示所有訪問“ example.txt”所在的文件系統的進程。 仔細查看-m選項如何與fuser一起使用。

[root@huidukongjian-h4 docker]# fuser -v -m data/v2/config.json 
                     USER        PID ACCESS COMMAND
/root/docker-v2/data/v2/config.json:
                     root     kernel mount /
                     root          1 .rce. systemd
                     root          2 .rc.. kthreadd
                     root          3 .rc.. rcu_gp
                     root          4 .rc.. rcu_par_gp
                     root          6 .rc.. kworker/0:0H-kbl
                     root          8 .rc.. mm_percpu_wq
                     root          9 .rc.. ksoftirqd/0
                     root         10 .rc.. rcu_sched
                     root         11 .rc.. migration/0
                     root         12 .rc.. watchdog/0
                     root         13 .rc.. cpuhp/0
                     root         16 .rc.. netns
                     root         17 .rc.. kauditd
                     root         18 .rc.. khungtaskd
                     root         19 .rc.. oom_reaper
                     root         20 .rc.. writeback
                     root         21 .rc.. kcompactd0
                     root         22 .rc.. ksmd
                     root         23 .rc.. khugepaged
                     root         24 .rc.. crypto
                     root         25 .rc.. kintegrityd
                     root         26 .rc.. kblockd
                     root         27 .rc.. tpm_dev_wq
                     root         28 .rc.. md
                     root         29 .rc.. edac-poller
                     root         30 .rc.. watchdogd
                     root         42 .rc.. kswapd0
                     root         93 .rc.. kthrotld
                     root         94 .rc.. acpi_thermal_pm
                     root         95 .rc.. kmpath_rdacd
                     root         96 .rc.. kaluad
                     root         97 .rc.. ipv6_addrconf
                     root         98 .rc.. kstrp
                     root        326 .rc.. scsi_eh_0
                     root        327 .rc.. scsi_tmf_0
                     root        329 .rc.. kworker/0:1H-kbl
                     root        361 .rc.. ata_sff
                     root        363 .rc.. scsi_eh_1
                     root        365 .rc.. scsi_tmf_1
                     root        366 .rc.. scsi_eh_2
                     root        367 .rc.. scsi_tmf_2
                     root        387 .rc.. xfsalloc
                     root        390 .rc.. xfs_mru_cache
                     root        391 .rc.. xfs-buf/vda1
                     root        394 .rc.. xfs-data/vda1
                     root        395 .rc.. xfs-conv/vda1
                     root        396 .rc.. xfs-cil/vda1
                     root        397 .rc.. xfs-reclaim/vda
                     root        398 .rc.. xfs-log/vda1
                     root        399 .rc.. xfs-eofblocks/v
                     root        400 .rc.. xfsaild/vda1
                     root        486 .rce. systemd-journal
                     rpc         541 .rce. rpcbind
                     root        543 Frce. auditd
                     root        545 .rce. sedispatch
                     root        558 .rc.. rpciod
                     root        559 .rc.. kworker/u3:0
                     root        561 .rc.. xprtiod
                     root        582 Frce. sssd
                     polkitd     585 .rce. polkitd
                     root        589 .rce. rngd
                     dbus        593 frce. dbus-daemon
                     chrony      612 .rce. chronyd
                     root        652 Frce. sssd_be
                     root        668 Frce. sssd_nss
                     root        671 .rc.. ttm_swap
                     root        672 .rc.. nfit
                     root        675 frce. systemd-logind
                     root        683 Frce. gssproxy
                     root        740 frce. NetworkManager
                     root        743 Frce. tuned
                     root        814 frce. systemd-udevd
                     root        889 frce. sshd
                     root        890 Frce. rsyslogd
                     root        895 frce. agetty
                     root        898 frce. crond
                     root        899 frce. agetty
                     root      21821 .rc.. kworker/u2:0-flu
                     root      25475 frce. sshd
                     root      25480 .rce. systemd
                     root      25485 frce. (sd-pam
                     root      25491 frce. sshd
                     root      25492 frce. bash
                     root      25705 Frce. containerd
                     root      25706 Frce. dockerd
                     root      26375 .rc.. kworker/u2:1-eve
                     root      27251 Fr.e. containerd-shim
                     root      27267 F...m v2
                     root      27273 Fr.e. containerd-shim
                     root      27295 ....m sh
                     root      27400 .rce. docker-proxy
                     root      27411 .rce. docker-proxy
                     root      27416 Fr.e. containerd-shim
                     root      27432 ....m sh
                     root      27478 ....m sh
                     root      27479 F...m nginx
                     root      27480 ....m sleep
                     (unknown)  27481 F...m nginx
                     root      27561 ....m sleep
                     root      27705 .rc.. kworker/0:0-xfs-
                     root      27765 .rc.. kworker/0:1-xfs-
                     root      27836 .rc.. kworker/0:2-even
                     root      27860 frce. sshd
                     root      27883 frce. sshd
                     sshd      27884 frce. sshd

fuser還可用於將特定指令發送到某個進程。 當與-k選項一起使用時,fuser命令將KILL指令發送給進程。 有很多指令可以發送給運行中的進程,-l選項有助於查找可以與fuser一起使用的指令列表。

[root@huidukongjian-h4 docker]# fuser -l
HUP INT QUIT ILL TRAP ABRT BUS FPE KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT
CHLD CONT STOP TSTP TTIN TTOU URG XCPU XFSZ VTALRM PROF WINCH POLL PWR SYS

本文不再更新,可能存在內容過時的情況,實時更新請訪問原地址:Linux系統如何使用Fuser命令;

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

※別再煩惱如何寫文案,掌握八大原則!

聯合國專家談循環經濟 勿落入回收迷思 「最難的是政治問題」

環境資訊中心記者 孫文臨報導

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

歐洲各國豪雨不斷 洪水土石流頻傳

摘錄自2019年11月27日公視報導

美澳野火肆虐,在歐洲則是飽受洪災侵襲,包括義大利、法國和希臘,當地從上個週末以來就豪雨不斷;各地接連發生洪水和土石流災情,至少已經傳出有九人死亡。

有的汽車卡在樹上,有的掉進泥池裡,所有東西都被大水沖得四散各地,連路面也跟著崩塌毀損,每戶民宅裡外都是一片狼藉。這是希臘西部這幾天遭受暴雨襲擊過後的景象,面對天災,居民有苦難言。

當地居民說,「水勢洶洶而來,當我們呼叫人在地下室的孩子時,什麼東西都沒有了,門啊、窗啊,洗衣機都在庭院裡了,你去那邊看看庭院的狀況,原本停了五台車,現在一台也不剩。」類似的豪雨災情,同樣出現在義大利北部,靠近山區的一座高架橋不敵洪水與土石流,應聲斷裂、坍塌,所幸目前沒有發現人員傷亡。

另一個為強降雨所苦的地區是法國南部,其中,瓦爾省境內受到洪患影響最大的幾座城鎮,短短48小時之內就累積了三個月的降雨量。許多地方河川潰堤,導致嚴重淹水,而等到星期一部分地區水位退去之後,有4500個住戶停電。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

※別再煩惱如何寫文案,掌握八大原則!

人類首次測到藍鯨心跳! 「最高每分鐘34下」解開世界最大體積動物之謎

摘錄自2019年11月27日ETtoday新聞雲報導

藍鯨是目前世界上體型最大的動物,也是地球史上已知動物中體型最大的,由於體型過大,使得科學家一直很難完整得知牠的生理特徵,近來卻有科學家首次測到藍鯨的心跳,也因此解開了更多關於藍鯨的祕密。

這項研究近來發表在《美國國家科學院院刊》(Proceedings of the National Academy of Sciences of the United States of America)上,一組海洋生物學家透過在藍鯨背部安裝吸盤的方式,成功在加州外海測到了心跳。研究人員長達九個小時持續觀察一條藍鯨發現,牠浮出水面時心跳最高可達每分鐘34下,沉入水面時心跳最低卻可降到每分鐘兩下。

科學家解釋,這是因為藍鯨潛入水中時,身體會重新分配氧氣,其心臟和大腦會需要比較多氧氣,肌肉、皮膚和其他器官所吸收的氧氣量較少,因此每呼吸一次便能更長時間停留在水中。

一條成年藍鯨身長可超過30公尺,讓科學家們一直很好奇,要有多大的力量才能為世界上體型最大的動物提供動力,2015年他們從藍鯨的解剖標本發現,其心臟竟然重達180公斤,看起來和一台高爾夫球車差不多大。

加利福尼亞大學助理教授戈德博根(Jeremy Goldbogen)表示,測得藍鯨的心跳能幫助他們了解,藍鯨為什麼沒有長得比現在更大,因為藍鯨身體所需要的動能要求,已經到達心臟所能承受的最大值,光是張開大口這個動作,就會讓其心臟運作達到極限。

若是地球上有體型比藍鯨更大的動物存在,其心臟的跳動速度會需要更快,但科學家認為從目前數據看起來這是不可能的事,也因此解釋了為何目前地球上沒有發現比藍鯨體積更大的動物存在之謎。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

日本第一例!技術員確診人猴共通「疱疹B病毒感染症」 出現發燒、頭痛症狀

摘錄自2019年11月29日ETtoday綜合報導

日本鹿兒島市28日傳出,「新日本科學」公司鹿兒島市動物實驗設施內,一名技術員被診斷出患有「疱疹B病毒感染症」,是日本境內第一例,而這個相當罕見的病症,在全球僅有約50例確診。

根據台灣衛生福利部疾病管制署網站解釋,疱疹B病毒為人畜共通傳染性病原體,會對感染者的中樞神經系統進行破壞。疱疹B病毒於1932年在美國出現第一例,研究人員B博士(Dr. B)當時被一隻看起來相當健康的恆河猴咬傷,15天後便因急性腦脊髓炎死亡,病毒的名稱也採B博士名字命名為「疱疹B病毒」。疱疹B病毒並無特別季節變化,一年四季的傳染能力同樣活躍。

日本《讀賣新聞》、《中央社》報導,受託開發醫藥品的「新日本科學公司」總社位於東京,並在鹿兒島市設有一處研究設施;一名實驗業務助理在對猴子進行安全性調查時,不慎感染了疱疹B病毒感染症(Herpesvirus B Infection),今年2月出現頭痛、發燒等症狀,但直到8月底轉願接受基因檢查,才在11月上旬被驗出確診。

這類流行於獼猴、日本獼猴等獼猴屬猿猴間的病毒不會經由空氣傳染,主要透過抓、咬受傷後進入人體,潛伏期約2至5週,發病會出現發燒、麻痺等症狀。

鹿兒島市和新日本科學公司並未對外公布實驗業務助理的姓名,也未透露身體狀況、性別和年齡。報導指出,該名技術員平常直接接觸猿猴的機會很少,就診時也沒有被猿猴抓、咬的痕跡,目前正進一步調查染病途徑。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

※別再煩惱如何寫文案,掌握八大原則!

socketserver模塊使用與源碼分析

socketserver模塊使用與源碼分析

前言

  在前面的學習中我們其實已經可以通過socket模塊來建立我們的服務端,並且還介紹了關於TCP協議的粘包問題。但是還有一個非常大的問題就是我們所編寫的Server端是不支持併發性服務的,在我們之前的代碼中只能加入一個通信循環來進行排隊式的單窗口一對一服務。那麼這一篇文章將主要介紹如何使用socketserver模塊來建立具有併發性的Server端。

 

基於TCP協議的socketserver服務端

  我們先看它的一段代碼,對照代碼來看功能。

#!/usr/bin/env python3
# _*_ coding:utf-8 _*_

# ==== 使用socketserver創建支持多併發性的服務器 TCP協議 ====

import socketserver

class MyServer(socketserver.BaseRequestHandler):
    """自定義類"""

    def handle(self):
        """handle處理請求"""
        print("雙向鏈接通道建立完成:", self.request)  # 對於TCP協議來說,self.request相當於雙向鏈接通道conn,即accept()的第一部分
        print("客戶端的信息是:", self.client_address)  # 對於TCP協議來說,相當於accept()的第二部分,即客戶端的ip+port

        while 1:  # 開始內層通信循環
            try:  # # bug修復:針對windows環境
                data = self.request.recv(1024)

                if not data:
                    break  # bug修復:針對類UNIX環境

                print("收到客戶機[{0}]的消息:[{1}]".format(self.client_address, data))
                self.request.sendall(data.upper())  # #sendall是重複調用send.

            except Exception as e:
                break

        self.request.close()  # 當出現異常情況下一定要關閉鏈接


if __name__ == '__main__':
    s1 = socketserver.ThreadingTCPServer(("0.0.0.0", 6666), MyServer) # 公網服務器綁定 0.0.0.0 私網測試為 127.0.0.1
    s1.serve_forever()  # 啟動服務

 

  1.導入socketserver模塊

  2.創建一個新的類,並繼承socketserver.BaseRequestHandler

  3.覆寫handle方法,對於TCP協議來說,self.request 相當於雙向鏈接通道connself.client_address相當於被服務方的ip和port信息,也就是addr,而整個handle方法相當於鏈接循環。

  4.寫入收發邏輯規則

  5.防止客戶端發送空的消息已致雙方卡死

  6.防止客戶端突然斷開已致服務端崩潰

  7.粘包優化(可選)

  8.實例化 socketserver.ThreadingTCPServer類,並傳入IP+port,以及剛寫好的類名

  9.使用socketserver.ThreadingTCPServer實例化對象中的server_forever( )方法啟動服務

 

  它其實是這樣的:

    我們不用管鏈接循環,因為在執行handle方法之前內部已經幫我們做好了。當我們使用serve_forever()方法的時候便開始監聽鏈接描述符對象,一旦有鏈接請求就創建一個子線程來處理該鏈接。

 

 

基於UDP協議的socketserver服務端

  基於UDP協議的socketserver服務端與基於TCP協議的socketserver服務端大相徑庭,但是還是有幾點不太一樣的地方。

 

  對TCP來說:

    self.request = 雙向鏈接通道(conn)

  對UDP來說:

    self.request = (client_data_byte,udp的套接字對象)

 

#!/usr/bin/env python3
# _*_ coding:utf-8 _*_

# ==== 使用socketserver創建支持多併發性的服務器 UDP協議 ====

import socketserver

class MyServer(socketserver.BaseRequestHandler):
    """自定義類"""

    def handle(self):
        """handle處理請求"""

        # 由於UDP是基於消息的協議,故根本不用通信循環

        data = self.request[0]  # 對於UDP協議來說,self.request其實是個元組。第一個元素是消息內容主題(Bytes類型),相當於recvfrom()的第一部分
        server = self.request[1]    # 第二個元素是服務端本身,即自己

        print("客戶端的信息是:", self.client_address)   # 對於UDP協議來說,相當於recvfrom()的第二部分,即客戶端的ip+port

        print("收到客戶機[{0}]的消息:[{1}]".format(self.client_address, data))
        server.sendto(data.upper(),self.client_address)


if __name__ == '__main__':
    s1 = socketserver.ThreadingUDPServer(("0.0.0.0", 6666), MyServer)  # 公網服務器綁定 0.0.0.0 私網測試為 127.0.0.1
    s1.serve_forever()  # 啟動服務

 

擴展:socketserver源碼分析

探索socketserver中的繼承關係

  好了,接下來我們開始剖析socketserver模塊中的源碼部分。在Pycharm下使用CTRL+鼠標左鍵,可以進入源碼進行查看。

  我們在查看源碼前一定要首先要明白兩點:

 

  socketserver類分為兩部分,其一是server類主要是負責處理鏈接方面,另一類是request類主要負責處理通信方面。

 

  好了,請在腦子里記住這個概念。我們來看一些socketserver模塊的實現用了哪些其他的基礎模塊。

 

  注意,接下來的源碼註釋部分我並沒有在源代碼中修改,也請讀者不要修改源代碼的任何內容。

import socket  # 這模塊挺熟悉吧
import selectors  # 這個是一個多線程模塊,主要支持I/O多路復用。
import os # 老朋友了
import sys  # 老朋友
import threading  # 多線程模塊
from io import BufferedIOBase  # 讀寫相關的模塊
from time import monotonic as time  # 老朋友time模塊

socketserver中用到的基礎模塊

 

  好了,讓我們接着往下走。可以看到一個變量__all__,是不是覺得很熟悉?就是我們使用 from xxx import xxx 能導入進的東西全是被__all__控制的,我們看一下它包含了哪些內容。

__all__ = ["BaseServer", "TCPServer", "UDPServer",
           "ThreadingUDPServer", "ThreadingTCPServer",
           "BaseRequestHandler", "StreamRequestHandler",
           "DatagramRequestHandler", "ThreadingMixIn"]
           
# 這個是我們原本的 __all__ 中的值。
if hasattr(os, "fork"):
    __all__.extend(["ForkingUDPServer","ForkingTCPServer", "ForkingMixIn"])
if hasattr(socket, "AF_UNIX"):
    __all__.extend(["UnixStreamServer","UnixDatagramServer",
                    "ThreadingUnixStreamServer",
                    "ThreadingUnixDatagramServer"])
                    
# 上面兩個if判斷是給__all__添加內容的,os.fork()這個方法是創建一個新的進程,並且只在類UNIX平台下才有效,Windows平台下是無效的,所以這裏對於Windows平台來說就from socketserver import xxx 肯定少了三個類,這三個類的作用我們接下來會聊到。而關於socket中的AF_UNIX來說我們其實已經學習過了,是基於文件的socket家族。這在Windows上也是不支持的,只有在類UNIX平台下才有效。所以Windows平台下的導入又少了4個類。
​
​
# poll/select have the advantage of not requiring any extra file descriptor,
# contrarily to epoll/kqueue (also, they require a single syscall).
if hasattr(selectors, 'PollSelector'):
    _ServerSelector = selectors.PollSelector
else:
    _ServerSelector = selectors.SelectSelector
    
# 這兩個if還是做I/O多路復用使用的,Windows平台下的結果是False,而類Unix平台下的該if結果為True,這關乎I/O多路復用的性能選擇。到底是select還是poll或者epoll。

socketserver模塊對於from xxx import * 導入的處理

 

  我們接着向下看源碼,會看到許許多多的類。先關掉它來假設自己是解釋器一行一行往下走會去執行那個部分。首先是一條if判斷

if hasattr(os, "fork"):
    class ForkingMixIn:
        pass # 這裏我自己省略了
 
 # 我們可以看見這條代碼是接下來執行的,它意思還是如果在類Unix環境下,則會去創建該類。如果在Windows平台下則不會創建該類

處理點一

 

  繼續走,其實這種if判斷再創建類的地方還有兩處。我這裏全部列出來:

if hasattr(os, "fork"):
    class ForkingUDPServer(ForkingMixIn, UDPServer): pass
    class ForkingTCPServer(ForkingMixIn, TCPServer): pass
 

if hasattr(socket, 'AF_UNIX'):
​
    class UnixStreamServer(TCPServer):
        address_family = socket.AF_UNIX
​
    class UnixDatagramServer(UDPServer):
        address_family = socket.AF_UNIX
​
    class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): passclass ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass

處理點二 and 三

 

  好了,說完了大體粗略的一個流程,我們該來研究這裏面的類都有什麼作用,這裏可以查看每個類的文檔信息。大致如下:

 

  前面已經說過,socketserver模塊中主要分為兩大類,我們就依照這個來進行劃分。

 

socketserver模塊源碼內部class功能一覽 
處理鏈接相關  
BaseServer 基礎鏈接類
TCPServer TCP協議類
UDPServer UDP協議類
UnixStreamServer 文件形式字節流類
UnixDatagramServer 文件形式數據報類
處理通信相關  
BaseRequestHandler 基礎請求處理類
StreamRequestHandler 字節流請求處理類
DatagramRequestHandler 數據報請求處理類
多線程相關  
ThreadingMixIn 線程方式
ThreadingUDPServer 多線程UDP協議服務類
ThreadingTCPServer 多線程TCP協議服務類
多進程相關  
ForkingMixIn 進程方式
ForkingUDPServer 多進程UDP協議服務類
ForkingTCPServer 多進程TCP協議服務類

 

  他們的繼承關係如下:

ForkingUDPServer(ForkingMixIn, UDPServer)
​
ForkingTCPServer(ForkingMixIn, TCPServer)
​
ThreadingUDPServer(ThreadingMixIn, UDPServer)
​
ThreadingTCPServer(ThreadingMixIn, TCPServer)
​
StreamRequestHandler(BaseRequestHandler)
​
DatagramRequestHandler(BaseRequestHandler)

 

  處理鏈接相關

處理通信相關

多線程相關

 

總繼承關係(處理通信相關的不在其中,並且不包含多進程)

 

  最後補上一個多進程的繼承關係,就不放在總繼承關係中了,容易圖形造成混亂。

 

多進程相關

 

實例化過程分析

  有了繼承關係我們可以來模擬實例化的過程,我們以TCP協議為準:

 

socketserver.ThreadingTCPServer(("0.0.0.0", 6666), MyServer)

 

  我們點進(選中上面代碼的ThradingTCPServer部分,CTRL+鼠標左鍵)源碼部分,查找其 __init__ 方法:

class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

 

  看來沒有,那麼就找第一父類有沒有,我們點進去可以看到第一父類ThreadingMixIn也沒有__init__方法,看上面的繼承關係圖可以看出是普通多繼承,那麼就是廣度優先的查找順序。我們來看第二父類TCPServer中有沒有,看來第二父類中是有__init__方法的,我們詳細來看。

class TCPServer(BaseServer):

   """註釋全被我刪了,影響視線"""

    address_family = socket.AF_INET  #  基於網絡的套接字家族

    socket_type = socket.SOCK_STREAM  # TCP(字節流)協議

    request_queue_size = 5  # 消息隊列最大為5,可以理解為backlog,即半鏈接池的大小

    allow_reuse_address = False  # 端口重用默認關閉

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
                                    
       # 可以看見,上面先是調用了父類的__init__方法,然後又實例化出了一個socket對象!所以我們先不着急往下看,先看其父類中的__init__方法。
       
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

TCPServer中的__init__()

 

  來看一下,BaseServer類中的__init__方法。

class BaseServer:

    """註釋依舊全被我刪了"""

    timeout = None  # 這個變量可以理解為超時時間,先不着急說他。先看 __init__ 方法

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address  # 即我們傳入的 ip+port ("0.0.0.0", 6666)
        self.RequestHandlerClass = RequestHandlerClass  # 即我們傳入的自定義類 MyServer
        self.__is_shut_down = threading.Event()  # 這裏可以看到執行了該方法,這裏先不詳解,因為它是一個事件鎖,所以不用管
        self.__shutdown_request = False 

BaseServer中的__init__()

 

  BaseServer中執行了thrading模塊下的Event()方法。我這裏還是提一嘴這個方法是幹嘛用的,它會去控制線程的啟動順序,這裏實例化出的self.__is_shut_down其實就是一把鎖,沒什麼深究的,接下來的文章中我也會寫到。我們繼續往下看,現在是該回到TCPServer__init__方法中來了。

class TCPServer(BaseServer):

   """註釋全被我刪了,影響視線"""
   
    address_family = socket.AF_INET  #  基於網絡的套接字家族

    socket_type = socket.SOCK_STREAM  # TCP(字節流)協議

    request_queue_size = 5  # 消息隊列最大為5,可以理解為backlog,即半鏈接池的大小
    
    allow_reuse_address = False  # 端口重用默認關閉

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): # 看這裏!!!!
    """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                self.socket_type)                        
   
        if bind_and_activate:  # 在創建完socket對象后就會進行該判斷。默認參數bind_and_activate就是為True
            try:
                self.server_bind() # 現在進入該方法查看細節
                self.server_activate()
            except:
                self.server_close()
                raise

TCPServer中的__init__()


  好了,需要找這個self.bind()方法,還是從頭開始找。實例本身沒有,第一父類ThreadingMixIn也沒有,所以現在我們看的是TCPServerserver_bind()方法:

def server_bind(self):
    """Called by constructor to bind the socket.

    May be overridden.

    """
    if self.allow_reuse_address:  # 這裏的變量對應 TCPServer.__init__ 上面定義的類方法,端口重用這個。由於是False,所以我們直接往下執行。
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.socket.bind(self.server_address)  # 綁定 ip+port 即 ("0.0.0.0", 6666)
    self.server_address = self.socket.getsockname() # 獲取socket的名字 其實還是 ("0.0.0.0", 6666)

TCPServer中的server_bind()

 

  現在我們該看TCPServer下的server_activate()方法了。

def server_activate(self):
    """Called by constructor to activate the server.

    May be overridden.

    """
    self.socket.listen(self.request_queue_size)  # 其實就是監聽半鏈接池,backlog為5

TCPServer中的server_activate()

 

  這個時候沒有任何異常會拋出的,所以我們已經跑完了整個實例化的流程。並將其賦值給s1

  現在我們看一下s1__dict__字典,再接着進行源碼分析。

{'server_address': ('0.0.0.0', 6666), 'RequestHandlerClass': <class '__main__.MyServer'>, '_BaseServer__is_shut_down': <threading.Event object at 0x000002A96A0208E0>, '_BaseServer__shutdown_request': False, 'socket': <socket.socket fd=716, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 6666)>}

s1的__dict__

 

server_forever()啟動服務分析

  我們接着來看下一條代碼。

s1.serve_forever()

 

  還是老規矩,由於s1ThreadingTCPServer類的實例對象,所以我們去一層層的找serve_forever(),最後在BaseServer類中找到了。

def serve_forever(self, poll_interval=0.5):
    """註釋被我刪了"""
    self.__is_shut_down.clear()  # 上面說過了那個Event鎖,控制子線程的啟動順序。這裏的clear()代表清除,這個不是重點,往下看。
    try:
        # XXX: Consider using another file descriptor or connecting to the
        # socket to wake this up instead of polling. Polling reduces our
        # responsiveness to a shutdown request and wastes cpu at all other
        # times.
        with _ServerSelector() as selector:  
            selector.register(self, selectors.EVENT_READ)# 這裡是設置了一個監聽類型為讀取事件。也就是說當有請求來的時候當前socket對象就會發生反應。

            while not self.__shutdown_request: # 為False,會執行,注意!下面都是死循環了!!!
                ready = selector.select(poll_interval)  # 設置最大監聽時間為0.5s
                # bpo-35017: shutdown() called during select(), exit immediately.
                if self.__shutdown_request: # BaseServer類中的類方法,為False,所以不執行這個。
                    break
                if ready: # 代表有鏈接請求會執行下面的方法
                    self._handle_request_noblock()  # 這兒是比較重要的一個點。我們先來看。

                self.service_actions() 
    finally:
        self.__shutdown_request = False
        self.__is_shut_down.set() # 這裡是一個釋放鎖的行為

BaseServer中的serve_forever()

 

  如果有鏈接請求,則會執行self._handle_request_noblock()方法,它在哪裡呢?剛好這個方法就在BaseServerserve_forever()方法的正下方第4個方法的位置。

def _handle_request_noblock(self):
    """註釋被我刪了"""
    try:
        request, client_address = self.get_request()  # 這裏的這個方法在TCPServer中,它的return值是 self.socket.accept(),就是就是返回了元組然後被解壓賦值了。其實到這一步三次握手監聽已經開啟了。
    except OSError:
        return
    if self.verify_request(request, client_address): # 這個是驗證ip和port,返回的始終是True
        try:
            self.process_request(request, client_address) # request 雙向鏈接通道,client_address客戶端ip+port。現在我們來找這個方法。
        except Exception:
            self.handle_error(request, client_address)
            self.shutdown_request(request)
        except:
            self.shutdown_request(request)
            raise
    else:
        self.shutdown_request(request)

BaseServer中的_handle_request_noblock()

 

  現在開始查找self.process_request(request, client_address)該方法,還是先從實例對象本身找,找不到去第一父類找。他位於第一父類ThreadingMixIn中。

def process_request(self, request, client_address):
    """Start a new thread to process the request."""
    t = threading.Thread(target = self.process_request_thread,
                         args = (request, client_address))  # 創建子線程!!看這裏!
    t.daemon = self.daemon_threads # ThreadingMixIn的類屬性,為False
    if not t.daemon and self.block_on_close:  # 第一個值為False,第二個值為True。他們都是ThreadingMixIn的類屬性
        if self._threads is None:  # 會執行
            self._threads = []  # 創建了空列表
        self._threads.append(t) # 將當前的子線程添加至空列表中
    t.start()  # 開始當前子線程的運行,即運行self.process_request_thread方法

ThreadingMixIn中的process_request()

 

  我們可以看到,這裏的target參數中指定了一個方法self.process_request_thread,其實意思就是說當這個線程tstart的時候會去執行該方法。我們看一下它都做了什麼,這個方法還是在ThreadingMixIn類中。

def process_request_thread(self, request, client_address):
    """Same as in BaseServer but as a thread.

    In addition, exception handling is done here.

    """
    try:
        self.finish_request(request, client_address) # 可以看到又執行該方法了,這裏我再標註一下,別弄頭暈了。request 雙向鏈接通道,client_address客戶端ip+port。
    except Exception:
        self.handle_error(request, client_address)
    finally:
        self.shutdown_request(request)  # 它不會關閉這個線程,而是將其設置為wait()狀態。

ThreadingMixIn中的 process_request_thread()

 

  self.finish_request()方法,它在BaseServer類中

def finish_request(self, request, client_address):
    """Finish one request by instantiating RequestHandlerClass."""
    self.RequestHandlerClass(request, client_address, self)  # 這裡是幹嘛?其實就是在進行實例化!

BaseServer中的finish_request


  self.RequestHandlerClass(request, client_address, self),我們找到self__dict__字典,看看這個到底是什麼東西

{'server_address': ('0.0.0.0', 6666), 'RequestHandlerClass': <class '__main__.MyServer'>, '_BaseServer__is_shut_down': <threading.Event object at 0x000002A96A0208E0>, '_BaseServer__shutdown_request': False, 'socket': <socket.socket fd=716, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 6666)>}

s1的__dict__

 

  可以看到,它就是我們傳入的那個類,即自定義的MyServer類。我們把request,client_address,以及整個是實例self傳給了MyServer的__init__方法。但是我們的MyServer類沒有__init__,怎麼辦呢?去它父類BaseRequestHandler裏面找唄。

class BaseRequestHandler:

    """註釋被我刪了"""

    def __init__(self, request, client_address, server):
        self.request = request  # request 雙向鏈接通道
        self.client_address = client_address  # 客戶端ip+port
        self.server = server # 即 實例對象本身。上面的__dict__就是它的__dict__
        self.setup() # 鈎子函數,我們可以自己寫一個類然後繼承`BaseRequestHandler`並覆寫其setup方法即可。
        try:
            self.handle()  # 看,自動執行handle
        finally:
            self.finish()  # 鈎子函數

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

BaseRequestHandler中的__init__

 

 

  現在我們知道了,為什麼一定要覆寫handle方法了吧。

 

socketserver內部調用順序流程圖(基於TCP協議)

 

實例化過程圖解

 

server_forever()啟動服務圖解

 

擴展:驗證鏈接合法性

  在很多時候,我們的TCP服務端為了防止網絡泛洪可以設置一個三次握手驗證機制。那麼這個驗證機制的實現其實也是非常簡單的,我們的思路在於進入通信循環之前,客戶端和服務端先走一次鏈接認證,只有通過認證的客戶端才能夠繼續和服務端進行鏈接。

  下面就來看一下具體的實現步驟。

 

#_*_coding:utf-8_*_
__author__ = 'Linhaifeng'
from socket import *
import hmac,os

secret_key=b'linhaifeng bang bang bang'
def conn_auth(conn):
    '''
    認證客戶端鏈接
    :param conn:
    :return:
    '''
    print('開始驗證新鏈接的合法性')
    msg=os.urandom(32)  # 新方法,生成32位隨機Bytes類型的值
    conn.sendall(msg)
    h=hmac.new(secret_key,msg)
    digest=h.digest()
    respone=conn.recv(len(digest))
    return hmac.compare_digest(respone,digest) # 對比結果為True或者為False

def data_handler(conn,bufsize=1024):
    if not conn_auth(conn):
        print('該鏈接不合法,關閉')
        conn.close()
        return
    print('鏈接合法,開始通信')
    while True:
        data=conn.recv(bufsize)
        if not data:break
        conn.sendall(data.upper())

def server_handler(ip_port,bufsize,backlog=5):
    '''
    只處理鏈接
    :param ip_port:
    :return:
    '''
    tcp_socket_server=socket(AF_INET,SOCK_STREAM)
    tcp_socket_server.bind(ip_port)
    tcp_socket_server.listen(backlog)
    while True:
        conn,addr=tcp_socket_server.accept()
        print('新連接[%s:%s]' %(addr[0],addr[1]))
        data_handler(conn,bufsize)

if __name__ == '__main__':
    ip_port=('127.0.0.1',9999)
    bufsize=1024
    server_handler(ip_port,bufsize)

Server端

#_*_coding:utf-8_*_
__author__ = 'Linhaifeng'
from socket import *
import hmac,os

secret_key=b'linhaifeng bang bang bang'
def conn_auth(conn):
    '''
    驗證客戶端到服務器的鏈接
    :param conn:
    :return:
    '''
    msg=conn.recv(32) # 拿到隨機位數
    h=hmac.new(secret_key,msg) # 摻鹽
    digest=h.digest()
    conn.sendall(digest)

def client_handler(ip_port,bufsize=1024):
    tcp_socket_client=socket(AF_INET,SOCK_STREAM)
    tcp_socket_client.connect(ip_port)

    conn_auth(tcp_socket_client)

    while True:
        data=input('>>: ').strip()
        if not data:continue
        if data == 'quit':break

        tcp_socket_client.sendall(data.encode('utf-8'))
        respone=tcp_socket_client.recv(bufsize)
        print(respone.decode('utf-8'))
    tcp_socket_client.close()

if __name__ == '__main__':
    ip_port=('127.0.0.1',9999)
    bufsize=1024
    client_handler(ip_port,bufsize)

Client端

 

  到這裏已經很簡單了,服務器將隨機數給客戶機發過去,客戶機收到后也用自家的鹽與隨機數加料,再使用digest()將它轉化為字節,直接發送了回來然後客戶端通過hmac.compare_digest()方法驗證兩個的值是否相等,如果不等就說明鹽不對。客戶機不合法服務端將會關閉與該客戶機的雙向鏈接通道。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

沒想到,這麼簡單的線程池用法,深藏這麼多坑!

又又又踩坑了

生產有個對賬系統,每天需要從渠道端下載對賬文件,然後開始日終對賬。這個系統已經運行了很久,前两天突然收到短信預警,沒有獲取渠道端對賬文件。

ps:對賬系統詳細實現方式:對賬系統設計與實現

本以為又是渠道端搞事情,上去一排查才發現,所有下載任務都被阻塞了。再進一步排查源碼,才發現自己一直用錯了線程池某個方法。

由於線程創建比較昂貴,正式項目中我們都會使用線程池執行異步任務。線程池,使用池化技術保存線程對象,使用的時候直接取出來,用完歸還以便使用。

雖然線程池的使用非常方法非常簡單,但是越簡單,越容易踩坑。細數一下,這些年來因為線程池導致生產事故也有好幾起。

所以今天,小黑哥就針對線程池的話題,給大家演示一下怎麼使用線程池才會踩坑。

希望大家看完,可以完美避開這些坑~

先贊后看,養成習慣。微信搜索「程序通事」,關注就完事了!

慎用 Executors 組件

Java 從 JDK1.5 開始提供線程池的實現類,我們只需要在構造函數內傳入相關參數,就可以創建一個線程池。

不過線程池的構造函數可以說非常複雜,就算最簡單的那個構造函數,也需要傳入 5 個參數。這對於新手來說,非常不方便哇。

也許 JDK 開發者也考慮到這個問題,所以非常貼心給我們提供一個工具類 Executors,用來快捷創建創建線程池。

雖然這個工具類使用真的非常方便,可以少寫很多代碼,但是小黑哥還是建議生產系統還是老老實實手動創建線程池,慎用Executors,尤其是工具類中兩個方法 Executors#newFixedThreadPoolExecutors#newCachedThreadPool

如果你圖了方便使用上述方法創建了線程池,那就是一顆定時炸彈,說不準那一天生產系統就會。

我們來看兩個,看下這個這兩個方法會有什麼問題。

假設我們有個應用有個批量接口,每次請求將會下載 100w 個文件,這裏我們使用 Executors#newFixedThreadPool批量下載。

下面方法中,我們隨機休眠,模擬真實下載耗時。

為了快速復現問題,調整 JVM 參數為 -Xmx128m -Xms128m

private ExecutorService threadPool = Executors.newFixedThreadPool(10);

/**
 * 批量下載對賬文件
 *
 * @return
 */
@RequestMapping("/batchDownload")
public String batchDownload() {
    
    // 模擬下載 100w 個文件
    for (int i = 0; i < 1000000; i++) {
        threadPool.execute(() -> {
            // 隨機休眠,模擬下載耗時
            Random random = new Random();
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    return "process";
}

程序運行之後,多請求幾次這個批量下載方法,程序很快就會 OOM

查看 Executors#newFixedThreadPool源碼,我們可以看到這個方法創建了一個默認的 LinkedBlockingQueue 當做任務隊列。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

這個問題槽點就在於 LinkedBlockingQueue,這個隊列的默認構造方法如下:

/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

創建 LinkedBlockingQueue 隊列時,如果我們不指定隊列數量,默認數量上限為 Integer.MAX_VALUE。這麼大的數量,我們簡直可以當做無界隊列了。

上面我們使用 newFixedThreadPool,我們僅使用了固定數量的線程下載。如果線程都在執行任務,線程池將會任務加入任務隊列中。

如果線程池執行任務過慢,任務將會一直堆積在隊列中。由於我們隊列可以認為是無界的,可以無限制添加任務,這就導致內存佔用越來越高,直到 OOM 爆倉。

ps:線程池基本工作原理

下面我們將上面的例子稍微修改一下,使用 newCachedThreadPool 創建線程池。

程序運行之後,多請求幾次這個批量下載方法,程序很快就會 OOM ,不過這次報錯信息與之前信息與之前不同。

從報錯信息來看,這次 OOM 的主要原因是因為無法再創建新的線程。

這次看下一下 newCachedThreadPool 方法的源碼,可以看到這個方法將會創建最大線程數為 Integer.MAX_VALUE 的的線程池。

由於這個線程池使用 SynchronousQueue 隊列,這個隊列比較特殊,沒辦法存儲任務。所以默認情況下,線程池只要接到一個任務,就會創建一個線程。

一旦線程池收到大量任務,就會創建大量線程。Java 中的線程是會佔用一定的內存空間 ,所以創建大量的線程是必然會導致 OOM

先贊后看,養成習慣。微信搜索「程序通事」,關注就完事了!

復用線程池

由於線程池的構造方法比較複雜,而 Executors 創建的線程池比較坑,所以我們有個項目中自己封裝了一個線程池工具類。

工具類代碼如下:

public static ThreadPoolExecutor getThreadPool() {
    // 為了快速復現問題,故將線程池 核心線程數與最大線程數設置為 100
    return new ThreadPoolExecutor(100, 100, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));
}

項目代碼中這樣使用這個工具類:

@RequestMapping("/batchDownload")
public String batchDownload() {
    ExecutorService threadPool = ThreadPoolUtils.getThreadPool();

    // 模擬下載 100w 個文件
    for (int i = 0; i < 100; i++) {
        threadPool.execute(() -> {
            // 隨機休眠,模擬下載耗時
            Random random = new Random();
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    return "process";
}

使用 WRK 工具對這個接口同時發起多個請求,很快應用就會拋出 OOM

每次請求都會創建一個新的線程池執行任務,如果短時間內有大量的請求,就會創建很多的線程池,間接導致創建很多線程。從而導致內存佔盡,發生 OOM 問題。

這個問題修復辦法很簡單,要麼工具類生成一個單例線程池,要麼項目代碼中復用創建出來的線程池。

Spring 異步任務

上面代碼中我們都是自己創建一個線程池執行異步任務,這樣還是比較麻煩。在 Spring 中, 我們可以在方法上使用 Spring 註解 @Async,然後執行異步任務。

代碼如下:

@Async
public void async() throws InterruptedException {
    log.info("async process");
    Random random = new Random();
    TimeUnit.SECONDS.sleep(random.nextInt(100));
}

不過使用 Spring 異步任務,我們需要自定義線程池,不然大量請求下,還是有可能發生 OOM 問題。

這是原因主要是 Spring 異步任務默認使用 Spring 內部線程池 SimpleAsyncTaskExecutor

這個線程池比較坑爹,不會復用線程。也就是說來一個請求,將會新建一個線程。

所以如果需要使用異步任務,一定要使用自定義線程池替換默認線程池。

如果使用 XML 配置,我們可以增加如下配置:

<task:executor id="myexecutor" pool-size="5"  />
<task:annotation-driven executor="myexecutor"/>

如果使用註解配置,我們需要設置一個 Bean:

@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor=new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setThreadNamePrefix("test-%d");
    // 其他設置
    return new ThreadPoolTaskExecutor();
}

然後使用註解時指定線程池名稱:

@Async("threadPoolTaskExecutor")
public void xx() {
    // 業務邏輯
}

如果是 SpringBoot 項目,從本人測試情況來看,默認將會創建核心線程數為 8,最大線程數為 Integer.MAX_VALUE,隊列數也為 Integer.MAX_VALUE線程池。

ps:以下代碼基於 Spring-Boot 2.1.6-RELEASE,暫不確定 Spring-Boot 1.x 版本是否也是這種策略,熟悉的同學的,也可以留言指出一下。

雖然上面的線程池不用擔心創建過多線程的問題,不是還是有可能隊列任務過多,導致 OOM 的問題。所以還是建議使用自定義線程池嗎,或者在配置文件修改默認配置,例如:

spring.task.execution.pool.core-size=10
spring.task.execution.pool.max-size=20
spring.task.execution.pool.queue-capacity=200

Spring 相關踩坑案例: Spring 定時任務突然不執行

線程池方法使用不當

最後再來說下文章開頭的我踩到的這個坑,這個問題主要是因為理解錯這個方法。

錯誤代碼如下:

// 創建線程池
ExecutorService threadPool = ...
List<Callable<String>> tasks = new ArrayList<>();
// 批量創建任務
for (int i = 0; i < 100; i++) {
    tasks.add(() -> {
        Random random = new Random();
        try {
            TimeUnit.SECONDS.sleep(random.nextInt(100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "success";
    });
}
// 執行所有任務
List<Future<String>> futures = threadPool.invokeAll(tasks);
// 獲取結果
for (Future<String> future : futures) {
    try {
        future.get();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

上面代碼中,使用 invokeAll執行所有任務。由於這個方法返回值為 List<Future<T>>,我誤以為這個方法如 submit一樣,異步執行,不會阻塞主線程。

實際上從源碼上,這個方法實際上逐個調用 Future#get獲取任務結果,而這個方法會同步阻塞主線程。

一旦某個任務被永久阻塞,比如 Socket 網絡連接位置超時時間,導致任務一直阻塞在網絡連接,間接導致這個方法一直被阻塞,從而影響後續方法執行。

如果需要使用 invokeAll 方法,最好使用其另外一個重載方法,設置超時時間。

總結

今天文章通過幾個例子,給大家展示了一下線程池使用過程一些坑。為了快速復現問題,上面的示例代碼還是比較極端,實際中可能並不會這麼用。

不過即使這樣,我們千萬不要抱着僥倖的心理,認為這些任務很快就會執行結束。我們在生產上碰到好幾次事故,正常的情況執行都很快。但是偶爾外部程序抽瘋,返回時間變長,就可能導致系統中存在大量任務,導致 OOM

最後總結一下幾個線程池幾個最佳實踐:

第一,生產系統慎用 Executors 類提供的便捷方法,我們需要自己根據自己的業務場景,配置合理的線程數,任務隊列,拒絕策略,線程回收策略等等,並且一定記得自定義線程池的命名方式,以便於後期排查問題。

第二,線程池不要重複創建,每次都創建一個線程池可能比不用線程池還要糟糕。如果使用其他同學創建的線程池工具類,最好還是看一下實現方式,防止自己誤用。

第三,一定不要按照自己的片面理解去使用 API 方法,如果把握不準,一定要去看下方法上註釋以及相關源碼。

歡迎關注我的公眾號:程序通事,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的博客:studyidea.cn

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

※別再煩惱如何寫文案,掌握八大原則!

人象共用44%國土 斯里蘭卡7隻大象遭毒殺 衝突待解

環境資訊中心外電;范震華 翻譯;賴慧玲 審校;稿源:Mongabay

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

冰河期大氣中二氧化碳濃度為何比較低? 關鍵證據找到了

編譯:嚴融怡(胡適國小創思組科任教師)

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

※別再煩惱如何寫文案,掌握八大原則!