Source code for switchyard.lib.packet.ipv6

import struct
from ipaddress import IPv6Address
from abc import ABCMeta, abstractmethod
from enum import IntEnum

from ..logging import log_warn
from .packet import PacketHeaderBase, Packet, NullPacketHeader
from ..address import EthAddr, ip_address, SpecialIPv6Addr, SpecialEthAddr
from .common import IPProtocol, checksum
from ..exceptions import *

from .icmpv6 import ICMPv6
from .tcp import TCP
from .udp import UDP

'''
References:
    IETF RFC 2460 http://tools.ietf.org/html/rfc2460 (ipv6)
    IETF RFC 2460 http://tools.ietf.org/html/rfc2460 (ipv6)
    IETF RFC 6564 http://tools.ietf.org/html/rfc6564 (uniform format for ipv6 extension headers)
    IETF RFC 7045 http://tools.ietf.org/html/rfc7045 (transmission and processing of ipv6 extension headers)
'''

class IPv6ExtensionHeader(PacketHeaderBase):
    __slots__ = ['_nextheader','_optdatalen','_optlenmultiplier']
    _PACKFMT = '!BB'
    _MINLEN = 2

    def __init__(self, optlenmultiplier, **kwargs):
        self._nextheader = None
        self._optdatalen = 0
        # number of bytes represented by length field (should be 1 or 8, depending on ext hdr)
        assert(optlenmultiplier in (1,8))
        self._optlenmultiplier = optlenmultiplier
        super().__init__(**kwargs)

    def pre_serialize(self, raw, pkt, i):
        pass

    def size(self):
        return self._optdatalen

    @property
    def nextheader(self):
        return self._nextheader

    @nextheader.setter
    def nextheader(self, value):
        self._nextheader = IPProtocol(value)

    @property 
    def protocol(self):
        return self._nextheader

    @protocol.setter
    def protocol(self, value):
        self._nextheader = IPProtocol(value)

    def next_header_class(self):
        cls = IPTypeClasses.get(self.nextheader, None) 
        if cls is None and self.nextheader not in IPTypeClasses:
            log_warn("Warning: no class exists to parse next protocol type: {}".format(self.protocol))
        return cls

    def to_bytes(self):
        return struct.pack(IPv6ExtensionHeader._PACKFMT, 
            self.nextheader.value, self._optdatalen)

    def from_bytes(self, raw):
        if len(raw) < IPv6ExtensionHeader._MINLEN:
            raise NotEnoughDataError("Not enough data to unpack IPv6ExtensionHeader")

        self.nextheader = IPProtocol(raw[0])
        self._optdatalen = int(raw[1])

        if len(raw) < self._optdatalen * self._optlenmultiplier:
            raise Exception("Not enough data to unpack IPv6ExtensionHeader")
        return raw[IPv6ExtensionHeader._MINLEN:]

    def __eq__(self, other):
        return self.to_bytes() == other.to_bytes()


class IPv6RouteOption(IPv6ExtensionHeader):
    '''
    IPv6 routing option.  Only supports type 2 (single address) option.
    '''
    __slots__ = ['_routingtype', '_segmentsleft','_address']
    def __init__(self, **kwargs):
        self._routingtype = 2
        self._segmentsleft = 1
        self._address = SpecialIPv6Addr.UNDEFINED.value
        self._optdatalen = 2 # RFC2460: len is number of 8 octet units, not including for 8 octets
        super().__init__(8, **kwargs)

    def __str__(self):
        return "{} (type {}, {})".format(self.__class__.__name__, self._routingtype, self._address)

    @property
    def address(self):
        return self._address

    @address.setter
    def address(self, value):
        self._address = IPv6Address(value)

    def to_bytes(self):
        common = super().to_bytes()
        payload = struct.pack('!BBI', self._routingtype, self._segmentsleft, 0) + self._address.packed
        return common + payload

    def from_bytes(self, raw):
        remain = super().from_bytes(raw)
        self._routingtype = remain[0]
        self._segmentsleft = remain[1]
        if self._routingtype == 2:
            rawaddr = remain[6:22]
            remain = remain[22:]
            self._address = IPv6Address(rawaddr)
        else:
            raise ValueError("IPv6 routing option only supports type 2 (but I got type {})".format(self._routingtype))
        return remain

