Skip to content

FIFO、LRU与LFU

双向链表的原理与实践

  • 介绍
    • 单向链表
      单向链表
      每一个节点都有下一个节点的地址或引用
    • 双向链表
      双向链表
      每一个节点都有上一个节点和下一个节点的地址或引用
  • 双向链表有什么好处

    • 可以快速找到一个节点的下一个节点
    • 可以快速找到一个节点的上一个节点
    • 已持有某个节点的引用时,可以在 O(1) 时间内把它从链表中删除(无需从头遍历寻找前驱节点)
  • 实现双向链表

    # -*- encoding=utf-8 -*-
    
    class Node:
        """Doubly linked list node holding one ``key``/``value`` pair.

        ``prev`` and ``next`` start out as ``None``; the list that owns the
        node is responsible for wiring them up.
        """

        def __init__(self, key, value):
            self.key = key
            self.value = value
            self.prev = None
            self.next = None

        def __repr__(self):
            """Return the node rendered as ``{key: value}``."""
            # %r rather than %d: integers render exactly as before, while
            # non-integer keys/values (str, tuple, None, ...) no longer raise
            # TypeError when the node is printed.
            return '{%r: %r}' % (self.key, self.value)

        # The display form and the debug form are intentionally identical.
        __str__ = __repr__
    
    
    class DoubleLinkedList:
        """Doubly linked list with constant-time insertion and removal.

        Nodes are any objects exposing ``prev`` and ``next`` attributes.
        ``capacity`` is only stored on the instance; this class never
        enforces it, so callers compare it against ``size`` themselves.

        A node that is taken out of the list has its ``prev``/``next`` links
        cleared, so it never keeps a stale reference into the live list.
        """

        def __init__(self, capacity=0xffff):
            self.capacity = capacity
            self.head = None
            self.tail = None
            self.size = 0

        def __add_head(self, node):
            """Link ``node`` in as the new head and return it."""
            node.prev = None
            node.next = self.head
            if self.head is None:
                # Empty list: the node is both head and tail.
                self.tail = node
            else:
                self.head.prev = node
            self.head = node
            self.size += 1
            return node

        def __add_tail(self, node):
            """Link ``node`` in as the new tail and return it."""
            node.next = None
            node.prev = self.tail
            if self.tail is None:
                # Empty list: the node is both head and tail.
                self.head = node
            else:
                self.tail.next = node
            self.tail = node
            self.size += 1
            return node

        def __del_tail(self):
            """Unlink and return the tail node, or ``None`` if the list is empty."""
            node = self.tail
            if node is None:
                return None
            self.tail = node.prev
            if self.tail is None:
                self.head = None
            else:
                self.tail.next = None
            # Detach the removed node from its former neighbours.
            node.prev = node.next = None
            self.size -= 1
            return node

        def __del_head(self):
            """Unlink and return the head node, or ``None`` if the list is empty."""
            node = self.head
            if node is None:
                return None
            self.head = node.next
            if self.head is None:
                self.tail = None
            else:
                self.head.prev = None
            # Detach the removed node from its former neighbours.
            node.prev = node.next = None
            self.size -= 1
            return node

        def __remove(self, node):
            """Unlink ``node`` (the tail when ``node`` is ``None``) and return it.

            ``node`` must currently belong to this list; membership is not
            checked.
            """
            if node is None:
                node = self.tail
            # Identity, not equality: a node class that defines value-based
            # ``__eq__`` must not cause a different node to be unlinked.
            if node is self.tail:
                self.__del_tail()
            elif node is self.head:
                self.__del_head()
            else:
                node.prev.next = node.next
                node.next.prev = node.prev
                node.prev = node.next = None
                self.size -= 1
            return node

        def pop(self):
            """Remove and return the head node (``None`` if the list is empty)."""
            return self.__del_head()

        def append(self, node):
            """Add ``node`` at the tail and return it."""
            return self.__add_tail(node)

        def append_front(self, node):
            """Add ``node`` at the head and return it."""
            return self.__add_head(node)

        def remove(self, node=None):
            """Remove ``node`` from the list, or the tail node when omitted.

            Returns the removed node, or ``None`` when called without a node
            on an empty list.
            """
            return self.__remove(node)

        def print(self):
            """Print the nodes from head to tail, joined by ``->``."""
            parts = []
            p = self.head
            while p is not None:
                parts.append('%s' % (p,))
                p = p.next
            print('->'.join(parts))
    
    
    if __name__ == '__main__':
        # Demo: run a fixed series of list operations and dump the list
        # after every single one of them.
        dll = DoubleLinkedList(10)
        pool = [Node(i, i) for i in range(10)]

        steps = [
            (dll.append, (pool[0],)),
            (dll.append, (pool[1],)),
            (dll.pop, ()),
            (dll.append, (pool[2],)),
            (dll.append_front, (pool[3],)),
            (dll.append, (pool[4],)),
            (dll.remove, (pool[2],)),
            (dll.remove, ()),
        ]
        for operation, args in steps:
            operation(*args)
            dll.print()
    

