迭代器与生成器

关注 4.4 4.13

4.1 手动遍历迭代器

问题

不使用 for 循环来实现迭代

解决方案

使用 next() 函数并在代码中捕获 StopIteration 异常。

通常来讲, StopIteration 用来指示迭代的结尾。不过也可以通过返回一个指定值来标记结尾,比如 None 。

def manual_iter():
    with open('/etc/passwd') as f:
        try:
            while True:
                line = next(f)
                print(line, end='')
        except StopIteration:
            pass
# same 指定迭代结束后返回什么
with open('/etc/passwd') as f:
    while True:
        line = next(f, None) # here
        if line is None:
            break
        print(line, end='')

讨论

迭代期间的基本细节:

>>> items = [1, 2, 3]
>>> # Get the iterator
>>> it = iter(items) # Invokes items.__iter__()
>>> # Run the iterator
>>> next(it) # Invokes it.__next__()
1
>>> next(it)
2
>>> next(it)
3
>>> next(it)
Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
StopIteration

4.2 代理迭代

问题

想在自定义的容器对象里执行迭代操作

解决方案

实际上你只需要定义一个 iter() 方法,将迭代操作代理到容器内部的对象上去。比如:

class Node:
    def __init__(self, value):
        self._value = value
        self._children = []

    def __repr__(self):
        return 'Node({!r})'.format(self._value)

    def add_child(self, node):
        self._children.append(node)

    def __iter__(self):
        return iter(self._children)

# Example
if __name__ == '__main__':
    root = Node(0)
    child1 = Node(1)
    child2 = Node(2)
    root.add_child(child1)
    root.add_child(child2)
    # Outputs Node(1), Node(2)
    for ch in root:
        print(ch)

讨论

Python的迭代器协议需要 iter() 方法返回一个实现了 next() 方法的迭代器对象。

iter(s) 只是简单的通过调用 s.iter() 方法来返回对应的迭代器对象。就像 len(s) 只是调用了 s.len() 一样。

4.3 使用生成器创建新的迭代模式

问题

想自定义迭代模式

解决方案

如果你想实现一种新的迭代模式,使用一个生成器函数来定义它。

def frange(start, stop, increment):
    x = start
    while x < stop:
        yield x
        x += increment

讨论

一个函数中需要有一个 yield 语句即可将其转换为一个生成器。

一个生成器函数主要特征是它只会回应在迭代中使用到的 next 操作。 一旦生成器函数返回退出,迭代终止。

另外,生成器只能使用一次,即迭代完了就无法恢复最初的状态,但是生成器函数是可以多次使用的,比如

def gen(n):
    for i in range(n):
        yield i

for i in gen(3):
    print(i)
for i in gen(3):
    print(i)