class IPv6Fragment(IPv6ExtensionHeader):
    __slots__ = ['_id','_offset','_morefragments']
    _PACKFMT = '!HI'
    _MINLEN = 6

    def __init__(self, **kwargs):
        self._id = 0
        self._offset = 0
        self._morefragments = False
        self._optdatalen = 0
        super().__init__(1, **kwargs)

    @property 
    def id(self):
        return self._id

    @id.setter
    def id(self, value):
        self._id = int(value)

    @property 
    def offset(self):
        return self._offset

    @offset.setter
    def offset(self, value):
        self._offset = int(value)

    @property 
    def mf(self):
        return self._morefragments

    @mf.setter
    def mf(self, value):
        self._morefragments = bool(value)

    def __str__(self):
        return "{} (id: {} offset: {} mf: {})".format(self.__class__.__name__, self._id, self._offset, self._morefragments)

    def to_bytes(self):
        common = super().to_bytes()
        payload = struct.pack(IPv6Fragment._PACKFMT, self._offset << 3 | int(self._morefragments), self._id)
        return common + payload

    def from_bytes(self, raw):
        remain = super().from_bytes(raw)
        if len(remain) < IPv6Fragment._MINLEN:
            raise NotEnoughDataError("Not enough data to unpack IPv6Fragment extension header")
        offsetfield, xid = struct.unpack(IPv6Fragment._PACKFMT, remain[:IPv6Fragment._MINLEN])
        self._id = xid
        self._offset = offsetfield >> 3
        self._morefragments = bool(offsetfield & 0x1)
        remain = remain[IPv6Fragment._MINLEN:]
        return remain

class IPv6Option(metaclass=ABCMeta):
    def __init__(self):
        pass

    @abstractmethod
    def to_bytes(self):
        pass

    def __str__(self):
        return "{}".format(self.__class__.__name__)

class Pad1(IPv6Option):
    def __init__(self):
        pass

    def to_bytes(self):
        return b'\x00'

    @staticmethod
    def from_bytes(raw):
        return Pad1()

class PadN(IPv6Option):
    __slots__ = ('_n',)

    def __init__(self, n=0):
        self._n = n - 2

    def to_bytes(self):
        return struct.pack('BB', 1, self._n) + b'\x00' * self._n

    @property 
    def n(self):
        return self._n

    @staticmethod
    def from_bytes(raw):
        p = PadN()
        p._n = len(raw)
        return p

    def __str__(self):
        return "{} ({})".format(self.__class__.__name__, self._n+2)

class JumboPayload(IPv6Option):
    __slots__ = ('_len',)
    def __init__(self, len):
        self._len = len

    def to_bytes(self):
        return struct.pack('!BBI', 0xc2, 4, self._len)

    @property 
    def len(self):
        return self._len

    @staticmethod
    def from_bytes(raw):
        assert(len(raw) == 4)
        fields = struct.unpack('!I', raw)
        return JumboPayload(fields[0])

    def __str__(self):
        return "{} ({})".format(self.__class__.__name__, self._len)

class TunnelEncapsulationLimit(IPv6Option):
    __slots__ = ('_limit',)
    def __init__(self, limit):
        self._limit = int(limit)

    def to_bytes(self):
        return struct.pack('BBB', 4, 1, self._limit)

    @property 
    def limit(self):
        return self._limit

    @staticmethod
    def from_bytes(raw):
        assert(len(raw) == 1)
        return TunnelEncapsulationLimit(raw[0])

    def __str__(self):
        return "{} ({})".format(self.__class__.__name__, self._limit)


class RouterAlert(IPv6Option):
    __slots__ = ('_value',)
    def __init__(self, value):
        self._value = value

    def to_bytes(self):
        return struct.pack('!BBH', 5, 2, self._value)

    @property 
    def value(self):
        return self._value

    @staticmethod
    def from_bytes(raw):
        assert(len(raw) == 2)
        fields = struct.unpack('!H', raw)
        return RouterAlert(fields[0])

    def __str__(self):
        return "{} ({})".format(self.__class__.__name__, self._value)

