格式化与验证

众所周知,前端很大程度上就是做数据的展示和收集工作,这时候用户看到的内容和服务端存储的数据中间就会有多层转换过程。

最重要的当然是 money 啦,相差一分钱都能让用户急得哇哇大叫。比如业务里有三种价格:

  • fixed_price 原价
  • promotion_price 特价,一般为 fixed_price * promotion.discount
  • vip_price 会员价,一般为 fixed_price * vip.discount,可能为空

经常出现的情形时既要显示折扣(discount)又要显示价格(promotion_price or vip_price),可不可以直接返回 fixed_pricediscount 呢?不可以!细心的读者已经觉察到问题了 —— discount 为小数,fixed_price 与其相乘很可能不是一个整数(这里要插一句,一般情况下价格都是记录为 int,以 cent 作为单位)。比如臭名昭著的 JavaScript 浮点数相加问题。

1
2
3
4
> 0.1 + 0.2
< 0.30000000000000004
> 0.1 + 0.2 === 0.3
< false

原因就是 float 在计算机里存了 32 位,1 位符号位 + 8 位指数位 + 23 位尾数,反正就是不精确就完事了。那我们即使不精确也要保证各处拿到的是一个值,这个值只能以后端为准。

1
2
3
4
5
6
7
8
9
@property
def promotion_price(self):
promotion = self.get_active_promotion()
if not promotion:
return self.fixed_price

if self.is_chapter:
return int(self.fixed_price * self.column.promotion_discount)
return promotion.price

这个值是惰性的,也就是说只有用到时才会计算值,返回的一定是一个整数。有一些应用场景:

  • 直接展示价格:(price / 100).toFixed(2) => 0.99
  • 很多章节合并购买,items.reduce((total, item) => total + item.price, 0) 注意这个值可能会不等于整本的定价,这时就要引导或劝说用户直接买整本更划算呀
  • 满减活动,类似合并购买情形,只不过是有一些阈值情形
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    getMaxAvailableRebateAmountInGroup = (group) => {
    const total = this.getTotalPriceInGroup(group)
    let maxAmount = 0

    if (!group.event) {
    return maxAmount
    }

    group.event.availablecouponGroups.some((coupon) => {
    if (total > coupon[0]) {
    maxAmount = coupon[1]
    }
    return total > coupon[0]
    })
    return maxAmount
    }
    /**
    * Returns rebate threshold info.
    * @param {Object[]} couponGroups - Available coupons.
    * @param {number} couponGroups[][0] - The threshold of coupon.
    * @param {number} couponGroups[][1] - The amount of coupon, will be reduced from price when total meets threshold.
    * @param {numer} total - Total of prices.
    * @returns {[string, bool]} Description String, isMaxThresholdMet
    */
    // returned value: [descString, isMaxThresholdMet]
    getRebateThreshold = (couponGroups, total) => {
    const mf = moneyformatShort


    if (couponGroups.length === 0 ) {
    return ['本活动满减券已全部用完', true]
    }

    for (let i = 0, prev_threshold = 0, prev_amount = 0; i < couponGroups.length; i++) {
    const [threshold, amount] = couponGroups[i]

    if (total >= threshold) {
    if (i === 0) {
    return ['已购满 ' + mf(threshold) + ',已减 ' + mf(amount), true]
    } else {
    return ['已减 ' + mf(amount) +
    ',再购 ' + mf(prev_threshold - total) + ' 可减 ' + mf(prev_amount), false]
    }
    } else {
    if (i === couponGroups.length - 1) {
    return ['购满 ' + mf(threshold) + ' 可减 ' + mf(amount) +
    ',还差 ' + mf(threshold - total), false]
    }
    }
    [prev_threshold, prev_amount] = [threshold, amount]
    }
    }

    getTotalPriceInGroup = (group) => {
    return group.itemList.reduce((total, item) => {
    if (item.onSale && item.selected) {
    total += item.salePrice
    }
    return total
    }, 0)
    }

钱的计算大概就是这样,涉及到第三方支付就更头疼了。

时间

金钱是宝贵的,时间是更宝贵的。而我们需要根据不同场景甚至用户所在时区去显示不同的时间格式,有一种方案是 date-fns,项目里也有根据 timestamp 转换成用户可读格式的各种函数,但有时候只是想简简单单显示一个时间,同时考虑各种情况下的复用性。上 GraphQL