0
1
2
0
1
2
```python
def gen(n):
    for i in range(n):
        yield i
c = gen(3)
for i in c:
    print(i)
for i in c:
    print(i)
0
1
2

后一种情况下,生成器只运行了一次

4.4 实现迭代器协议

问题

想构建一个能支持迭代操作的自定义对象,并希望找到一个能实现迭代协议的简单方法。

解决方案

目前为止,在一个对象上实现迭代最简单的方式是使用一个生成器函数。

实现一个以深度优先方式遍历树形节点的生成器。 下面是代码示例:

class Node:
    def __init__(self, value):
        self._value = value
        self._children = []

    def __repr__(self):
        return 'Node({!r})'.format(self._value)

    def add_child(self, node):
        self._children.append(node)

    def __iter__(self):
        return iter(self._children)

    def depth_first(self):
        yield self
        for c in self:
            yield from c.depth_first()


# Example
if __name__ == '__main__':
    root = Node(0)
    child1 = Node(1)
    child2 = Node(2)
    root.add_child(child1)
    root.add_child(child2)
    child1.add_child(Node(3))
    child1.add_child(Node(4))
    child2.add_child(Node(5))

    for ch in root.depth_first():
        print(ch)
    # Outputs Node(0), Node(1), Node(3), Node(4), Node(2), Node(5)

在这段代码中,depth_first() 方法简单直观。 它首先返回自己本身并迭代每一个子节点并通过调用子节点的 depth_first() 方法(使用 yield from 语句)返回对应元素。

讨论

使用一个关联迭代器类重新实现 depth_first() 方法:

class Node2:
    def __init__(self, value):
        self._value = value
        self._children = []

    def __repr__(self):
        return 'Node({!r})'.format(self._value)

    def add_child(self, node):
        self._children.append(node)

    def __iter__(self):
        return iter(self._children)

    def depth_first(self):
        return DepthFirstIterator(self)


class DepthFirstIterator(object):
    '''
    Depth-first traversal
    '''

    def __init__(self, start_node):
        self._node = start_node
        self._children_iter = None
        self._child_iter = None

    def __iter__(self):
        return self

    def __next__(self):
        # Return myself if just started; create an iterator for children
        if self._children_iter is None:
            self._children_iter = iter(self._node)
            return self._node
        # If processing a child, return its next item
        elif self._child_iter:
            try:
                nextchild = next(self._child_iter)
                return nextchild
            except StopIteration:
                self._child_iter = None
                return next(self)
        # Advance to the next child and start its iteration
        else:
            self._child_iter = next(self._children_iter).depth_first()
            return next(self)

DepthFirstIterator 类和上面使用生成器的版本工作原理类似, 但是它写起来很繁琐,因为必须在迭代处理过程中维护大量的状态信息

4.5 反向迭代

问题

反方向迭代一个序列

解决方案

使用内置的 reversed() 函数,比如:

>>> a = [1, 2, 3, 4]
>>> for x in reversed(a):
...     print(x)
反向迭代仅仅当对象的大小可预先确定或者对象实现了 __reversed__() 的特殊方法时才能生效。 如果两者都不符合,那你必须先将对象转换为一个列表才行,比如:

# Print a file backwards
f = open('somefile')
for line in reversed(list(f)):
    print(line, end='')

要注意的是如果可迭代对象元素很多的话,将其预先转换为一个列表要消耗大量的内存。

讨论

很多人都可以通过在自定义类上实现 reversed() 方法来实现反向迭代。比如:

class Countdown:
    def __init__(self, start):
        self.start = start

    # Forward iterator
    def __iter__(self):
        n = self.start
        while n > 0:
            yield n
            n -= 1

    # Reverse iterator
    def __reversed__(self):
        n = 1
        while n <= self.start:
            yield n
            n += 1

4.6 带有外部状态的生成器函数

问题

定义一个生成器函数,但是它会调用某个你想暴露给用户使用的外部状态值。

解决方案

如果你想让你的生成器暴露外部状态给用户, 别忘了你可以简单的将它实现为一个类,然后把生成器函数放到 iter() 方法中过去。比如:

from collections import deque

class linehistory:
    def __init__(self, lines, histlen=3):
        self.lines = lines
        self.history = deque(maxlen=histlen)

    def __iter__(self):
        for lineno, line in enumerate(self.lines, 1):
            self.history.append((lineno, line))
            yield line

    def clear(self):
        self.history.clear()

可以将它当做是一个普通的生成器函数,由于可以创建一个实例对象,于是你可以访问内部属性值, 比如 history 属性或者是 clear() 方法

with open('somefile.txt') as f:
    lines = linehistory(f)
    for line in lines:
        if 'python' in line:
            for lineno, hline in lines.history:
                print('{}:{}'.format(lineno, hline), end='')

讨论

如果生成器函数需要跟你的程序其他部分打交道的话(比如暴露属性值,允许通过方法调用来控制等等), 可能会导致你的代码异常的复杂。 如果是这种情况的话,可以考虑使用上面介绍的定义类的方式。 在 iter() 方法中定义你的生成器不会改变你任何的算法逻辑。 由于它是类的一部分,所以允许你定义各种属性和方法来供用户使用。

当基于类实现迭代时, for 循环可以直接使用 iter() 方法,如果要使用 next() 函数必须通过 iter() 以调用 iter() 方法。

4.7 迭代器切片

问题

想得到一个由迭代器生成的切片对象,但是标准切片操作并不能做到。

解决方案

函数 itertools.islice() 正好适用于在迭代器和生成器上做切片操作。比如:

>>> def count(n):
...     while True:
...         yield n
...         n += 1
...
>>> c = count(0)
>>> c[10:20]
Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
TypeError: 'generator' object is not subscriptable

>>> # Now using islice()
>>> import itertools
>>> for x in itertools.islice(c, 10, 20):
...     print(x)
...
10
11
12
# xxx

讨论

迭代器和生成器不能使用标准的切片操作,因为它们的长度事先我们并不知道(并且也没有实现索引)。 函数 islice() 返回一个可以生成指定元素的迭代器,它通过遍历并丢弃直到切片开始索引位置的所有元素。 然后才开始一个个的返回元素,并直到切片结束索引位置。

如果需要再次使用迭代器中的值,需要用 list 等容器保存起来。

4.8 跳过可迭代对象的开始部分

问题

想遍历一个可迭代对象,但是它开始的某些元素你并不感兴趣,想跳过它们。

解决方案

itertools 模块中有一些函数可以完成这个任务。 首先介绍的是 itertools.dropwhile()函数。使用时,你给它传递一个函数对象和一个可迭代对象。 它会返回一个迭代器对象,丢弃原有序列中直到函数返回Flase之前的所有元素,然后返回后面所有元素

读取一个开始部分是几行注释的源文件。比如:

>>> from itertools import dropwhile
>>> with open('/etc/passwd') as f:
...     for line in dropwhile(lambda line: line.startswith('#'), f):
...         print(line, end='')

如果你已经明确知道了要跳过的元素的个数的话,那么可以使用 itertools.islice() 来代替。比如:

讨论

跳过一个可迭代对象的开始部分跟通常的过滤是不同的,dropwhile 在跳过开头的注释后就不再关注后面是否有注释行了。

本节的方案适用于所有可迭代对象,包括那些事先不能确定大小的, 比如生成器,文件及其类似的对象。

4.9 排列组合的迭代

问题

想迭代遍历一个集合中元素的所有可能的排列或组合

解决方案

itertools模块提供了三个函数来解决这类问题。

  • itertools.permutations(items, len = None) 返回一个元组序列,每个元组是集合中所有元素的一个可能排列,可以指定排列长度
  • itertools.combinations(items, len = None) 返回输入集合中元素的所有的组合
  • itertools.combinations_with_replacement(items, len = None) 允许同一个元素被选择多次
>>> items = ['a', 'b', 'c']
>>> from itertools import permutations
>>> for p in permutations(items, 2):
...     print(p)
...
('a', 'b')
('a', 'c')
('b', 'a')
('b', 'c')
('c', 'a')
('c', 'b')
>>> from itertools import combinations
>>> for c in combinations(items, 2):
...     print(c)
...
('a', 'b')
('a', 'c')
('b', 'c')
>>> for c in combinations_with_replacement(items, 3):
...     print(c)
...
('a', 'a', 'a')
('a', 'a', 'b')
('a', 'a', 'c')
('a', 'b', 'b')
('a', 'b', 'c')
('a', 'c', 'c')
('b', 'b', 'b')
('b', 'b', 'c')
('b', 'c', 'c')
('c', 'c', 'c')

4.10 序列上索引值迭代

问题

想在迭代一个序列的同时跟踪正在被处理的元素索引。

解决方案

内置的 enumerate() 函数可以很好的解决这个问题:enumerate(iterable, start=0) start 为第一个标号

>>> for idx, val in enumerate(my_list, 1)

小陷阱

data = [ (1, 2), (3, 4), (5, 6), (7, 8) ]

# Correct!
for n, (x, y) in enumerate(data):
    ...
# Error!
for n, x, y in enumerate(data):

4.11 同时迭代多个序列

问题

想同时迭代多个序列,每次分别从一个序列中取一个元素。

解决方案

  • zip(*iterables) Returns an iterator of tuples 返回迭代器
  • zip_longest method from itertools

为了同时迭代多个序列,使用 zip() 函数。比如:

>>> xpts = [1, 5, 4, 2, 10, 7]
>>> ypts = [101, 78, 37, 15, 62, 99]
>>> for x, y in zip(xpts, ypts):
...     print(x,y)

zip(a, b) 会生成一个可返回元组 (x, y) 的迭代器,其中x来自a,y来自b。

迭代长度跟参数中最短序列长度一致。

如果这个不是你想要的效果,那么还可以使用 itertools.zip_longest() 函数来代替。

>>> from itertools import zip_longest
>>> for i in zip_longest(a,b):
...     print(i)
...
(1, 'w')
(2, 'x')
(3, 'y')
(None, 'z')

>>> for i in zip_longest(a, b, fillvalue=0):
...     print(i)
...
(1, 'w')
(2, 'x')
(3, 'y')
(0, 'z')

讨论

使用zip()可以打包并生成一个字典:s = dict(zip(headers,values))

zip() 可以接受多于两个的序列的参数

4.12 不同集合上元素的迭代

问题

想在多个对象执行相同的操作,但是这些对象在不同的容器中,你希望代码在不失可读性的情况下避免写重复的循环。

解决方案

itertools.chain() 方法可以用来简化这个任务。 它接受一个可迭代对象列表作为输入,并返回一个迭代器,有效的屏蔽掉在多个容器中迭代细节。

>>> from itertools import chain
>>> a = [1, 2, 3, 4]
>>> b = ['x', 'y', 'z']
>>> for x in chain(a, b):
... print(x)
...
1
2
3
4
x
y
z

讨论

itertools.chain() 接受一个或多个可迭代对象最为输入参数。 然后创建一个迭代器,依次连续的返回每个可迭代对象中的元素,节省内存。

4.13 创建数据处理管道

问题

想以数据管道(类似Unix管道)的方式迭代处理数据。 比如,你有个大量的数据需要处理,但是不能将它们一次性放入内存中。

解决方案

生成器函数是一个实现管道机制的好办法。

假定你要处理一个非常大的日志文件目录:

foo/ access-log-012007.gz access-log-022007.gz access-log-032007.gz ... access-log-012008 bar/ access-log-092007.bz2 ... access-log-022008 假设每个日志文件包含这样的数据:

124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71 210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875 210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369 61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 - ...

为了处理这些文件,你可以定义一个由多个执行特定任务独立任务的简单生成器函数组成的容器。就像这样:

import os
import fnmatch
import gzip
import bz2
import re

def gen_find(filepat, top):
    '''
    Find all filenames in a directory tree that match a shell wildcard pattern
    '''
    for path, dirlist, filelist in os.walk(top):
        for name in fnmatch.filter(filelist, filepat):
            yield os.path.join(path,name)

def gen_opener(filenames):
    '''
    Open a sequence of filenames one at a time producing a file object.
    The file is closed immediately when proceeding to the next iteration.
    '''
    for filename in filenames:
        if filename.endswith('.gz'):
            f = gzip.open(filename, 'rt')
        elif filename.endswith('.bz2'):
            f = bz2.open(filename, 'rt')
        else:
            f = open(filename, 'rt')
        yield f
        f.close()

def gen_concatenate(iterators):
    '''
    Chain a sequence of iterators together into a single sequence.
    '''
    for it in iterators:
        yield from it

def gen_grep(pattern, lines):
    '''
    Look for a regex pattern in a sequence of lines
    '''
    pat = re.compile(pattern)
    for line in lines:
        if pat.search(line):
            yield line

现在你可以很容易的将这些函数连起来创建一个处理管道。 比如,为了查找包含单词python的所有日志行,你可以这样做:

lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines)
for line in pylines:
    print(line)

如果将来的时候你想扩展管道,你甚至可以在生成器表达式中包装数据。

lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines)
bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
bytes = (int(x) for x in bytecolumn if x != '-')
print('Total', sum(bytes))

讨论

以管道方式处理数据可以用来解决各类其他问题,包括解析,读取实时数据,定时轮询等。

这种方式一个非常好的特点是每个生成器函数很小并且都是独立的。这样的话就很容易编写和维护它们了。 很多时候,这些函数如果比较通用的话可以在其他场景重复使用。 并且最终将这些组件组合起来的代码看上去非常简单,也很容易理解。

事实上,由于使用了迭代方式处理,代码运行过程中只需要很小很小的内存。

在调用 gen_concatenate() 函数的时候你可能会有些不太明白。 这个函数的目的是将输入序列拼接成一个很长的行序列。itertools.chain() 函数同样有类似的功能,但是它需要将所有可迭代对象最为参数传入。在上面这个例子中,你可能会写类似这样的语句lines = itertools.chain(*files),这将导致 gen_opener() 生成器被提前全部消费掉。但由于 gen_opener() 生成器每次生成一个打开过的文件,等到下一个迭代步骤时文件就关闭了,因此 chain() 在这里不能这样使用,上面的方案可以避免这种情况。

gen_concatenate() 函数中出现过 yield from 语句,它将 yield 操作代理到父生成器上去。 语句 yield from it 简单的返回生成器 it 所产生的所有值。

管道方式并不是万能的。有时候你想立即处理所有数据。即便如此,使用生成器管道也可以将这类问题从逻辑上变为工作流的处理方式。

4.14 展开嵌套的序列

问题

想将一个多层嵌套的序列展开成一个单层列表

解决方案

可以写一个包含 yield from 语句的递归生成器来轻松解决这个问题。比如:

from collections import Iterable

def flatten(items, ignore_types=(str, bytes)):
    for x in items:
        if isinstance(x, Iterable) and not isinstance(x, ignore_types):
            yield from flatten(x)
        else:
            yield x

items = [1, 2, [3, 4, [5, 6], 7], 8]
# Produces 1 2 3 4 5 6 7 8
for x in flatten(items):
    print(x)

讨论

语句 yield from 在你想在生成器中调用其他生成器作为子例程的时候非常有用。 如果不使用的话,那就必须再写一个 for 循环了。

yield from 在涉及到基于协程和生成器的并发编程中扮演着更加重要的角色

4.15 顺序迭代合并后的排序迭代对象

问题

有一系列排序序列,想将它们合并后得到一个排序序列并在上面迭代遍历。

解决方案

heapq.merge() 函数可以帮你解决这个问题。比如:

>>> import heapq
>>> a = [1, 4, 7, 10]
>>> b = [2, 5, 6, 11]
>>> for c in heapq.merge(a, b):
...     print(c)
...
1
2
4
5
6
7
10
11

讨论

heapq.merge 返回一个迭代器而不是列表

有一点要强调的是** heapq.merge() 需要所有输入序列必须是排过序的,它并不会预先读取所有数据到堆栈中或者预先排序**,仅仅是检查所有序列的开始部分并返回最小的那个,这个过程一直会持续到所有输入序列中的元素都被遍历完。

4.16 迭代器代替 while 循环

问题

在代码中使用 while 循环来迭代处理数据,因为它需要调用某个函数或者和一般迭代模式不同的测试条件。 能不能用迭代器来重写这个循环呢?

解决方案

一个常见的IO操作程序可能会像下面这样:

CHUNKSIZE = 8192

def reader(s):
    while True:
        data = s.recv(CHUNKSIZE)
        if data == b'':
            break
        process_data(data)

这种代码通常可以使用 iter() 来代替,如下所示:

def reader2(s):
    for chunk in iter(lambda: s.recv(CHUNKSIZE), b''):
        pass
        # process_data(data)

讨论

iter 函数一个鲜为人知的特性是它接受一个可选的 callable 对象和一个标记(结尾)值作为输入参数。 当以这种方式使用的时候,它会创建一个迭代器, 这个迭代器会不断调用 callable 对象直到返回值和标记值相等为止。这种特殊的方法对于一些特定的会被重复调用的函数很有效果,比如涉及到I/O调用的函数。 举例来讲,如果你想从套接字或文件中以数据块的方式读取数据,通常你得要不断重复的执行 read() 或 recv() , 并在后面紧跟一个文件结尾测试来决定是否终止。这节中的方案使用一个简单的 iter() 调用就可以将两者结合起来了