class HomeAddress(IPv6Option):
    __slots__ = ('_addr',)
    def __init__(self, addr):
        self._addr = IPv6Address(addr)

    def to_bytes(self):
        return struct.pack('!BB', 201, 16) + self._addr.packed

    @property 
    def address(self):
        return self._addr

    @staticmethod
    def from_bytes(raw):
        assert(len(raw) == 16)
        return HomeAddress(raw)

    def __str__(self):
        return "{} ({})".format(self.__class__.__name__, self._value)

class IPv6HopOption(IPv6ExtensionHeader):
    __slots__ = ['_options']
    _option_type_dict = {
        0: Pad1,
        1: PadN,
        194: JumboPayload,
        4: TunnelEncapsulationLimit,
        5: RouterAlert,
        201: HomeAddress 
    }

    def __init__(self, **kwargs):
        self._options = []
        super().__init__(8, **kwargs)

    def __str__(self):
        return "{}/{}".format(self.__class__.__name__, ' ; '.join([str(o) for o in self._options]))

    def to_bytes(self):
        xopt = b''.join([o.to_bytes() for o in self._options])
        hdrlen = len(xopt) + 2
        if hdrlen % 8 != 0:
            log_warn("Number of bytes in {} is not an even multiple of 8 ({}); " 
                     "padding must be explicitly added to correctly form the packet".format(self.__class__.__name__, hdrlen))
        self._optdatalen = hdrlen // 8 - 1
        common = super().to_bytes()
        return common + xopt

    def from_bytes(self, raw):
        remain = super().from_bytes(raw)
        optbytes = (self._optdatalen + 1) * 8 - 2
        if (optbytes+2) % 8 != 0:
            log_warn("Trying to reconstruct {} that isn't a multiple of 8. This will end badly.".format(self.__class__.__name__))
        rawopt = remain[:optbytes]
        remain = remain[optbytes:]
        self._parseTLVOptions(rawopt)
        return remain

    def add_option(self, optobj):
        if not issubclass(optobj.__class__, IPv6Option):
            raise Exception("IPv6 option object isn't derived from IPv6Option class.")
        self._options.append(optobj)

    def __getitem__(self, idx):
        if not isinstance(idx, int):
            raise TypeError("indexing in IPv6 option requires an int")
        if not 0 <= idx < len(self._options):
            raise IndexError("Bad index in IPv6 option access")
        return self._options[idx]

    def __len__(self):
        return len(self._options)

    def _parseTLVOptions(self, raw):
        self._options = []
        while len(raw):
            xtype = raw[0]
            if xtype == 0:
                self._options.append(Pad1())
                raw = raw[1:]
            else:
                xlen = raw[1]
                if len(raw) < xlen+2:
                    raise NotEnoughDataError("Not enough data to unpack IPv6 TLV option (have {} bytes, need {} bytes for type {}".format(len(raw), xlen, xtype))
                data = raw[2:(2+xlen)]
                raw = raw[(2+xlen):]
                cls = IPv6HopOption._option_type_dict.get(xtype, None)
                if cls is None:
                    raise ValueError("Bad IPv6 option type {}".format(xtype))
                self._options.append(cls.from_bytes(raw=data))

class IPv6DestinationOption(IPv6HopOption):
    pass

# class IPv6MobilityHeaderType(IntEnum):
#     BindingRefreshRequest = 0
#     HomeTestInit = 1
#     CareOfTestInit = 2
#     HomeTest = 3
#     CareOfTest = 4
#     BindingUpdate = 5
#     BindingAcknowledgment = 6
#     BindingError = 7