1
2
3
4
5
6
7
8
9
10
11
12
13
import time
from libs.utils.date import mtimeformat
from ..types import TimeFormat

def format_time(time_, format=TimeFormat.FULL_TIME.value):
return {
TimeFormat.FURTHER: mtimeformat(time_),
TimeFormat.FULL_TIME: time_.strftime('%Y-%m-%d %H:%M:%S'),
TimeFormat.FULL_DAY: time_.strftime('%Y-%m-%d'),
TimeFormat.CHINESE_FULL_DAY: time_.strftime('%Y 年 %-m 月 %-d 日'),
TimeFormat.ISO: time_.isoformat(),
TimeFormat.TIMESTAMP: int(time.mktime(time_.timetuple()) * 1000),
}[format]

其中 TimeFormat 是一个 GraphQL 的 enum 类型,mtimeformat 是一个可以根据相差时间来区别展示的函数,比如可以展示成「刚刚」「5 分钟前」这样的口语化格式。

实际效果

表单

表单的验证可以有很多实现,最简单的莫过于 maxlengthrequire 这种,直接交给浏览器,项目里也用到了一些 jQuery 的表单绑定,在提交之前一次性遍历表单项根据 data-* 来进行 check。

现实是 react 相关的表单验证有以下两个痛点:

异步验证

所幸的是 formik 支持了 Promise 的验证结果

1
2
3
4
5
6
7
8
9
10
11
12
13
<Field name={name} type="number"
validate={function(value) {
return fetchAgent(value).then((res) => {
if (res.agentExisted) {
if (res.existed) {
return `该作者经纪合同已经存在,负责人:${res.editorName}`
}
} else {
return `'作者 ID' 为 ${value} 的用户不存在或不是作者身份`
}
})
}}
/>

依赖另一输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import * as Yup from 'yup'

const OTHER_NATIONALITY = '其他'

export const validationSchema=Yup.object().shape({
nationality: Yup.string().nullable(true).required('请选择国家或地区'),
otherNationality: Yup.string().test(
'need-other-nationality',
'请填写其他国家或地区',
function(value) {
return this.parent.nationality !== OTHER_NATIONALITY || !!value
}
),
})

实际情形为中间的输入框依赖于前面的下拉筛选框是否选了「其他」

路由

异步鉴权路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const PrivateRoute = ({ component: Component, ...rest }) => {
const user = useSelector(state => selectors.user(state))
const isLoaded = useSelector(state => selectors.isLoaded(state))

return (
<Route
{...rest}
render={(props) =>
!isLoaded ? (
<></>
) : user ? (
<Component {...props} />
) : (
<Redirect to='/404' />
)
}
/>
)
}

面试小结

算法题

全排列

实现二维数组的全排列

1
2
3
4
5
6
7
8
9
10
// 如输入[[1,2],[3,4],[5,6]]
// 输出:
// [ 1, 3, 5 ]
// [ 1, 3, 6 ]
// [ 1, 4, 5 ]
// [ 1, 4, 6 ]
// [ 2, 3, 5 ]
// [ 2, 3, 6 ]
// [ 2, 4, 5 ]
// [ 2, 4, 6 ]

思路:最后需要得到一个二维数组,那基本都是 reduce 操作的话也应该是一个二维数组开头,每一次都把前一次结果得到的数组们尾部分别加上二维数组里的一项,也就是 m * n * [...prevResultList[i], list[j]],其中 mn 分别是 prevResultListlist 的项数,这样也就成功实现了 m × n 的项数膨胀,至于降维操作我们有 flatMap 这个神器。面试时用了很丑陋的 reduce + map 嵌套,甚至还忘了把数组摊平……

1
2
3
4
5
function arrange(doubleList) {
return doubleList.reduce((prevResultList, list) => {
return prevResultList.flatMap((result) => list.map((v) => result.concat(v)))
}, [[]])
}

随机自然数组

1~1000 范围内生成长度为 1000 的随机不重复自然数数组,并验证

