
"""
Handling of RTP media streams according to RFC3550, RFC3605, RFC3581,
RFC2833 and RFC3711, RFC3489 and RFC5245.
"""

__all__ = ['RTPStream']

import weakref

from abc import ABCMeta, abstractmethod
from application.notification import IObserver, NotificationCenter, NotificationData, ObserverWeakrefProxy
from application.python import Null
from threading import RLock
from zope.interface import implementer

from sipsimple.account import BonjourAccount
from sipsimple.configuration.settings import SIPSimpleSettings
from sipsimple.core import RTPTransport, SIPCoreError, SIPURI
from sipsimple.lookup import DNSLookup
from sipsimple.streams import IMediaStream, InvalidStreamError, MediaStreamType, UnknownStreamError
from sipsimple.threading import run_in_thread


@implementer(IObserver)
class ZRTPStreamOptions(object):

    def __init__(self, stream):
        self._stream = stream
        self.__dict__['master'] = None
        self.__dict__['sas'] = None
        self.__dict__['verified'] = False
        self.__dict__['peer_name'] = ''

    @property
    def sas(self):
        if self.master is not None:
            return self.master.encryption.zrtp.sas
        return self.__dict__['sas']

    @property
    def verified(self):
        if self.master is not None:
            return self.master.encryption.zrtp.verified
        return self.__dict__['verified']

    @verified.setter
    def verified(self, verified):
        if self.__dict__['verified'] == verified:
            return
        if self.sas is None:
            raise AttributeError('Cannot verify peer before SAS is received')
        if self.master is not None:
            self.master.encryption.zrtp.verified = verified
        else:
            rtp_transport = self._stream._rtp_transport
            if rtp_transport is not None:
                @run_in_thread('file-io')
                def update_verified(rtp_transport, verified):
                    rtp_transport.set_zrtp_sas_verified(verified)
                    notification_center = NotificationCenter()
                    notification_center.post_notification('RTPStreamZRTPVerifiedStateChanged', sender=self._stream, data=NotificationData(verified=verified))
                self.__dict__['verified'] = verified
                update_verified(rtp_transport, verified)

    @property
    def peer_id(self):
        if self.master is not None:
            return self.master.encryption.zrtp.peer_id
        rtp_transport = self._stream._rtp_transport
        if rtp_transport is None:
            return None
        return rtp_transport.zrtp_peer_id

    @property
    def peer_name(self):
        if self.master is not None:
            return self.master.encryption.zrtp.peer_name
        return self.__dict__['peer_name']

    @peer_name.setter
    def peer_name(self, name):
        if self.__dict__['peer_name'] == name:
            return
        if self.master is not None:
            self.master.encryption.zrtp.peer_name = name
        else:
            rtp_transport = self._stream._rtp_transport
            if rtp_transport is not None:
                @run_in_thread('file-io')
                def update_name(rtp_transport, name):
                    rtp_transport.zrtp_peer_name = name if isinstance(name, bytes) else name.encode()
                    notification_center = NotificationCenter()
                    notification_center.post_notification('RTPStreamZRTPPeerNameChanged', sender=self._stream, data=NotificationData(name=name))
                self.__dict__['peer_name'] = name
                update_name(rtp_transport, name)

    @property
    def master(self):
        return self.__dict__['master']

    @master.setter
    def master(self, master):
        old_master = self.__dict__['master']
        if old_master is master:
            return
        notification_center = NotificationCenter()
        if old_master is not None:
            notification_center.remove_observer(self, sender=old_master)
        if master is not None:
            notification_center.add_observer(self, sender=master)
        self.__dict__['master'] = master

    def _enable(self, master_stream=None):
        rtp_transport = self._stream._rtp_transport
        if rtp_transport is None:
            return
        if master_stream is not None and not (master_stream.encryption.active and master_stream.encryption.type == 'ZRTP'):
            raise RuntimeError('Master stream must have ZRTP encryption activated')
        rtp_transport.set_zrtp_enabled(True, master_stream)
        self.master = master_stream

    def handle_notification(self, notification):
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_RTPStreamZRTPReceivedSAS(self, notification):
        # ZRTP begins on the audio stream, so this notification will only be processed
        # by the other streams
        self.__dict__['sas'] = notification.data.sas
        self.__dict__['verified'] = notification.data.verified
        self.__dict__['peer_name'] = notification.data.peer_name
        notification.center.post_notification(notification.name, sender=self._stream, data=notification.data)

    def _NH_RTPStreamZRTPVerifiedStateChanged(self, notification):
        self.__dict__['verified'] = notification.data.verified
        notification.center.post_notification(notification.name, sender=self._stream, data=notification.data)

    def _NH_RTPStreamZRTPPeerNameChanged(self, notification):
        self.__dict__['peer_name'] = notification.data.name
        notification.center.post_notification(notification.name, sender=self._stream, data=notification.data)

    def _NH_MediaStreamDidEnd(self, notification):
        self.master = None