# _IPv6MobilityHeaderStruct = {
#     IPv6MobilityHeaderType.BindingRefreshRequest: '!H', # 2 reserved bytes, TLV options
#     IPv6MobilityHeaderType.HomeTestInit: '!H8s', # 2 reserved bytes, 8 byte cookie, TLV options
#     IPv6MobilityHeaderType.CareOfTestInit: '!H8s', # 2 reserved bytes, 8 byte cookie, TLV options
#     IPv6MobilityHeaderType.HomeTest: '!H8s8s', # 2 byte nonce, 8 byte cookie, 8 byte keygen token, TLV options
#     IPv6MobilityHeaderType.CareOfTest: '!H8s8s', # 2 byte nonce, 8 byte cookie, 8 byte keygen token, TLV options
#     IPv6MobilityHeaderType.BindingUpdate: '!H4B', # 2 byte seq, 4 more bytes (bitfields, etc.), TLV options
#     IPv6MobilityHeaderType.BindingAcknowledgment: '!2BHH', # 2 byte status/reserved, 2 byte seq, 2 byte lifetime, TLV options
#     IPv6MobilityHeaderType.BindingError: '!BB16s', # status(1), reserved(1), homeaddr(16), TLV options
# }


# Mobility Options: Pad1, PadN, BindingRefreshAdvice (type = 2), Alternate CoA (type = 3),
# Nonce Indices (type=4), Binding Authz Data (type=5)


# class IPv6Mobility(IPv6ExtensionHeader):
#     '''
#     IPv6Mobility packet header.

#     This header is incomplete, but *should* sufficiently parse any valid
#     MIPv6 header.  In particular, there is no special handling of the
#     header type elements apart from simply making sure that all the data
#     are encoded/decoded in the right byte sizes (see IPv6MobilityHeaderType 
#     enumeration, above).  
#     '''

#     __slots__ = ('_mhtype','_checksum','_data','_src','_dst')
#     _PACKFMT = '!BBH'
#     _MINLEN = struct.calcsize(_PACKFMT)

#     def __init__(self, **kwargs):
#         self._nextheader = IPProtocol.IPv6NoNext
#         self._optdatalen = 0 #FIXME
#         self._mhtype = IPv6MobilityHeaderType(0)
#         self._data = (0,)
#         self._src = self._dst = SpecialIPv6Addr.UNDEFINED.value
#         super().__init__(8, **kwargs)

#     def pre_serialize(self, raw, pkt, i):
#         ipv6hdr = pkt.get_header(IPv6)
#         self._src = ipv6hdr.src
#         self._dst = ipv6hdr.dst

#     def __str__(self):
#         return "IPv6Mobility ({})".format(self._mhtype.name)

#     def _compute_checksum(self):
#         # FIXME: computed on IPv6 pseudoheader + full mobility header (starting with ext header)
#         # pseudoheader: srcaddr(16),dstaddr(16),len of mobility header (4), 000 next header(4)
#         self._checksum = 0
#         exthdr = self.to_bytes(computecsum=False)
#         self._checksum = checksum(struct.pack('16s16sIxxxB',
#                                   self._src.packed,
#                                   self._dst.packed,
#                                   len(exthdr), IPProtocol.IPv6Mobility.value) +
#             exthdr)
#         return self._checksum

#     def _parse_tlv(self, raw):
#         pass

#     def to_bytes(self, computecsum=True):
#         if computecsum:
#             self._compute_checksum()
#         exthdr = super().to_bytes()
#         mobhdr = struct.pack(IPv6Mobility._PACKFMT, self._mhtype.value, 0, self._checksum)
#         remain = struct.pack(_IPv6MobilityHeaderStruct[self._mhtype], *self._data)
#         return exthdr + mobhdr + remain

#     def from_bytes(self, raw):
#         super().from_bytes(raw)
#         remain = raw[2:]
#         if len(remain) < IPv6Mobility._MINLEN:
#             raise NotEnoughDataError("Not enough data to unpack IPv6Mobility header")
#         mhtype,reserved,checksum = struct.unpack(IPv6Mobility._PACKFMT, 
#                                                  remain[:IPv6Mobility._MINLEN])
#         self._mhtype = IPv6MobilityHeaderType(mhtype)
#         self._checksum = checksum
#         mobheaderstruct = _IPv6MobilityHeaderStruct[self._mhtype]
#         structsize = struct.calcsize(mobheaderstruct)
#         self._data = struct.unpack(mobheaderstruct, remain[IPv6Mobility._MINLEN:(IPv6Mobility._MINLEN+structsize)])
#         return raw[(IPv6Mobility._MINLEN + structsize):]


