这里用到了一个 primes.py 模块。
我一开始以为它是库里自带的东西，原来是作者在前面部分给出的代码不完整。完整的 primes.py
如下：
import math
# Test fixture: (number, expected result of is_prime(number)) pairs.
# Most entries are 16-digit values, so checking them by trial division
# takes noticeable CPU time.
PRIME_FIXTURE = [
    (2, True),
    (142702110479723, True),
    (299593572317531, True),
    (3333333333333301, True),
    (3333333333333333, False),
    (3333335652092209, False),
    (4444444444444423, True),
    (4444444444444444, False),
    (4444444488888889, False),
    (5555553133149889, False),
    (5555555555555503, True),
    (5555555555555555, False),
    (6666666666666666, False),
    (6666666666666719, True),
    (6666667141414921, False),
    (7777777536340681, False),
    (7777777777777753, True),
    (7777777777777777, False),
    (9999999999999917, True),
    (9999999999999999, False),
]

# Only the inputs, in fixture order, for callers that do not need the answers.
NUMBERS = [number for number, _expected in PRIME_FIXTURE]
# tag::IS_PRIME[]
def is_prime(n: int) -> bool:
    """Return True if *n* is prime, using trial division.

    Values below 2 are never prime. Even values are settled up front, so
    only odd candidate divisors from 3 up to isqrt(n) need to be tried.
    """
    if n < 2:
        return False
    if n % 2 == 0:
        # 2 is the only even prime.
        return n == 2
    limit = math.isqrt(n)
    # A composite n always has a divisor no larger than its integer square
    # root; all() stops at the first divisor that leaves no remainder.
    return all(n % divisor for divisor in range(3, limit + 1, 2))
# end::IS_PRIME[]
if __name__ == '__main__':
    # Self-check: is_prime must agree with every expected answer in the
    # fixture; each verified pair is echoed as it passes.
    for number, expected in PRIME_FIXTURE:
        verdict = is_prime(number)
        assert verdict == expected
        print(number, expected)
来自：https://github.com/fluentpython/example-code-2e/blob/master/19-concurrency/primes/primes.py
下面是一段需要好好理解的多进程代码（procs.py）：
"""
procs.py: shows that multiprocessing on a multicore machine
can be faster than sequential code for CPU-intensive work.
"""
# tag::PRIMES_PROC_TOP[]
import sys
from time import perf_counter
from typing import NamedTuple
from multiprocessing import Process, SimpleQueue, cpu_count # <1>
from multiprocessing import queues # <2>
from primes import is_prime, NUMBERS
# One answer from a worker: the number checked, whether it is prime, and
# how many seconds the check took.
PrimeResult = NamedTuple(  # <3>
    'PrimeResult',
    [('n', int), ('prime', bool), ('elapsed', float)],
)
# Parameterized queue types used only for annotations: the job queue
# carries ints to check, the result queue carries PrimeResult records.
JobQueue = queues.SimpleQueue[int]  # <4>
ResultQueue = queues.SimpleQueue[PrimeResult]  # <5>
def check(n: int) -> PrimeResult:  # <6>
    """Test *n* for primality and record how long the test took."""
    started = perf_counter()
    verdict = is_prime(n)
    duration = perf_counter() - started
    return PrimeResult(n=n, prime=verdict, elapsed=duration)
def worker(jobs: JobQueue, results: ResultQueue) -> None:  # <7>
    """Check numbers taken from *jobs* until a falsy sentinel arrives.

    Each result goes to *results*; on exit, one PrimeResult with n == 0 is
    sent so report() can count this worker as finished.
    """
    while True:
        n = jobs.get()  # <8>
        if not n:
            # Sentinel (0) received: no more work for this process.
            break
        results.put(check(n))  # <9>
    results.put(PrimeResult(0, False, 0.0))  # <10>
def start_jobs(procs: int, jobs: JobQueue, results: ResultQueue) -> None:  # <11>
    """Queue every number, then launch *procs* workers with one 0 sentinel each.

    All numbers are queued before any sentinel. Sentinels are therefore
    only read from the queue after every real job has been taken.
    """
    enqueue = jobs.put
    for number in NUMBERS:
        enqueue(number)  # <12>
    for _ in range(procs):
        # <13> create and <14> start a worker sharing both queues.
        Process(target=worker, args=(jobs, results)).start()
        enqueue(0)  # <15>
# end::PRIMES_PROC_TOP[]
# tag::PRIMES_PROC_MAIN[]
def main() -> None:
    """Check every number in NUMBERS with a set of worker processes.

    The process count is taken from the first command-line argument, or
    defaults to cpu_count() when no argument is given. Exits with an error
    message if the argument is not an integer or is less than 1.
    """
    if len(sys.argv) < 2:  # <1>
        procs = cpu_count()
    else:
        try:
            procs = int(sys.argv[1])
        except ValueError:
            sys.exit(f'Invalid number of processes: {sys.argv[1]!r}')
    # With procs < 1 no worker would start and report() would return at
    # once, printing a misleading "0 checks" summary instead of an error.
    if procs < 1:
        sys.exit(f'Number of processes must be at least 1, got {procs}')
    print(f'Checking {len(NUMBERS)} numbers with {procs} processes:')
    t0 = perf_counter()
    jobs: JobQueue = SimpleQueue()  # <2>
    results: ResultQueue = SimpleQueue()
    start_jobs(procs, jobs, results)  # <3>
    checked = report(procs, results)  # <4>
    elapsed = perf_counter() - t0
    print(f'{checked} checks in {elapsed:.2f}s')  # <5>
def report(procs: int, results: ResultQueue) -> int:  # <6>
    """Print each result as it arrives and return how many numbers were checked.

    Reading stops once one end-of-work record (n == 0) has been received
    for each of the *procs* workers.
    """
    total = finished = 0
    while finished < procs:  # <7>
        n, prime, elapsed = results.get()  # <8>
        if n == 0:  # <9>
            finished += 1
            continue
        total += 1  # <10>
        label = 'P' if prime else ' '
        print(f'{n:16} {label} {elapsed:9.6f}s')
    return total


if __name__ == '__main__':
    main()
# end::PRIMES_PROC_MAIN[]
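理解这段代码的关键，是理解其中几个看似无限的 while 循环，以及它们各自的终止条件。
report 里的循环看上去也像无限循环，但它其实以 procs_done < procs 为条件：收到 procs 个 n == 0 的哨兵结果后就结束。
其实，只要回答清楚下面这个问题就可以了：如何保证每个核都在不间断地工作？
回答：所有待检查的数字都放在同一个共享的 jobs 队列里。每个 worker 进程一检查完当前数字，就立刻用 jobs.get() 取下一个，所以只要队列里还有数字，就没有进程闲着。进程具体跑在哪个核上由操作系统调度；默认 procs = cpu_count()，大致相当于每个核一个进程。
start_jobs 先把所有数字放进队列，再为每个进程放一个 0 作为哨兵。队列先进先出，所以 0 只会在真正的数字都被取走之后才被读到。worker 取到 0 就退出 while 循环，并向 results 发送 PrimeResult(0, False, 0.0)。
一共放了 procs 个 0，而 worker 取到一个 0 就停止，所以每个 worker 恰好取到一个 0。report 每收到一个 n == 0 的结果就把 procs_done 加 1，收满 procs 个后返回已检查的数字个数。
这也意味着 0 本身不能作为待检查的数字，否则会被误当成哨兵。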