实现FIFO缓存置换算法

  • 先进先出算法(FIFO)

    • 把高速缓存看做是一个先进先出的队列
    • 优先替换最先进入队列的字块
  • 代码示例

    # -*- encoding=utf-8 -*-
    
    from computer_principle.DoubleLinkedList import DoubleLinkedList, Node
    
    
    class FIFOCache(object):
        """First-in-first-out cache built on a dict plus a doubly linked list.

        The dict maps each key to its list node; the list keeps the nodes in
        queue order, and the node at the head is the one evicted when the
        cache is full.
        """

        def __init__(self, capacity):
            self.capacity = capacity
            self.size = 0
            self.map = {}
            self.list = DoubleLinkedList(self.capacity)

        def get(self, key):
            """Return the value stored under ``key``, or -1 when it is absent."""
            try:
                node = self.map[key]
            except KeyError:
                return -1
            return node.value

        def put(self, key, value):
            """Store ``value`` under ``key``, evicting the head node if full.

            Writing to a key that is already cached updates its value and
            moves its node to the tail of the queue.
            """
            if self.capacity == 0:
                return

            if key in self.map:
                existing = self.map[key]
                self.list.remove(existing)
                existing.value = value
                self.list.append(existing)
                return

            # New key: make room first when the cache is already full.
            if self.size == self.capacity:
                oldest = self.list.pop()
                del self.map[oldest.key]
                self.size -= 1

            fresh = Node(key, value)
            self.list.append(fresh)
            self.map[key] = fresh
            self.size += 1

        def print(self):
            """Print the queue from head to tail."""
            self.list.print()
    
    
    if __name__ == '__main__':
        # Demo script: each entry is either a put (key doubles as value),
        # a get whose result is printed, or a dump of the queue.
        cache = FIFOCache(2)
        script = [
            ('put', 1), ('show', None),
            ('put', 2), ('show', None),
            ('get', 1),
            ('put', 3), ('show', None),
            ('get', 2), ('show', None),
            ('put', 4), ('show', None),
            ('get', 1),
        ]
        for action, k in script:
            if action == 'put':
                cache.put(k, k)
            elif action == 'get':
                print(cache.get(k))
            else:
                cache.print()
    

实现LRU缓存置换算法

  • 最近最少使用算法(LRU)

    • 优先淘汰一段时间内没有使用的字块
    • 有多种实现方法,一般使用双向链表
    • 把当前访问节点置于链表前面(保证链表头部节点是最近使用的),需要淘汰时从链表尾部移除最久未使用的节点
  • 代码示例

    # -*- encoding=utf-8 -*-
    
    from computer_principle.DoubleLinkedList import DoubleLinkedList, Node
    
    
    class LRUCache(object):
        """Least-recently-used cache: a dict for lookup plus a doubly linked
        list kept in recency order.

        The head of the list is the most recently used node; the tail is the
        node evicted next.
        """

        def __init__(self, capacity):
            self.capacity = capacity
            self.map = {}
            self.list = DoubleLinkedList(self.capacity)

        def get(self, key):
            """Return the value for ``key`` and mark it most recently used.

            Returns -1 when ``key`` is not cached.
            """
            if key not in self.map:
                return -1
            node = self.map[key]
            # Re-link at the head so this node becomes the most recent one.
            self.list.remove(node)
            self.list.append_front(node)
            return node.value

        def put(self, key, value):
            """Insert or update ``key``, making it the most recently used entry.

            When the cache is full, the node at the tail of the list is
            evicted first. A cache whose capacity is zero or negative stores
            nothing.
            """
            # Without this guard the eviction step below would run against an
            # empty list, receive None from remove() and fail on `.key`.
            if self.capacity <= 0:
                return

            if key in self.map:
                node = self.map[key]
                node.value = value
                self.list.remove(node)
                self.list.append_front(node)
                return

            # Cache is full: drop the tail node (remove() without an argument).
            if self.list.size >= self.list.capacity:
                old_node = self.list.remove()
                self.map.pop(old_node.key)

            node = Node(key, value)
            self.list.append_front(node)
            self.map[key] = node

        def print(self):
            """Print the nodes from most to least recently used."""
            self.list.print()
    
    
    if __name__ == '__main__':
        # Demo: fill a two-slot cache with three keys, then read each key
        # back, dumping the list after every operation.
        cache = LRUCache(2)
        for k in (2, 1, 3):
            cache.put(k, k)
            cache.print()
        for k in (1, 2, 3):
            print(cache.get(k))
            cache.print()
    