@implementer(IObserver)
class RTPStreamEncryption(object):

    def __init__(self, stream):
        self._stream_ref = weakref.ref(stream)  # Keep a weak reference before the stream is initialized to avoid a memory cycle that would delay releasing audio resources
        self._stream = None                     # We will store the actual reference once it's initialized and we're guaranteed to get MediaStreamDidEnd and do the cleanup
        self._rtp_transport = None

        self.__dict__['type'] = None
        self.__dict__['zrtp'] = None

        notification_center = NotificationCenter()
        notification_center.add_observer(ObserverWeakrefProxy(self), name='MediaStreamDidInitialize')
        notification_center.add_observer(ObserverWeakrefProxy(self), name='MediaStreamDidNotInitialize')

    @property
    def active(self):
        rtp_transport = self._rtp_transport
        if rtp_transport is None:
            return False
        if self.type == 'SRTP/SDES':
            return rtp_transport.srtp_active
        elif self.type == 'ZRTP':
            return rtp_transport.zrtp_active
        return False

    @property
    def cipher(self):
        rtp_transport = self._rtp_transport
        if rtp_transport is None:
            return None
        if self.type == 'SRTP/SDES':
            return rtp_transport.srtp_cipher
        elif self.type == 'ZRTP':
            return rtp_transport.zrtp_cipher
        return None

    @property
    def type(self):
        return self.__dict__['type']

    @property
    def zrtp(self):
        zrtp = self.__dict__['zrtp']
        if zrtp is None:
            raise RuntimeError('ZRTP options have not been initialized')
        return zrtp

    def handle_notification(self, notification):
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_MediaStreamDidInitialize(self, notification):
        stream = notification.sender
        if stream is self._stream_ref():
            self._stream = stream
            self._rtp_transport = stream._rtp_transport
            notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidInitialize')
            notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidNotInitialize')
            notification.center.add_observer(self, sender=self._stream)
            notification.center.add_observer(self, sender=self._rtp_transport)
            encryption = stream._srtp_encryption or ''
            if encryption.startswith('sdes'):
                self.__dict__['type'] = 'SRTP/SDES'
            elif encryption == 'zrtp':
                self.__dict__['type'] = 'ZRTP'
                self.__dict__['zrtp'] = ZRTPStreamOptions(stream)
            elif encryption == 'opportunistic':
                # The underlying RTPTransport is the stacked chain
                # UDP -> ZRTP -> SRTP(OPTIONAL). We don't know yet
                # which keying will be selected -- that is decided
                # by the remote answer. Type is resolved later in
                # _resolve_opportunistic_type(), called just before
                # MediaStreamDidStart fires. ZRTP options are
                # pre-allocated so the ZRTP leg has its plumbing
                # ready if it ends up winning.
                self.__dict__['type'] = None
                self.__dict__['zrtp'] = ZRTPStreamOptions(stream)

    def _NH_MediaStreamDidNotInitialize(self, notification):
        if notification.sender is self._stream_ref():
            notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidInitialize')
            notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidNotInitialize')
            self._stream_ref = None
            self._stream = None

    def _resolve_opportunistic_type(self):
        # Called by the stream just before MediaStreamDidStart is posted
        # (from AudioStream.start() / VideoStream.start() and from the ICE
        # negotiation handlers in RTPStream). After this returns, self.type
        # will be 'SRTP/SDES', 'ZRTP', or None (neither leg available).
        # Calling it more than once is safe; only the first call has effect.
        if self.__dict__['type'] is not None:
            return
        rtp_transport = self._rtp_transport
        if rtp_transport is None:
            return
        # PJSIP populates srtp_info after pjmedia_transport_media_start
        # has consumed the remote SDP. For the stacked transport, that
        # happens inside AudioTransport.start() (or VideoTransport.start()),
        # which the stream calls before posting MediaStreamDidStart.
        if rtp_transport.srtp_active:
            # The remote answered with a=crypto. SDES won; the ZRTP layer
            # below stays dormant because we never call set_zrtp_enabled.
            self.__dict__['type'] = 'SRTP/SDES'
            self.__dict__['zrtp'] = None
        else:
            # SDES did not negotiate. Fall back to ZRTP -- the existing
            # session-level _NH_MediaStreamDidStart handler will then call
            # stream.encryption.zrtp._enable() and the ZRTP transport will
            # take it from there (success fires RTPTransportZRTPSecureOn,
            # failure fires RTPTransportZRTPNotSupportedByRemote, both of
            # which the existing observer methods already handle).
            self.__dict__['type'] = 'ZRTP'

    def _NH_MediaStreamDidStart(self, notification):
        # Defensive: if the stream forgot to resolve the type for the
        # opportunistic chain, resolve it now so that downstream observers
        # (notably Session._NH_MediaStreamDidStart, which checks
        # encryption.type to decide whether to enable ZRTP) see a final
        # value.
        if self.type is None:
            self._resolve_opportunistic_type()
        if self.type == 'SRTP/SDES':
            stream = notification.sender
            if self.active:
                notification.center.post_notification('RTPStreamDidEnableEncryption', sender=stream)
            else:
                reason = 'Not supported by remote'
                notification.center.post_notification('RTPStreamDidNotEnableEncryption', sender=stream, data=NotificationData(reason=reason))

    def _NH_MediaStreamDidEnd(self, notification):
        notification.center.remove_observer(self, sender=self._stream)
        notification.center.remove_observer(self, sender=self._rtp_transport)
        self._stream = None
        self._stream_ref = None
        self._rtp_transport = None
        self.__dict__['type'] = None
        self.__dict__['zrtp'] = None

    def _NH_RTPTransportZRTPSecureOn(self, notification):
        stream = self._stream
        with stream._lock:
            if stream.state == "ENDED":
                return
        notification.center.post_notification('RTPStreamDidEnableEncryption', sender=stream)

    def _NH_RTPTransportZRTPSecureOff(self, notification):
        # We should never get here because we don't allow disabling encryption -Saul
        pass

    def _NH_RTPTransportZRTPReceivedSAS(self, notification):
        stream = self._stream
        with stream._lock:
            if stream.state == "ENDED":
                return
        self.zrtp.__dict__['sas'] = sas = notification.data.sas
        self.zrtp.__dict__['verified'] = verified = notification.data.verified
        self.zrtp.__dict__['peer_name'] = peer_name = notification.sender.zrtp_peer_name
        notification.center.post_notification('RTPStreamZRTPReceivedSAS', sender=stream, data=NotificationData(sas=sas.decode(), verified=verified, peer_name=peer_name))

    def _NH_RTPTransportZRTPLog(self, notification):
        stream = self._stream
        with stream._lock:
            if stream.state == "ENDED":
                return
        notification.center.post_notification('RTPStreamZRTPLog', sender=stream, data=notification.data)

    def _NH_RTPTransportZRTPNegotiationFailed(self, notification):
        stream = self._stream
        with stream._lock:
            if stream.state == "ENDED":
                return
        reason = 'Negotiation failed: %s' % notification.data.reason
        notification.center.post_notification('RTPStreamDidNotEnableEncryption', sender=stream, data=NotificationData(reason=reason))

    def _NH_RTPTransportZRTPNotSupportedByRemote(self, notification):
        stream = self._stream
        with stream._lock:
            if stream.state == "ENDED":
                return
        reason = 'ZRTP not supported by remote'
        notification.center.post_notification('RTPStreamDidNotEnableEncryption', sender=stream, data=NotificationData(reason=reason))