思路:这道题也是老生常谈了,直接暴力一点 sort(() => Math.random() > 0.5) 解之,80% 的面试官都会眼前一愣,心想算了算了投机取巧的家伙,但有一个面试官对此质疑了很久,想了想也是,sort 的内部实现并不稳定,而且每次排出来结果不一致不知道性能有没有问题,还是要把随机数稳定下来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
function generateRandomInter(n) {
const list = Array(n).fill(0).map((v, i) => [i + 1, Math.random()])
list.sort((a, b) => a[1] - b[1])
return list.map((v) => v[0])
}

function validateResult(result, n) {
const uniqList = Array.from(new Set(result))
return uniqList.length === n
}

const result = generateRandomInter(1000)
console.log(result)
console.log(validateResult(result, 1000))

去重

用 es5 实现数组去重?
[1, 2, 3, true, '2']

思路:没啥思路,就老老实实遍历再挨个取 indexOf?最后面试官给出了 typeof 的解法,真是……

1
2
3
4
5
6
7
8
9
10
11
12
function unique(list) {
const cacheMap = {}
return list.reduce((acc, v) => {
const key = typeof v + v
if (cacheMap[key]) {
return acc
} else {
cacheMap[key] = true
return acc.concat(v)
}
}, [])
}

合并有序数组(链表?)

思路:简单来说维护两个 head,每次取一个值插入新数组后就步进一次,然后两者其中一个 head 到达底部后一次性将另一个数组剩余元素灌到新数组里。面试官问还有没有高效一点的方案,就用了 Symbol.iterator 理论上会高效一点?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
function mergeList(list1, list2) {
let result = []
const iterator1 = list1[Symbol.iterator]()
const iterator2 = list2[Symbol.iterator]()
let head1 = iterator1.next()
let head2 = iterator2.next()
while (!head1.done && !head2.done) {
const value1 = head1.value
const value2 = head2.value
if (value1 <= value2) {
result.push(value1)
head1 = iterator1.next()
} else {
result.push(value2)
head2 = iterator2.next()
}
}
if (!head1.done) {
result = [...result, head1.value, ...iterator1]
}
if (!head2.done) {
result = [...result, head2.value, ...iterator2]
}
return result
}

console.log(mergeList([1, 2], [1, 2, 3]))

判断二叉树镜像

给定一个二叉树,判断是否为镜像
1
2 2
3 4 4 3
1
2 2
3 3

思路:跟判断两颗二叉树是否相同区别不大,面试时采用了简单粗暴的分层比较法,空间复杂度达到了 2 ** n …… 😂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
function checkMirror(roots) {
const len = roots.length
let isAllNull = true
const doesRootsEqual = roots.every((root, i) => {
if (root) {
isAllNull = false
}
const oppositeRoot = roots[len - 1 - i]
return (root && root.val) === (oppositeRoot && oppositeRoot.val)
})
if (!doesRootsEqual) { return false }
if (isAllNull) { return true }
const nextRoots = roots.flatMap((r) => r ? [r.left, r.right] : [null, null])
return checkMirror(nextRoots)
}

const root = {
val: 1,
left: { val: 2, left: { val: 3}, right: { val: 4, left: { val: 2}}},
right: { val: 2, left: { val: 4, right: { val: 2}}, right: { val: 3}}
}

console.log(checkMirror([root]))

工程题

控制 Promise 并发

[promise]

1
2
3
dispatch(arr, n) {

}

思路:肯定要用 Promise.race,然后如果要阻塞的话还需要是 async/await 写法。这道题其实还是算一般的,后面写个 Scheduler 就痛苦了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const promises = Array(10).fill(0).map((v, i) => (
new Promise((resolve) => setTimeout(() => resolve(i), i * 1000 + 1000)).then(console.log)
))

async function dispatch(list, n) {
const total = list.length
let pending = []
let completed = 0
while(completed < total && list.length) {
const moreCount = n - pending.length
Array(moreCount).fill(0).forEach(() => {
const p = list.shift()
.catch(() => {})
.then(() => pending = pending.filter(v => v !== p))
pending.push(p)
})
console.log('len: ', pending.length)
await Promise.race(pending)
completed += 1
}
}

dispatch(promises, 3)

实现异步调度器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Scheduler {
async add(promiseFunc: () => Promise<void>): Promise<void> {
}
}