实现LFU缓存置换算法

  • 最不经常使用算法(LFU)
    • 优先淘汰最不经常使用的字块
    • 需要额外的空间记录字块的使用频率
  • 示例代码
    # -*- encoding=utf-8 -*-
    
    from computer_principle.DoubleLinkedList import DoubleLinkedList, Node
    
    
    class LFUNode(Node):
        """List node that additionally carries an access counter, ``freq``."""

        def __init__(self, key, value):
            super().__init__(key, value)
            # Counter starts at zero on construction.
            self.freq = 0
    
    
    class LFUCache(object):
        """Least-frequently-used cache.

        ``map`` maps each key to its node. ``freq_map`` maps an access count
        to a doubly linked list of the nodes that currently have that count;
        nodes are appended at the tail, so the head of a list is the node
        that reached that count earliest. Eviction drops the head of the
        list with the smallest count.
        """

        def __init__(self, capacity):
            self.capacity = capacity
            self.map = {}
            # key: frequency, value: linked list of the nodes at that frequency
            self.freq_map = {}
            self.size = 0

        def __update_freq(self, node):
            """Move ``node`` from its current frequency list to the next one."""
            freq = node.freq

            # Take the node out of its current list; drop the list if that
            # leaves it empty.
            bucket = self.freq_map[freq]
            bucket.remove(node)
            if bucket.size == 0:
                del self.freq_map[freq]

            # Re-insert one frequency higher, creating the list on demand.
            node.freq = freq + 1
            if node.freq not in self.freq_map:
                self.freq_map[node.freq] = DoubleLinkedList()
            self.freq_map[node.freq].append(node)

        def get(self, key):
            """Return the value for ``key`` and bump its frequency; -1 on a miss."""
            if key not in self.map:
                return -1
            node = self.map[key]
            self.__update_freq(node)
            return node.value

        def put(self, key, value):
            """Insert or update ``key``.

            Updating an existing key counts as an access. Inserting into a
            full cache first evicts the head of the lowest-frequency list.
            A cache with capacity 0 stores nothing.
            """
            if self.capacity == 0:
                return

            # Cache hit: overwrite the value and count the access.
            if key in self.map:
                node = self.map[key]
                node.value = value
                self.__update_freq(node)
                return

            # Cache miss on a full cache: evict before inserting.
            if self.capacity == self.size:
                min_freq = min(self.freq_map)
                bucket = self.freq_map[min_freq]
                victim = bucket.pop()
                # Drop a list emptied by the eviction so freq_map does not
                # accumulate dead entries. The frequency-1 list is kept
                # because the new node below goes straight back into it.
                if bucket.size == 0 and min_freq != 1:
                    del self.freq_map[min_freq]
                del self.map[victim.key]
                self.size -= 1

            node = LFUNode(key, value)
            node.freq = 1
            self.map[key] = node
            if node.freq not in self.freq_map:
                self.freq_map[node.freq] = DoubleLinkedList()
            self.freq_map[node.freq].append(node)
            self.size += 1

        def print(self):
            """Print every frequency list, framed by separator lines."""
            print('***************************')
            for freq, bucket in self.freq_map.items():
                print('Freq = %d' % freq)
                bucket.print()
            print('***************************')
            print()
    
    
    if __name__ == '__main__':
        # Demo: replay a fixed mix of puts (key doubles as value) and gets
        # against a two-slot cache, dumping the cache after every step.
        cache = LFUCache(2)
        script = [
            ('put', 1), ('put', 2), ('get', 1), ('put', 3), ('get', 2),
            ('get', 3), ('put', 4), ('get', 1), ('get', 3), ('get', 4),
        ]
        for action, k in script:
            if action == 'put':
                cache.put(k, k)
            else:
                print(cache.get(k))
            cache.print()