Source code for switchyard.lib.packet.arp
from .packet import PacketHeaderBase,Packet
from ..address import EthAddr,ip_address,SpecialIPv4Addr,SpecialEthAddr
import struct
from .common import EtherType, ArpHwType, ArpOperation
from ..exceptions import *
'''
References:
Plummer.
"RFC826", An Ethernet Address Resolution Protocol.
Finlayson, Mann, Mogul, and Theimer.
"RFC903", A Reverse Address Resolution Protocol.
http://en.wikipedia.org/wiki/Address_Resolution_Protocol
'''
[docs]
class Arp(PacketHeaderBase):
__slots__ = ['_hwtype','_prototype','_hwaddrlen','_protoaddrlen',
'_operation','_senderhwaddr','_senderprotoaddr',
'_targethwaddr','_targetprotoaddr']
_PACKFMT = '!HHBBH6s4s6s4s'
_MINLEN = struct.calcsize(_PACKFMT)
def __init__(self, **kwargs):
self._hwtype = ArpHwType.Ethernet
self._prototype = EtherType.IP
self._hwaddrlen = 6
self._protoaddrlen = 4
self.operation = ArpOperation.Request
self.senderhwaddr = SpecialEthAddr.ETHER_ANY.value
self.senderprotoaddr = SpecialIPv4Addr.IP_ANY.value
self.targethwaddr = SpecialEthAddr.ETHER_BROADCAST.value
self.targetprotoaddr = SpecialIPv4Addr.IP_ANY.value
super().__init__(**kwargs)
def size(self):
return struct.calcsize(Arp._PACKFMT)
def pre_serialize(self, raw, pkt, i):
pass
def to_bytes(self):
'''
Return packed byte representation of the ARP header.
'''
return struct.pack(Arp._PACKFMT, self._hwtype.value, self._prototype.value, self._hwaddrlen, self._protoaddrlen, self._operation.value, self._senderhwaddr.packed, self._senderprotoaddr.packed, self._targethwaddr.packed, self._targetprotoaddr.packed)
def from_bytes(self, raw):
'''Return an Ethernet object reconstructed from raw bytes, or an
Exception if we can't resurrect the packet.'''
if len(raw) < Arp._MINLEN:
raise NotEnoughDataError("Not enough bytes ({}) to reconstruct an Arp object".format(len(raw)))
fields = struct.unpack(Arp._PACKFMT, raw[:Arp._MINLEN])
try:
self._hwtype = ArpHwType(fields[0])
self._prototype = EtherType(fields[1])
self._hwaddrlen = fields[2]
self._protoaddrlen = fields[3]
self.operation = ArpOperation(fields[4])
self.senderhwaddr = EthAddr(fields[5])
self.senderprotoaddr = ip_address(fields[6])
self.targethwaddr = EthAddr(fields[7])
self.targetprotoaddr = ip_address(fields[8])
except Exception as e:
raise Exception("Error constructing Arp packet object from raw bytes: {}".format(str(e)))
return raw[Arp._MINLEN:]
def __eq__(self, other):
return self.hardwaretype == other.hardwaretype and \
self.protocoltype == other.protocoltype and \
self.operation == other.operation and \
self.senderhwaddr == other.senderhwaddr and \
self.senderprotoaddr == other.senderprotoaddr and \
self.targethwaddr == other.targethwaddr and \
self.targetprotoaddr == other.targetprotoaddr
@property
def hardwaretype(self):
return self._hwtype
@property
def protocoltype(self):
return self._prototype
@property
def operation(self):
return self._operation
@operation.setter
def operation(self, value):
self._operation = ArpOperation(value)
@property
def senderhwaddr(self):
return self._senderhwaddr
@senderhwaddr.setter
def senderhwaddr(self, value):
self._senderhwaddr = EthAddr(value)
@property
def senderprotoaddr(self):
return self._senderprotoaddr
@senderprotoaddr.setter
def senderprotoaddr(self, value):
self._senderprotoaddr = ip_address(value)
@property
def targethwaddr(self):
return self._targethwaddr
@targethwaddr.setter
def targethwaddr(self, value):
self._targethwaddr = EthAddr(value)
@property
def targetprotoaddr(self):
return self._targetprotoaddr
@targetprotoaddr.setter
def targetprotoaddr(self, value):
self._targetprotoaddr = ip_address(value)
def next_header_class(self):
'''
No other headers should follow ARP.
'''
return None
def __str__(self):
return '{} {}:{} {}:{}'.format(self.__class__.__name__,
self.senderhwaddr, self.senderprotoaddr,
self.targethwaddr, self.targetprotoaddr)