class RTPStreamType(ABCMeta, MediaStreamType):
    pass


@implementer(IMediaStream, IObserver)
class RTPStream(object, metaclass=RTPStreamType):

    type = None
    priority = None

    hold_supported = True

    def __init__(self):
        self.notification_center = NotificationCenter()
        self.on_hold_by_local = False
        self.on_hold_by_remote = False
        self.direction = None
        self.state = "NULL"
        self.session = None
        self.encryption = RTPStreamEncryption(self)

        self._transport = None
        self._hold_request = None
        self._ice_state = "NULL"
        self._lock = RLock()
        self._rtp_transport = None

        self._try_ice = False
        self._srtp_encryption = None
        self._remote_rtp_address_sdp = None
        self._remote_rtp_port_sdp = None

        self._initialized = False
        self._done = False
        self._failure_reason = None

    @property
    def codec(self):
        return self._transport.codec.decode() if (self._transport and self._transport.codec) else None

    @property
    def sample_rate(self):
        return self._transport.sample_rate if self._transport else None

    @property
    def statistics(self):
        return self._transport.statistics if self._transport else None

    @property
    def local_rtp_address(self):
        return self._rtp_transport.local_rtp_address.decode() if (self._rtp_transport and self._rtp_transport.local_rtp_address) else None

    @property
    def local_rtp_port(self):
        return self._rtp_transport.local_rtp_port if self._rtp_transport else None

    @property
    def local_rtp_candidate(self):
        return self._rtp_transport.local_rtp_candidate if self._rtp_transport else None

    @property
    def remote_rtp_address(self):
        if self._ice_state == "IN_USE":
            return self._rtp_transport.remote_rtp_address if self._rtp_transport else None
        return self._remote_rtp_address_sdp.decode() if self._remote_rtp_address_sdp else None

    @property
    def remote_rtp_port(self):
        if self._ice_state == "IN_USE":
            return self._rtp_transport.remote_rtp_port if self._rtp_transport else None
        return self._remote_rtp_port_sdp if self._rtp_transport else None

    @property
    def remote_rtp_candidate(self):
        return self._rtp_transport.remote_rtp_candidate if self._rtp_transport else None

    @property
    def ice_active(self):
        return self._ice_state == "IN_USE"

    @property
    def on_hold(self):
        return self.on_hold_by_local or self.on_hold_by_remote

    # ---- Sylk AEAD (AES-128-GCM on RTP payload, above SRTP) -----------
    #
    # The underlying RTPTransport always wraps its SRTP transport with the
    # Sylk AEAD adapter at create time. The adapter sits passthrough until
    # set_aead_keys is called with the HKDF-derived per-direction keys.
    # After that call, outgoing RTP is AES-128-GCM-encrypted on the payload
    # before SRTP wraps it, and incoming RTP is decrypted after SRTP unwraps.
    # See sipsimple/core/_core.mediatransport.pxi:RTPTransport.set_aead_keys.

    def set_aead_keys(self, send_key, send_salt, recv_key, recv_salt,
                      key_id=1, video_prefix=0):
        """Install per-direction AES-128-GCM keys on this stream's adapter.

        video_prefix is the number of codec-metadata bytes left plaintext
        at the start of each RTP payload (audio = 0, VP8/VP9 = 3, H264 = 2,
        AV1 = 1). Must match what the peer's decryptor uses or every frame
        fails its GCM tag and falls through to permissive passthrough.
        """
        rtp_transport = self._rtp_transport
        if rtp_transport is None:
            raise RuntimeError('cannot set AEAD keys: stream has no RTP transport yet')
        rtp_transport.set_aead_keys(send_key, send_salt, recv_key, recv_salt,
                                    key_id, video_prefix)

    def get_aead_stats(self):
        """(encrypted_frames, decrypted_frames, passthrough_frames) — see RTPTransport."""
        rtp_transport = self._rtp_transport
        if rtp_transport is None:
            return (0, 0, 0)
        return rtp_transport.get_aead_stats()

    @abstractmethod
    def start(self, local_sdp, remote_sdp, stream_index):
        raise NotImplementedError

    @abstractmethod
    def update(self, local_sdp, remote_sdp, stream_index):
        raise NotImplementedError

    @abstractmethod
    def validate_update(self, remote_sdp, stream_index):
        raise NotImplementedError

    @abstractmethod
    def deactivate(self):
        raise NotImplementedError

    @abstractmethod
    def end(self):
        raise NotImplementedError

    @abstractmethod
    def reset(self, stream_index):
        raise NotImplementedError

    def hold(self):
        with self._lock:
            if self.on_hold_by_local or self._hold_request == 'hold':
                return
            if self.state == "ESTABLISHED" and self.direction != "inactive":
                self._pause()
            self._hold_request = 'hold'

    def unhold(self):
        with self._lock:
            if (not self.on_hold_by_local and self._hold_request != 'hold') or self._hold_request == 'unhold':
                return
            if self.state == "ESTABLISHED" and self._hold_request == 'hold':
                self._resume()
            self._hold_request = None if self._hold_request == 'hold' else 'unhold'

    @classmethod
    def new_from_sdp(cls, session, remote_sdp, stream_index):
        # TODO: actually validate the SDP
        settings = SIPSimpleSettings()
        remote_stream = remote_sdp.media[stream_index]
        if remote_stream.media != cls.type.encode():
            raise UnknownStreamError
        if remote_stream.transport not in (b'RTP/AVP', b'RTP/SAVP'):
            raise InvalidStreamError("expected RTP/AVP or RTP/SAVP transport in %s stream, got %s" % (cls.type, remote_stream.transport.decode()))
        local_encryption_policy = session.account.rtp.encryption.key_negotiation if session.account.rtp.encryption.enabled else None
        
        if local_encryption_policy == "sdes_mandatory" and not b'crypto' in remote_stream.attributes:
            raise InvalidStreamError("SRTP/SDES is locally mandatory but it's not remotely enabled")
        if remote_stream.transport == b'RTP/SAVP' and b'crypto' in remote_stream.attributes and local_encryption_policy not in ("opportunistic", "sdes_optional", "sdes_mandatory"):
            raise InvalidStreamError("SRTP/SDES is remotely mandatory but it's not locally enabled")
        account_preferred_codecs = getattr(session.account.rtp, '%s_codec_list' % cls.type)
        general_codecs = getattr(settings.rtp, '%s_codec_list' % cls.type)
        supported_codecs = account_preferred_codecs or general_codecs
        if not any(codec for codec in remote_stream.codec_list if codec in supported_codecs):
            raise InvalidStreamError("no compatible codecs found")
        stream = cls()
        stream._incoming_remote_sdp = remote_sdp
        stream._incoming_stream_index = stream_index
        return stream

    def initialize(self, session, direction):
        with self._lock:
            if self.state != "NULL":
                raise RuntimeError("%sStream.initialize() may only be called in the NULL state" % self.type.capitalize())
            self.state = "INITIALIZING"
            self.session = session
            local_encryption_policy = session.account.rtp.encryption.key_negotiation if session.account.rtp.encryption.enabled else None
            if hasattr(self, "_incoming_remote_sdp") and hasattr(self, '_incoming_stream_index'):
                # ICE attributes could come at the session level or at the media level
                remote_stream = self._incoming_remote_sdp.media[self._incoming_stream_index]
                self._try_ice = self.session.account.nat_traversal.use_ice and ((remote_stream.has_ice_attributes or self._incoming_remote_sdp.has_ice_attributes) and remote_stream.has_ice_candidates)
                if b'zrtp-hash' in remote_stream.attributes:
                    incoming_stream_encryption = 'zrtp'
                elif b'crypto' in remote_stream.attributes:
                    incoming_stream_encryption = 'sdes_mandatory' if remote_stream.transport == b'RTP/SAVP' else 'sdes_optional'
                else:
                    incoming_stream_encryption = None
                if incoming_stream_encryption is not None and local_encryption_policy == 'opportunistic':
                    # We already know what the remote offered; pick that
                    # specific keying so the transport is built as a single
                    # SRTP or ZRTP wrapper (cheaper than the stacked chain).
                    self._srtp_encryption = incoming_stream_encryption
                else:
                    # No incoming SDP yet. Pass 'opportunistic' through so the
                    # RTPTransport builds a stacked chain (UDP -> ZRTP -> SRTP
                    # in OPTIONAL mode) and offers both a=zrtp-hash and
                    # a=crypto in the INVITE.
                    self._srtp_encryption = local_encryption_policy
            else:
                self._try_ice = self.session.account.nat_traversal.use_ice
                # Outgoing call without an incoming SDP. Same reasoning as above.
                self._srtp_encryption = local_encryption_policy

            if self._try_ice:
                if self.session.account.nat_traversal.stun_server_list:
                    stun_servers = list((server.host, server.port) for server in self.session.account.nat_traversal.stun_server_list)
                    self._init_rtp_transport(stun_servers)
                elif not isinstance(self.session.account, BonjourAccount):
                    dns_lookup = DNSLookup()
                    self.notification_center.add_observer(self, sender=dns_lookup)
                    dns_lookup.lookup_service(SIPURI(self.session.account.id.domain), "stun")
            else:
                self._init_rtp_transport()

    def get_local_media(self, remote_sdp=None, index=0):
        with self._lock:
            if self.state not in ("INITIALIZED", "WAIT_ICE", "ESTABLISHED"):
                raise RuntimeError("%sStream.get_local_media() may only be called in the INITIALIZED, WAIT_ICE  or ESTABLISHED states" % self.type.capitalize())
            if remote_sdp is None:
                # offer
                old_direction = self._transport.direction
                if old_direction is None:
                    new_direction = "sendrecv"
                elif b"send" in old_direction:
                    new_direction = ("sendonly" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "sendrecv")
                else:
                    new_direction = ("inactive" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "recvonly")
            else:
                new_direction = None
            new_direction = new_direction.encode() if new_direction else None
            return self._transport.get_local_media(remote_sdp, index, new_direction)

    # Notifications

    def handle_notification(self, notification):
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_DNSLookupDidFail(self, notification):
        self.notification_center.remove_observer(self, sender=notification.sender)
        with self._lock:
            if self.state == "ENDED":
                return
        self._init_rtp_transport()

    def _NH_DNSLookupDidSucceed(self, notification):
        self.notification_center.remove_observer(self, sender=notification.sender)
        with self._lock:
            if self.state == "ENDED":
                return
        self._init_rtp_transport(notification.data.result)

    def _NH_RTPTransportDidInitialize(self, notification):
        rtp_transport = notification.sender
        with self._lock:
            if self.state == "ENDED":
                self.notification_center.remove_observer(self, sender=rtp_transport)
                return
            del self._rtp_args
            del self._stun_servers
            remote_sdp = self.__dict__.pop('_incoming_remote_sdp', None)
            stream_index = self.__dict__.pop('_incoming_stream_index', None)
            try:
                if remote_sdp is not None:
                    transport = self._create_transport(rtp_transport, remote_sdp=remote_sdp, stream_index=stream_index)
                    self._save_remote_sdp_rtp_info(remote_sdp, stream_index)
                else:
                    transport = self._create_transport(rtp_transport)
            except SIPCoreError as e:
                self.state = "ENDED"
                self.notification_center.remove_observer(self, sender=rtp_transport)
                self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=e.args[0]))
                return
            self._rtp_transport = rtp_transport
            self._transport = transport
            self.notification_center.add_observer(self, sender=transport)
            self._initialized = True
            self.state = "INITIALIZED"
            self.notification_center.post_notification('MediaStreamDidInitialize', sender=self)

    def _NH_RTPTransportDidFail(self, notification):
        self.notification_center.remove_observer(self, sender=notification.sender)
        with self._lock:
            if self.state == "ENDED":
                return
        self._try_next_rtp_transport(notification.data.reason)

    def _NH_RTPTransportICENegotiationStateDidChange(self, notification):
        with self._lock:
            if self._ice_state != "NULL" or self.state not in ("INITIALIZING", "INITIALIZED", "WAIT_ICE"):
                return
        self.notification_center.post_notification('RTPStreamICENegotiationStateDidChange', sender=self, data=notification.data)

    def _NH_RTPTransportICENegotiationDidSucceed(self, notification):
        with self._lock:
            if self.state != "WAIT_ICE":
                return
            self._ice_state = "IN_USE"
            self.state = 'ESTABLISHED'
        self.notification_center.post_notification('RTPStreamICENegotiationDidSucceed', sender=self, data=notification.data)
        # Make sure encryption.type is resolved before MediaStreamDidStart
        # so other observers (e.g. Session) see the final value.
        self.encryption._resolve_opportunistic_type()
        self.notification_center.post_notification('MediaStreamDidStart', sender=self)

    def _NH_RTPTransportICENegotiationDidFail(self, notification):
        with self._lock:
            if self.state != "WAIT_ICE":
                return
            self._ice_state = "FAILED"
            self.state = 'ESTABLISHED'
        self.notification_center.post_notification('RTPStreamICENegotiationDidFail', sender=self, data=notification.data)
        # Make sure encryption.type is resolved before MediaStreamDidStart
        # so other observers (e.g. Session) see the final value.
        self.encryption._resolve_opportunistic_type()
        self.notification_center.post_notification('MediaStreamDidStart', sender=self)

    # Private methods

    def _init_rtp_transport(self, stun_servers=None):
        self._rtp_args = dict()
        self._rtp_args["encryption"] = self._srtp_encryption
        self._rtp_args["use_ice"] = self._try_ice
        self._stun_servers = [(None, None)]
        if stun_servers:
            self._stun_servers.extend(reversed(stun_servers))
        self._try_next_rtp_transport()

    def _try_next_rtp_transport(self, failure_reason=None):
        if self._stun_servers:
            stun_address, stun_port = self._stun_servers.pop()
            try:
                stun_address = stun_address.encode() if stun_address else None
                rtp_transport = RTPTransport(ice_stun_address=stun_address, ice_stun_port=stun_port, **self._rtp_args)
            except SIPCoreError as e:
                self._try_next_rtp_transport(e.args[0])
            else:
                self.notification_center.add_observer(self, sender=rtp_transport)
                try:
                    rtp_transport.set_INIT()
                except SIPCoreError as e:
                    self.notification_center.remove_observer(self, sender=rtp_transport)
                    self._try_next_rtp_transport(e.args[0])
        else:
            self.state = "ENDED"
            self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=failure_reason))

    def _save_remote_sdp_rtp_info(self, remote_sdp, index):
        connection = remote_sdp.media[index].connection or remote_sdp.connection
        self._remote_rtp_address_sdp = connection.address
        self._remote_rtp_port_sdp = remote_sdp.media[index].port

    @abstractmethod
    def _create_transport(self, rtp_transport, remote_sdp=None, stream_index=None):
        raise NotImplementedError

    @abstractmethod
    def _check_hold(self, direction, is_initial):
        raise NotImplementedError

    @abstractmethod
    def _pause(self):
        raise NotImplementedError

    @abstractmethod
    def _resume(self):
        raise NotImplementedError


from sipsimple.streams.rtp import audio, video