const scheduler = new Scheduler()
const timeout = (time) => {
return new Promise(r => setTimeout(r, time))
}
const addTask = (time, order) => {
scheduler.add(() => timeout(time))
.then(() => console.log(order))
}

addTask(1000, 1)
addTask(500, 2)
addTask(300, 3)
addTask(400, 4)
// log: 2 3 1 4

思路:一开始看到这题还有些欣喜,似曾相识的感觉,但实际一做发现不是这样,需要直接在 add 方法后返回一个 Promise,面试时没有写出有效解,事后想想还是能写出个解的,就是实现比较丑陋 ……

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
type promiseFuncType = () => Promise<void>

class Scheduler {
pending: Array<[promiseFuncType, () => void]>
running: Array<Promise<void>>
CONCURRENCY = 2
constructor() {
this.pending = []
this.running = []
}
async add(promiseFunc: promiseFuncType): Promise<void> {
const pending = new Promise<void>((r) => {
this.pending.push([promiseFunc, r])
})
this.execute()
return pending
}

execute = () => {
let restNum = this.CONCURRENCY - this.running.length
if (this.running.length === 0 && this.pending.length === 0) {
return
}
while (restNum > 0 && this.pending.length) {
const [promiseFunc, callback] = this.pending.shift()
const prom = promiseFunc().then(() => {
this.running = this.running.filter(p => p !== prom)
callback()
})
this.running.push(prom)
restNum -= 1
}
Promise.race(this.running).then(this.execute)
}
}

const scheduler = new Scheduler()
const timeout: (number) => Promise<void> = (time) => {
return new Promise(r => setTimeout(r, time))
}
const addTask = (time, order) => {
scheduler.add(() => timeout(time))
.then(() => console.log(order))
}

addTask(1000, 1)
addTask(500, 2)
addTask(300, 3)
addTask(400, 4)

实现 Observable

思路:观察者和迭代器,需要理解 Observable 和 Observer,才疏学浅,没有 get 到精髓,此题仍为 WIP 状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
const arr = [1, 2, 3]

function Observable() {
this.uid = 0
this.subscribers = {}
this.onSubscribe = null
}

Observable.from = function(array) {
const observable = new Observable()
observable.onSubscribe = (observer) => {
array.forEach((v) => observer.next(v))
observer.complete()
}
return observable
}

Observable.prototype.subscribe = function(observer) {
const id = this.id++
this.subscribers[id] = observer
if (this.onSubscribe) {
this.onSubscribe(observer)
}
return {
unsubscribe: () => {
this.subscribers[id] = null
console.log('unsubscribe')
}
}
}

const arr$ = Observable.from(arr)

const subscriber = arr$.subscribe({
next: console.log,
complete: () => console.log('complete'),
error: console.error,
})

subscriber.unsubscribe()

口答知识点

  • HTML 一点都没问,CSS 就问了简单九宫格 header/nav/main 布局以及垂直居中等,说明组件化已经深入人心,高级前端基本没有写样式的部分了
  • 简历里写到了 Gulp/Webpack 相关,所以被问了很多次 Webpack Loader/Plugin + Gulp plugin 开发 😂 像我这么水当然是只能扯扯从一个 File 到另一个 File 输出这样子的
  • TypeScriptGraphQL 也被问了很多次,TypeScript 如何实现类型推导(Pick, Omit),interface 和 type 区别,GraphQL 解决什么问题
  • React 相关:hooks 生命周期,fiber 是啥,setState 到渲染发生了什么 ……
  • 深拷贝也是问了无数次,直接 lodash.cloneDeep 它不香么 😂 当然有些比如循环引用,class instance 等也是要注意的
  • macroTask/microTask:一个 macroTask 多个 microTask,microTask-in-microTask 继续排队,Promise((r) => …) … 是 macroTask
  • HTTPS 如何加密通讯过程、Server/Client Hello + 校验证书合法性 + 三次生成随机字符串 + RSA 非对称加密 + 约定密钥对称加密 / 浏览器缓存有哪些字段 / WebSocket 做了啥 / SSO:在第三方 Cookie 无法读取情形下怎么办?(OS:我也很无奈啊)/ script async defer 具体怎么 load
  • 最复杂、最有挑战性的项目经历:复杂筛选器 + GraphQL 应用,小程序解析 nodes 图文混排,原生端通讯 + 跨端开发联调
  • 最感兴趣的方向:富文本渲染与编辑、GIS 系统以及 WebAssembly 相关