IPTypeClasses = {
    IPProtocol.TCP: TCP,
    IPProtocol.UDP: UDP,
    IPProtocol.ICMPv6: ICMPv6,

    # IPv6 extension headers
    IPProtocol.IPv6HopOption: IPv6HopOption,
    IPProtocol.IPv6RouteOption: IPv6RouteOption,
    IPProtocol.IPv6Fragment: IPv6Fragment,
    IPProtocol.IPv6DestinationOption: IPv6DestinationOption,
    IPProtocol.IPv6NoNext: None,
}


[docs] class IPv6(PacketHeaderBase): __slots__ = ['_trafficclass','_flowlabel','_ttl', '_nextheader','_payloadlen', '_src','_dst','_extheaders'] _PACKFMT = '!BBHHBB16s16s' _MINLEN = struct.calcsize(_PACKFMT) _next_header_map = IPTypeClasses _next_header_class_key = '_nextheader' def __init__(self, **kwargs): self.trafficclass = 0 self.flowlabel = 0 self.ttl = 128 self.nextheader = IPProtocol.ICMPv6 self._payloadlen = 0 self.src = SpecialIPv6Addr.UNDEFINED.value self.dst = SpecialIPv6Addr.UNDEFINED.value self._extheaders = [] super().__init__(**kwargs) def size(self): return IPv6._MINLEN + 0 # FIXME extension headers def pre_serialize(self, raw, pkt, i): self._payloadlen = len(raw) def to_bytes(self): return struct.pack(IPv6._PACKFMT, 6 << 4 | self.trafficclass >> 4, (self.trafficclass & 0x0f) << 4 | (self.flowlabel & 0xf0000) >> 16, self.flowlabel & 0x0ffff, self._payloadlen, self.nextheader.value, self.ttl, self.src.packed, self.dst.packed) def from_bytes(self, raw): if len(raw) < IPv6._MINLEN: raise NotEnoughDataError("Not enough data to unpack IPv6 header (only {} bytes)".format(len(raw))) fields = struct.unpack(IPv6._PACKFMT, raw[:IPv6._MINLEN]) ipversion = fields[0] >> 4 if ipversion != 6: raise ValueError("Trying to parse IPv6 header, but IP version is not 6! ({})".format(ipversion)) self.trafficclass = (fields[0] & 0x0f) << 4 | (fields[1] >> 4) self.flowlabel = (fields[1] & 0x0f) << 16 | fields[2] self._payloadlen = fields[3] self.nextheader = fields[4] self.ttl = fields[5] self.src = fields[6] self.dst = fields[7] # FIXME: extension headers return raw[IPv6._MINLEN:] def __eq__(self, other): return self._trafficclass == other._trafficclass and \ self._flowlabel == other._flowlabel and \ self._ttl == other._ttl and \ self._nextheader == other._nextheader and \ self._src == other._src and \ self._dst == other._dst and \ self._extheaders == other._extheaders # accessors and mutators @property def trafficclass(self): return self._trafficclass @trafficclass.setter def trafficclass(self, value): self._trafficclass = value @property def flowlabel(self): return self._flowlabel @flowlabel.setter def flowlabel(self, value): self._flowlabel = value @property def nextheader(self): return self._nextheader @nextheader.setter def nextheader(self, value): self._nextheader = IPProtocol(value) @property def ttl(self): return self._ttl @ttl.setter def ttl(self, value): self._ttl = int(value) @property def hoplimit(self): return self.ttl @hoplimit.setter def hoplimit(self, value): self.ttl = value @property def src(self): return self._src @src.setter def src(self, value): self._src = IPv6Address(value) @property def dst(self): return self._dst @dst.setter def dst(self, value): self._dst = IPv6Address(value) def __str__(self): return '{} {}->{} {}'.format(self.__class__.__name__, self.src, self.dst, self.nextheader.name)