复盘一次 gulp & webpack 构建优化

接续前一篇《并行化密集型计算》,发现实际上线时 parallel terser 并没有提升速度。反而比单线程还要慢,这就有点不科学了。首先想本地跑起来没有出错,大概不是程序逻辑问题,再想想看,估计是配置问题,与 SA 进行了一波有力的交流。

有力的交流过程

原来 os.cpus() 拿到的是真实的计算机 CPU 核数,而 k8s 会限制容器的 CPU 使用量,所以我们应该手动将 NODE_ENV=production 时的并发数限制在 5 以内。

进一步思考这种 utility 和 business 代码混写在一起的方式极其耦合,想要更高效地发挥代码的作用必须分离它们,于是就借鉴了 terser-webpack-plugin 中对于 jest-worker 的应用改了一下,并将其应用于 webpack 的打包,取得了不错的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
gulp.task('webpack', function() {
const { result$, next, end } = parallelRunner(
__filename,
function(worker, configName) {
return Promise.resolve(
worker.compile(
null,
[configName]
)
)
},
isProduction ? PRODUCTION_PARALLEL_NUMBER : true
)
Object.keys(configNameToFileMap).forEach((name) => next(name))
end()
return result$
})

webpack 打包跑进了 40 秒大关

但实际到了线上 webpack 依然花了将近 4 分钟,与此同时 gulp parallel terser 已经稳定在 1 分钟之内了。这时一段异常的日志引起了我的注意。

terser-webpack-plugin 并发出错

原来 webpack 里自带的 terser-webpack-plugin 会在 production mode 下自动开启,又是 48 线程 …… 4 * 48 = 192 线程,OMG,不跑崩让 SA 来请喝茶才怪了。但转念一想,好像后面我们还会对各种文件都 terser 一遍,在 webpack 里压缩似乎是没有必要的?于是乎直接禁用 optimization.minimize 就可以了。

期间又优化了一下 parallel-runner,处理了一下 stream/rxjs observable/worker 之间的 back pressure 问题。简单说就是当 rxjs mergeMap 把一个值传到 worker 里去处理时才算作 consumed,会继续向上游 pull 下一个数据,这样就把上下游的数据链接给建立起来了,而不是之前那样全部 pull 下来堆在 mergeMap 外面,容易形成 memory leak。

Call the callback function only when the current file (stream/buffer) is completely consumed.

parallel-runner 把 data(vinyl File) 放到 worker 去处理标志该数据为 consumed(实际结果会在 result$ 里被 push 给下游的 writable stream),然后 stream 会根据是否 end 或者到达 highWaterMark 自动去 read(pull) 上游 readable stream 的下一个数据,也就是执行我们传入的 transform 方法。数据可能堆积在 mergeMap 外最多一个,因为那个数据不进入 mergeMap 就不会继续触发 consumed,之前是一口气全 read。

最后,规范化了一下 webpack 出错时向 main process 通报错误的方法,该出错就出错,不要静默失败到上线时出大问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
return new Promise((resolve, reject) => {
webpack(configs, function(error, stats) {
let errors
if (stats) {
log(stats.toString(Object.assign({
// https://webpack.js.org/configuration/stats/
chunks: false,
colors: colors.supportsColor,
}, configs[0].stats)))
}

if (error || stats.hasErrors()) {
// errors should be serializable since it will go through processes
errors = error ? error.toString() : stats.toJson().errors
}

if (callback) {
callback(errors)
}
if (errors) {
reject(errors)
} else {
resolve()
}
})
})

至此,这次优化 gulp & webpack 打包构建流程的优化工作就算告一段落了,实际上大概处理了以下几个问题:

  • 将重复性的任务放到多线程并行执行
  • 提取公共代码转成 utilities
  • 区分本地开发与生产环境,尊重基础设施对于计算资源的分配规则
  • 剔除多余的步骤避免重复计算(侧面说明不要替用户预先做决定的重要性)
  • 向官方推荐实现靠拢,以求符合标准融入开源库的生态环境

历经近一周时间,完成了优化工作

整体打包速度从 700 ~ 800s 提升到了 250 ~ 300s!

下面直接贴出了 parallel-runner 的代码,小弟手艺不佳,写得不好,各位如有需要可以在此基础上稍加改动以适应业务需要。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Inspired by terser-webpack-plugin.

const os = require('os')
const rx = require('rxjs')
const rxOperators = require('rxjs/operators')
const JestWorker = require('jest-worker').default
const through2 = require('through2')
const log = require('fancy-log')

const { Subject, from } = rx
const { mergeMap, share } = rxOperators

function getAvailableNumberOfCores(parallel) {
const cpus = os.cpus() || { length: 1 }

return parallel === true
? cpus.length - 1
: Math.min(Number(parallel) || 0, cpus.length - 1)
}

function parallelRunner(
module,
taskCallback,
parallel = true
) {
const availableNumberOfCores = getAvailableNumberOfCores(parallel)
let concurrency = 1 // which means mergeMap will behave as concatMap
let worker
let total = 0
let completed = 0
let allScheduled = false

log('Parallel Running: ', module)
log('Available Number of Cores: ', availableNumberOfCores)

// setup worker
if (availableNumberOfCores > 0) {
const numWorkers = availableNumberOfCores
concurrency = numWorkers
log('Number of Workers: ', numWorkers)
worker = new JestWorker(module, { numWorkers, enableWorkerThreads: true })

const workerStdout = worker.getStdout()
if (workerStdout) {
workerStdout.on('data', (chunk) => {
return process.stdout.write(chunk)
})
}

const workerStderr = worker.getStderr()

if (workerStderr) {
workerStderr.on('data', (chunk) => {
return process.stderr.write(chunk)
})
}
}

// handle concurrency with rxjs
const scheduled = new Subject()
const consumed = new Subject()

const result$ = scheduled.pipe(
mergeMap((data) => {
// data is actually consumed here
consumed.next(null)
// worker[methodName] can only be invoked with serializable data
// and returned value could be just plain RESULT or Promise<RESULT>
return from(taskCallback(worker || require(module), data))
}, concurrency),
share()
)
result$.subscribe({
complete: function() {
if (worker) {
worker.end()
}
},
next: function() {
completed += 1
if (allScheduled && completed === total) {
scheduled.complete()
}
},
error: function(err) {
throw err
}
})

return {
result$,
consumed$: consumed.asObservable(),
next: (data) => {
scheduled.next(data)
total += 1
},
complete: () => { allScheduled = true }
}
}

function gulpParallelRunner(module, taskCallback, parallel) {
const {
result$,
consumed$,
next,
complete
} = parallelRunner(module, taskCallback, parallel)
let afterComplete, stream, afterConsume

consumed$.subscribe(() => {
// `afterComplete was defined` means there is no more data
if (!afterComplete && afterConsume) {
afterConsume()
}
})

result$.subscribe({
complete: () => {
if (afterComplete) {
afterComplete()
}
},
next: (data) => {
stream.push(data)
// if returned value is false means stream ends or meets highWaterMark
// but we don't care since we use rxjs to control concurrency
}
})

const flush = function(cb) {
afterComplete = cb
complete()
}
const transform = function(file, enc, afterTransform) {
if (!stream) {
stream = this
}
if (!afterConsume) {
afterConsume = afterTransform
}
next(file)
}
return through2.obj(transform, flush)
}

// Staticng has CPU limit of 5 on k8s, so we can't use os.cpus().length which
// reports the number of online CPUs, but running with 4 threads is fast enough.
// https://github.com/nodejs/node/issues/28762#issuecomment-513730856
const PRODUCTION_PARALLEL_NUMBER = 4

module.exports = {
parallelRunner,
gulpParallelRunner,
PRODUCTION_PARALLEL_NUMBER,
}

实际应用于 terser 的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
function parallelTerser(needMangle) {
const options = generateTerserOptions(needMangle)
return gulpParallelRunner(
require.resolve('terser'),
function(worker, file) {
return Promise.resolve(worker.minify({
[file.path]: file.contents.toString('utf-8')
}, options)).then((result) => {
if ('error' in result) {
throw new Error(result.error.message)
}
file.contents = 'from' in Buffer ? Buffer.from(result.code) : new Buffer(result.code)
return file
})
},
isProduction ? PRODUCTION_PARALLEL_NUMBER : true
)
}