Repository URL to install this package:
# Copyright 2014 Globo.com Player authors. All rights reserved.
# Use of this source code is governed by a MIT License
# license that can be found in the LICENSE file.
import decimal
import os
from .mixins import BasePathMixin, GroupedBasePathMixin
from .parser import format_date_time, parse
from .protocol import (
ext_oatcls_scte35,
ext_x_asset,
ext_x_key,
ext_x_map,
ext_x_session_key,
ext_x_start,
)
class MalformedPlaylistError(Exception):
pass
class M3U8:
"""
Represents a single M3U8 playlist. Should be instantiated with
the content as string.
Parameters:
`content`
the m3u8 content as string
`base_path`
all urls (key and segments url) will be updated with this base_path,
ex.:
base_path = "http://videoserver.com/hls"
/foo/bar/key.bin --> http://videoserver.com/hls/key.bin
http://vid.com/segment1.ts --> http://videoserver.com/hls/segment1.ts
can be passed as parameter or setted as an attribute to ``M3U8`` object.
`base_uri`
uri the playlist comes from. it is propagated to SegmentList and Key
ex.: http://example.com/path/to
Attributes:
`keys`
Returns the list of `Key` objects used to encrypt the segments from m3u8.
It covers the whole list of possible situations when encryption either is
used or not.
1. No encryption.
`keys` list will only contain a `None` element.
2. Encryption enabled for all segments.
`keys` list will contain the key used for the segments.
3. No encryption for first element(s), encryption is applied afterwards
`keys` list will contain `None` and the key used for the rest of segments.
4. Multiple keys used during the m3u8 manifest.
`keys` list will contain the key used for each set of segments.
`session_keys`
Returns the list of `SessionKey` objects used to encrypt multiple segments from m3u8.
`segments`
a `SegmentList` object, represents the list of `Segment`s from this playlist
`is_variant`
Returns true if this M3U8 is a variant playlist, with links to
other M3U8s with different bitrates.
If true, `playlists` is a list of the playlists available,
and `iframe_playlists` is a list of the i-frame playlists available.
`is_endlist`
Returns true if EXT-X-ENDLIST tag present in M3U8.
http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.8
`playlists`
If this is a variant playlist (`is_variant` is True), returns a list of
Playlist objects
`iframe_playlists`
If this is a variant playlist (`is_variant` is True), returns a list of
IFramePlaylist objects
`playlist_type`
A lower-case string representing the type of the playlist, which can be
one of VOD (video on demand) or EVENT.
`media`
If this is a variant playlist (`is_variant` is True), returns a list of
Media objects
`target_duration`
Returns the EXT-X-TARGETDURATION as an integer
http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.2
`media_sequence`
Returns the EXT-X-MEDIA-SEQUENCE as an integer
http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.3
`program_date_time`
Returns the EXT-X-PROGRAM-DATE-TIME as a string
http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5
`version`
Return the EXT-X-VERSION as is
`allow_cache`
Return the EXT-X-ALLOW-CACHE as is
`files`
Returns an iterable with all files from playlist, in order. This includes
segments and key uri, if present.
`base_uri`
It is a property (getter and setter) used by
SegmentList and Key to have absolute URIs.
`is_i_frames_only`
Returns true if EXT-X-I-FRAMES-ONLY tag present in M3U8.
http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.12
`is_independent_segments`
Returns true if EXT-X-INDEPENDENT-SEGMENTS tag present in M3U8.
https://tools.ietf.org/html/draft-pantos-http-live-streaming-13#section-3.4.16
`image_playlists`
If this is a variant playlist (`is_variant` is True), returns a list of
ImagePlaylist objects
`is_images_only`
Returns true if EXT-X-IMAGES-ONLY tag present in M3U8.
https://github.com/image-media-playlist/spec/blob/master/image_media_playlist_v0_4.pdf
"""
simple_attributes = (
# obj attribute # parser attribute
("is_variant", "is_variant"),
("is_endlist", "is_endlist"),
("is_i_frames_only", "is_i_frames_only"),
("target_duration", "targetduration"),
("media_sequence", "media_sequence"),
("program_date_time", "program_date_time"),
("is_independent_segments", "is_independent_segments"),
("version", "version"),
("allow_cache", "allow_cache"),
("playlist_type", "playlist_type"),
("discontinuity_sequence", "discontinuity_sequence"),
("is_images_only", "is_images_only"),
)
def __init__(
self,
content=None,
base_path=None,
base_uri=None,
strict=False,
custom_tags_parser=None,
):
if content is not None:
self.data = parse(content, strict, custom_tags_parser)
else:
self.data = {}
self._base_uri = base_uri
if self._base_uri:
if not self._base_uri.endswith("/"):
self._base_uri += "/"
self._initialize_attributes()
self.base_path = base_path
def _initialize_attributes(self):
self.keys = [
Key(base_uri=self.base_uri, **params) if params else None
for params in self.data.get("keys", [])
]
self.segment_map = [
InitializationSection(base_uri=self.base_uri, **params) if params else None
for params in self.data.get("segment_map", [])
]
self.segments = SegmentList(
[
Segment(
base_uri=self.base_uri,
keyobject=find_key(segment.get("key", {}), self.keys),
**segment
)
for segment in self.data.get("segments", [])
]
)
for attr, param in self.simple_attributes:
setattr(self, attr, self.data.get(param))
for i, segment in enumerate(self.segments, self.media_sequence or 0):
segment.media_sequence = i
self.files = []
for key in self.keys:
# Avoid None key, it could be the first one, don't repeat them
if key and key.uri not in self.files:
self.files.append(key.uri)
self.files.extend(self.segments.uri)
self.media = MediaList(
[
Media(base_uri=self.base_uri, **media)
for media in self.data.get("media", [])
]
)
self.playlists = PlaylistList(
[
Playlist(base_uri=self.base_uri, media=self.media, **playlist)
for playlist in self.data.get("playlists", [])
]
)
self.iframe_playlists = PlaylistList()
for ifr_pl in self.data.get("iframe_playlists", []):
self.iframe_playlists.append(
IFramePlaylist(
base_uri=self.base_uri,
uri=ifr_pl["uri"],
iframe_stream_info=ifr_pl["iframe_stream_info"],
)
)
self.image_playlists = PlaylistList()
for img_pl in self.data.get("image_playlists", []):
self.image_playlists.append(
ImagePlaylist(
base_uri=self.base_uri,
uri=img_pl["uri"],
image_stream_info=img_pl["image_stream_info"],
)
)
start = self.data.get("start", None)
self.start = start and Start(**start)
server_control = self.data.get("server_control", None)
self.server_control = server_control and ServerControl(**server_control)
part_inf = self.data.get("part_inf", None)
self.part_inf = part_inf and PartInformation(**part_inf)
skip = self.data.get("skip", None)
self.skip = skip and Skip(**skip)
self.rendition_reports = RenditionReportList(
[
RenditionReport(base_uri=self.base_uri, **rendition_report)
for rendition_report in self.data.get("rendition_reports", [])
]
)
self.session_data = SessionDataList(
[
SessionData(**session_data)
for session_data in self.data.get("session_data", [])
if "data_id" in session_data
]
)
self.session_keys = [
SessionKey(base_uri=self.base_uri, **params) if params else None
for params in self.data.get("session_keys", [])
]
preload_hint = self.data.get("preload_hint", None)
self.preload_hint = preload_hint and PreloadHint(
base_uri=self.base_uri, **preload_hint
)
content_steering = self.data.get("content_steering", None)
self.content_steering = content_steering and ContentSteering(
base_uri=self.base_uri, **content_steering
)
def __unicode__(self):
return self.dumps()
@property
def base_uri(self):
return self._base_uri
@base_uri.setter
def base_uri(self, new_base_uri):
self._base_uri = new_base_uri
self.media.base_uri = new_base_uri
self.playlists.base_uri = new_base_uri
self.iframe_playlists.base_uri = new_base_uri
self.segments.base_uri = new_base_uri
self.rendition_reports.base_uri = new_base_uri
self.image_playlists.base_uri = new_base_uri
for key in self.keys:
if key:
key.base_uri = new_base_uri
for key in self.session_keys:
if key:
key.base_uri = new_base_uri
if self.preload_hint:
self.preload_hint.base_uri = new_base_uri
if self.content_steering:
self.content_steering.base_uri = new_base_uri
@property
def base_path(self):
return self._base_path
@base_path.setter
def base_path(self, newbase_path):
self._base_path = newbase_path
self._update_base_path()
def _update_base_path(self):
if self._base_path is None:
return
for key in self.keys:
if key:
key.base_path = self._base_path
for key in self.session_keys:
if key:
key.base_path = self._base_path
self.media.base_path = self._base_path
self.segments.base_path = self._base_path
self.playlists.base_path = self._base_path
self.iframe_playlists.base_path = self._base_path
self.image_playlists.base_path = self._base_path
self.rendition_reports.base_path = self._base_path
if self.preload_hint:
self.preload_hint.base_path = self._base_path
if self.content_steering:
self.content_steering.base_path = self._base_path
def add_playlist(self, playlist):
self.is_variant = True
self.playlists.append(playlist)
def add_iframe_playlist(self, iframe_playlist):
if iframe_playlist is not None:
self.is_variant = True
self.iframe_playlists.append(iframe_playlist)
def add_image_playlist(self, image_playlist):
if image_playlist is not None:
self.is_variant = True
self.image_playlists.append(image_playlist)
def add_media(self, media):
self.media.append(media)
def add_segment(self, segment):
self.segments.append(segment)
def add_rendition_report(self, report):
self.rendition_reports.append(report)
def dumps(self, timespec="milliseconds"):
"""
Returns the current m3u8 as a string.
You could also use unicode(<this obj>) or str(<this obj>)
"""
output = ["#EXTM3U"]
if self.content_steering:
output.append(str(self.content_steering))
if self.media_sequence:
output.append("#EXT-X-MEDIA-SEQUENCE:" + str(self.media_sequence))
if self.discontinuity_sequence:
output.append(
"#EXT-X-DISCONTINUITY-SEQUENCE:{}".format(self.discontinuity_sequence)
)
if self.allow_cache:
output.append("#EXT-X-ALLOW-CACHE:" + self.allow_cache.upper())
if self.version:
output.append("#EXT-X-VERSION:" + str(self.version))
if self.is_independent_segments:
output.append("#EXT-X-INDEPENDENT-SEGMENTS")
if self.target_duration:
output.append(
"#EXT-X-TARGETDURATION:" + number_to_string(self.target_duration)
)
if not (self.playlist_type is None or self.playlist_type == ""):
output.append("#EXT-X-PLAYLIST-TYPE:%s" % str(self.playlist_type).upper())
if self.start:
output.append(str(self.start))
if self.is_i_frames_only:
output.append("#EXT-X-I-FRAMES-ONLY")
if self.is_images_only:
output.append("#EXT-X-IMAGES-ONLY")
if self.server_control:
output.append(str(self.server_control))
if self.is_variant:
if self.media:
output.append(str(self.media))
output.append(str(self.playlists))
if self.iframe_playlists:
output.append(str(self.iframe_playlists))
if self.image_playlists:
output.append(str(self.image_playlists))
if self.part_inf:
output.append(str(self.part_inf))
if self.skip:
output.append(str(self.skip))
if self.session_data:
output.append(str(self.session_data))
for key in self.session_keys:
output.append(str(key))
output.append(self.segments.dumps(timespec))
if self.preload_hint:
output.append(str(self.preload_hint))
if self.rendition_reports:
output.append(str(self.rendition_reports))
if self.is_endlist:
output.append("#EXT-X-ENDLIST")
# ensure that the last line is terminated correctly
if output[-1] and not output[-1].endswith("\n"):
output.append("")
return "\n".join(output)
def dump(self, filename):
"""
Saves the current m3u8 to ``filename``
"""
self._create_sub_directories(filename)
with open(filename, "w") as fileobj:
fileobj.write(self.dumps())
def _create_sub_directories(self, filename):
basename = os.path.dirname(filename)
if basename:
os.makedirs(basename, exist_ok=True)
class Segment(BasePathMixin):
"""
A video segment from a M3U8 playlist
`uri`
a string with the segment uri
`title`
title attribute from EXTINF parameter
`program_date_time`
Returns the EXT-X-PROGRAM-DATE-TIME as a datetime. This field is only set
if EXT-X-PROGRAM-DATE-TIME exists for this segment
http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5
`current_program_date_time`
Returns a datetime of this segment, either the value of `program_date_time`
when EXT-X-PROGRAM-DATE-TIME is set or a calculated value based on previous
segments' EXT-X-PROGRAM-DATE-TIME and EXTINF values
`discontinuity`
Returns a boolean indicating if a EXT-X-DISCONTINUITY tag exists
http://tools.ietf.org/html/draft-pantos-http-live-streaming-13#section-3.4.11
`cue_out_start`
Returns a boolean indicating if a EXT-X-CUE-OUT tag exists
`cue_out`
Returns a boolean indicating if a EXT-X-CUE-OUT-CONT tag exists
Note: for backwards compatibility, this will be True when cue_out_start
is True, even though this tag did not exist in the input, and
EXT-X-CUE-OUT-CONT will not exist in the output
`cue_in`
Returns a boolean indicating if a EXT-X-CUE-IN tag exists
`scte35`
Base64 encoded SCTE35 metadata if available
`scte35_duration`
Planned SCTE35 duration
`duration`
duration attribute from EXTINF parameter
`base_uri`
uri the key comes from in URI hierarchy. ex.: http://example.com/path/to
`bitrate`
bitrate attribute from EXT-X-BITRATE parameter
`byterange`
byterange attribute from EXT-X-BYTERANGE parameter
`key`
Key used to encrypt the segment (EXT-X-KEY)
`parts`
partial segments that make up this segment
`dateranges`
any dateranges that should precede the segment
`gap_tag`
GAP tag indicates that a Media Segment is missing
`custom_parser_values`
Additional values which custom_tags_parser might store per segment
"""
def __init__(
self,
uri=None,
base_uri=None,
program_date_time=None,
current_program_date_time=None,
duration=None,
title=None,
bitrate=None,
byterange=None,
cue_out=False,
cue_out_start=False,
cue_in=False,
discontinuity=False,
key=None,
scte35=None,
oatcls_scte35=None,
scte35_duration=None,
scte35_elapsedtime=None,
asset_metadata=None,
keyobject=None,
parts=None,
init_section=None,
dateranges=None,
gap_tag=None,
media_sequence=None,
custom_parser_values=None,
):
self.media_sequence = media_sequence
self.uri = uri
self.duration = duration
self.title = title
self._base_uri = base_uri
self.bitrate = bitrate
self.byterange = byterange
self.program_date_time = program_date_time
self.current_program_date_time = current_program_date_time
self.discontinuity = discontinuity
self.cue_out_start = cue_out_start
self.cue_out = cue_out
self.cue_in = cue_in
self.scte35 = scte35
self.oatcls_scte35 = oatcls_scte35
self.scte35_duration = scte35_duration
self.scte35_elapsedtime = scte35_elapsedtime
self.asset_metadata = asset_metadata
self.key = keyobject
self.parts = PartialSegmentList(
[PartialSegment(base_uri=self._base_uri, **partial) for partial in parts]
if parts
else []
)
if init_section is not None:
self.init_section = InitializationSection(self._base_uri, **init_section)
else:
self.init_section = None
self.dateranges = DateRangeList(
[DateRange(**daterange) for daterange in dateranges] if dateranges else []
)
self.gap_tag = gap_tag
self.custom_parser_values = custom_parser_values or {}
def add_part(self, part):
self.parts.append(part)
def dumps(self, last_segment, timespec="milliseconds"):
output = []
if last_segment and self.key != last_segment.key:
output.append(str(self.key))
output.append("\n")
else:
# The key must be checked anyway now for the first segment
if self.key and last_segment is None:
output.append(str(self.key))
output.append("\n")
if last_segment and self.init_section != last_segment.init_section:
if not self.init_section:
raise MalformedPlaylistError(
"init section can't be None if previous is not None"
)
output.append(str(self.init_section))
output.append("\n")
else:
if self.init_section and last_segment is None:
output.append(str(self.init_section))
output.append("\n")
if self.discontinuity:
output.append("#EXT-X-DISCONTINUITY\n")
if self.program_date_time:
output.append(
"#EXT-X-PROGRAM-DATE-TIME:%s\n"
% format_date_time(self.program_date_time, timespec=timespec)
)
if len(self.dateranges):
output.append(str(self.dateranges))
output.append("\n")
if self.cue_out_start:
if self.oatcls_scte35:
output.append("{}:{}\n".format(ext_oatcls_scte35, self.oatcls_scte35))
if self.asset_metadata:
asset_suffix = []
for metadata_key, metadata_value in self.asset_metadata.items():
asset_suffix.append("{}={}".format(metadata_key.upper(), metadata_value))
output.append("{}:{}\n".format(ext_x_asset, ','.join(asset_suffix)))
output.append(
"#EXT-X-CUE-OUT{}\n".format(
(":" + self.scte35_duration) if self.scte35_duration else ""
)
)
elif self.cue_out:
cue_out_cont_suffix = []
if self.scte35_elapsedtime:
cue_out_cont_suffix.append("ElapsedTime={}".format(self.scte35_elapsedtime))
if self.scte35_duration:
cue_out_cont_suffix.append("Duration={}".format(self.scte35_duration))
if self.scte35:
cue_out_cont_suffix.append("SCTE35={}".format(self.scte35))
if cue_out_cont_suffix:
cue_out_cont_suffix = ":" + ",".join(cue_out_cont_suffix)
else:
cue_out_cont_suffix = ""
output.append("#EXT-X-CUE-OUT-CONT{}\n".format(cue_out_cont_suffix))
if self.cue_in:
output.append("#EXT-X-CUE-IN\n")
if self.parts:
output.append(str(self.parts))
output.append("\n")
if self.uri:
if self.duration is not None:
output.append("#EXTINF:%s," % number_to_string(self.duration))
if self.title:
output.append(self.title)
output.append("\n")
if self.byterange:
output.append("#EXT-X-BYTERANGE:%s\n" % self.byterange)
if self.bitrate:
output.append("#EXT-X-BITRATE:%s\n" % self.bitrate)
if self.gap_tag:
output.append("#EXT-X-GAP\n")
output.append(self.uri)
return "".join(output)
def __str__(self):
return self.dumps(None)
@property
def base_path(self):
return super().base_path
@base_path.setter
def base_path(self, newbase_path):
super(Segment, self.__class__).base_path.fset(self, newbase_path)
self.parts.base_path = newbase_path
if self.init_section is not None:
self.init_section.base_path = newbase_path
@property
def base_uri(self):
return self._base_uri
@base_uri.setter
def base_uri(self, newbase_uri):
self._base_uri = newbase_uri
self.parts.base_uri = newbase_uri
if self.init_section is not None:
self.init_section.base_uri = newbase_uri
class SegmentList(list, GroupedBasePathMixin):
def dumps(self, timespec="milliseconds"):
output = []
last_segment = None
for segment in self:
output.append(segment.dumps(last_segment, timespec))
last_segment = segment
return "\n".join(output)
def __str__(self):
return self.dumps()
@property
def uri(self):
return [seg.uri for seg in self]
def by_key(self, key):
return [segment for segment in self if segment.key == key]
class PartialSegment(BasePathMixin):
"""
A partial segment from a M3U8 playlist
`uri`
a string with the segment uri
`program_date_time`
Returns the EXT-X-PROGRAM-DATE-TIME as a datetime. This field is only set
if EXT-X-PROGRAM-DATE-TIME exists for this segment
http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.5
`current_program_date_time`
Returns a datetime of this segment, either the value of `program_date_time`
when EXT-X-PROGRAM-DATE-TIME is set or a calculated value based on previous
segments' EXT-X-PROGRAM-DATE-TIME and EXTINF values
`duration`
duration attribute from EXTINF parameter
`byterange`
byterange attribute from EXT-X-BYTERANGE parameter
`independent`
the Partial Segment contains an independent frame
`gap`
GAP attribute indicates the Partial Segment is not available
`dateranges`
any dateranges that should precede the partial segment
`gap_tag`
GAP tag indicates one or more of the parent Media Segment's Partial
Segments have a GAP=YES attribute. This tag should appear immediately
after the first EXT-X-PART tag in the Parent Segment with a GAP=YES
attribute.
"""
def __init__(
self,
base_uri,
uri,
duration,
program_date_time=None,
current_program_date_time=None,
byterange=None,
independent=None,
gap=None,
dateranges=None,
gap_tag=None,
):
self.base_uri = base_uri
self.uri = uri
self.duration = duration
self.program_date_time = program_date_time
self.current_program_date_time = current_program_date_time
self.byterange = byterange
self.independent = independent
self.gap = gap
self.dateranges = DateRangeList(
[DateRange(**daterange) for daterange in dateranges] if dateranges else []
)
self.gap_tag = gap_tag
def dumps(self, last_segment):
output = []
if len(self.dateranges):
output.append(str(self.dateranges))
output.append("\n")
if self.gap_tag:
output.append("#EXT-X-GAP\n")
output.append(
'#EXT-X-PART:DURATION=%s,URI="%s"'
% (number_to_string(self.duration), self.uri)
)
if self.independent:
output.append(",INDEPENDENT=%s" % self.independent)
if self.byterange:
output.append(",BYTERANGE=%s" % self.byterange)
if self.gap:
output.append(",GAP=%s" % self.gap)
return "".join(output)
def __str__(self):
return self.dumps(None)
class PartialSegmentList(list, GroupedBasePathMixin):
def __str__(self):
output = [str(part) for part in self]
return "\n".join(output)
class Key(BasePathMixin):
"""
Key used to encrypt the segments in a m3u8 playlist (EXT-X-KEY)
`method`
is a string. ex.: "AES-128"
`uri`
is a string. ex:: "https://priv.example.com/key.php?r=52"
`base_uri`
uri the key comes from in URI hierarchy. ex.: http://example.com/path/to
`iv`
initialization vector. a string representing a hexadecimal number. ex.: 0X12A
"""
tag = ext_x_key
def __init__(
self,
method,
base_uri,
uri=None,
iv=None,
keyformat=None,
keyformatversions=None,
**kwargs
):
self.method = method
self.uri = uri
self.iv = iv
self.keyformat = keyformat
self.keyformatversions = keyformatversions
self.base_uri = base_uri
self._extra_params = kwargs
def __str__(self):
output = [
"METHOD=%s" % self.method,
]
if self.uri:
output.append('URI="%s"' % self.uri)
if self.iv:
output.append("IV=%s" % self.iv)
if self.keyformat:
output.append('KEYFORMAT="%s"' % self.keyformat)
if self.keyformatversions:
output.append('KEYFORMATVERSIONS="%s"' % self.keyformatversions)
return self.tag + ":" + ",".join(output)
def __eq__(self, other):
if not other:
return False
return (
self.method == other.method
and self.uri == other.uri
and self.iv == other.iv
and self.base_uri == other.base_uri
and self.keyformat == other.keyformat
and self.keyformatversions == other.keyformatversions
)
def __ne__(self, other):
return not self.__eq__(other)
class InitializationSection(BasePathMixin):
"""
Used to obtain Media Initialization Section required to
parse the applicable Media Segments (EXT-X-MAP)
`uri`
is a string. ex:: "https://priv.example.com/key.php?r=52"
`byterange`
value of BYTERANGE attribute
`base_uri`
uri the segment comes from in URI hierarchy. ex.: http://example.com/path/to
"""
tag = ext_x_map
def __init__(self, base_uri, uri, byterange=None):
self.base_uri = base_uri
self.uri = uri
self.byterange = byterange
def __str__(self):
output = []
if self.uri:
output.append("URI=" + quoted(self.uri))
if self.byterange:
output.append("BYTERANGE=" + quoted(self.byterange))
return "{tag}:{attributes}".format(tag=self.tag, attributes=",".join(output))
def __eq__(self, other):
if not other:
return False
return (
self.uri == other.uri
and self.byterange == other.byterange
and self.base_uri == other.base_uri
)
def __ne__(self, other):
return not self.__eq__(other)
class SessionKey(Key):
tag = ext_x_session_key
class Playlist(BasePathMixin):
"""
Playlist object representing a link to a variant M3U8 with a specific bitrate.
Attributes:
`stream_info` is a named tuple containing the attributes: `program_id`,
`bandwidth`, `average_bandwidth`, `resolution`, `codecs` and `resolution`
which is a a tuple (w, h) of integers
`media` is a list of related Media entries.
More info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.10
"""
def __init__(self, uri, stream_info, media, base_uri):
self.uri = uri
self.base_uri = base_uri
resolution = stream_info.get("resolution")
if resolution is not None:
resolution = resolution.strip('"')
values = resolution.split("x")
resolution_pair = (int(values[0]), int(values[1]))
else:
resolution_pair = None
self.stream_info = StreamInfo(
bandwidth=stream_info["bandwidth"],
video=stream_info.get("video"),
audio=stream_info.get("audio"),
subtitles=stream_info.get("subtitles"),
closed_captions=stream_info.get("closed_captions"),
average_bandwidth=stream_info.get("average_bandwidth"),
program_id=stream_info.get("program_id"),
resolution=resolution_pair,
codecs=stream_info.get("codecs"),
frame_rate=stream_info.get("frame_rate"),
video_range=stream_info.get("video_range"),
hdcp_level=stream_info.get("hdcp_level"),
pathway_id=stream_info.get("pathway_id"),
stable_variant_id=stream_info.get("stable_variant_id"),
)
self.media = []
for media_type in ("audio", "video", "subtitles"):
group_id = stream_info.get(media_type)
if not group_id:
continue
self.media += filter(lambda m: m.group_id == group_id, media)
def __str__(self):
media_types = []
stream_inf = [str(self.stream_info)]
for media in self.media:
if media.type in media_types:
continue
else:
media_types += [media.type]
media_type = media.type.upper()
stream_inf.append('{}={}'.format(media_type, media.group_id))
return "#EXT-X-STREAM-INF:" + ",".join(stream_inf) + "\n" + self.uri
class IFramePlaylist(BasePathMixin):
"""
IFramePlaylist object representing a link to a
variant M3U8 i-frame playlist with a specific bitrate.
Attributes:
`iframe_stream_info` is a named tuple containing the attributes:
`program_id`, `bandwidth`, `average_bandwidth`, `codecs`, `video_range`,
`hdcp_level` and `resolution` which is a tuple (w, h) of integers
More info: http://tools.ietf.org/html/draft-pantos-http-live-streaming-07#section-3.3.13
"""
def __init__(self, base_uri, uri, iframe_stream_info):
self.uri = uri
self.base_uri = base_uri
resolution = iframe_stream_info.get("resolution")
if resolution is not None:
values = resolution.split("x")
resolution_pair = (int(values[0]), int(values[1]))
else:
resolution_pair = None
self.iframe_stream_info = StreamInfo(
bandwidth=iframe_stream_info.get("bandwidth"),
average_bandwidth=iframe_stream_info.get("average_bandwidth"),
video=iframe_stream_info.get("video"),
# Audio, subtitles, and closed captions should not exist in
# EXT-X-I-FRAME-STREAM-INF, so just hardcode them to None.
audio=None,
subtitles=None,
closed_captions=None,
program_id=iframe_stream_info.get("program_id"),
resolution=resolution_pair,
codecs=iframe_stream_info.get("codecs"),
video_range=iframe_stream_info.get("video_range"),
hdcp_level=iframe_stream_info.get("hdcp_level"),
frame_rate=None,
pathway_id=iframe_stream_info.get("pathway_id"),
stable_variant_id=iframe_stream_info.get("stable_variant_id"),
)
def __str__(self):
iframe_stream_inf = []
if self.iframe_stream_info.program_id:
iframe_stream_inf.append(
"PROGRAM-ID=%d" % self.iframe_stream_info.program_id
)
if self.iframe_stream_info.bandwidth:
iframe_stream_inf.append("BANDWIDTH=%d" % self.iframe_stream_info.bandwidth)
if self.iframe_stream_info.average_bandwidth:
iframe_stream_inf.append(
"AVERAGE-BANDWIDTH=%d" % self.iframe_stream_info.average_bandwidth
)
if self.iframe_stream_info.resolution:
res = (
str(self.iframe_stream_info.resolution[0])
+ "x"
+ str(self.iframe_stream_info.resolution[1])
)
iframe_stream_inf.append("RESOLUTION=" + res)
if self.iframe_stream_info.codecs:
iframe_stream_inf.append("CODECS=" + quoted(self.iframe_stream_info.codecs))
if self.iframe_stream_info.video_range:
iframe_stream_inf.append(
"VIDEO-RANGE=%s" % self.iframe_stream_info.video_range
)
if self.iframe_stream_info.hdcp_level:
iframe_stream_inf.append(
"HDCP-LEVEL=%s" % self.iframe_stream_info.hdcp_level
)
if self.uri:
iframe_stream_inf.append("URI=" + quoted(self.uri))
if self.iframe_stream_info.pathway_id:
iframe_stream_inf.append(
"PATHWAY-ID=" + quoted(self.iframe_stream_info.pathway_id)
)
if self.iframe_stream_info.stable_variant_id:
iframe_stream_inf.append(
"STABLE-VARIANT-ID=" + quoted(self.iframe_stream_info.stable_variant_id)
)
return "#EXT-X-I-FRAME-STREAM-INF:" + ",".join(iframe_stream_inf)
class StreamInfo:
bandwidth = None
closed_captions = None
average_bandwidth = None
program_id = None
resolution = None
codecs = None
audio = None
video = None
subtitles = None
frame_rate = None
video_range = None
hdcp_level = None
pathway_id = None
stable_variant_id = None
def __init__(self, **kwargs):
self.bandwidth = kwargs.get("bandwidth")
self.closed_captions = kwargs.get("closed_captions")
self.average_bandwidth = kwargs.get("average_bandwidth")
self.program_id = kwargs.get("program_id")
self.resolution = kwargs.get("resolution")
self.codecs = kwargs.get("codecs")
self.audio = kwargs.get("audio")
self.video = kwargs.get("video")
self.subtitles = kwargs.get("subtitles")
self.frame_rate = kwargs.get("frame_rate")
self.video_range = kwargs.get("video_range")
self.hdcp_level = kwargs.get("hdcp_level")
self.pathway_id = kwargs.get("pathway_id")
self.stable_variant_id = kwargs.get("stable_variant_id")
def __str__(self):
stream_inf = []
if self.program_id is not None:
stream_inf.append("PROGRAM-ID=%d" % self.program_id)
if self.closed_captions is not None:
stream_inf.append("CLOSED-CAPTIONS=%s" % self.closed_captions)
if self.bandwidth is not None:
stream_inf.append("BANDWIDTH=%d" % self.bandwidth)
if self.average_bandwidth is not None:
stream_inf.append("AVERAGE-BANDWIDTH=%d" % self.average_bandwidth)
if self.resolution is not None:
res = str(self.resolution[0]) + "x" + str(self.resolution[1])
stream_inf.append("RESOLUTION=" + res)
if self.frame_rate is not None:
stream_inf.append(
"FRAME-RATE=%g"
% decimal.Decimal(self.frame_rate).quantize(decimal.Decimal("1.000"))
)
if self.codecs is not None:
stream_inf.append("CODECS=" + quoted(self.codecs))
if self.video_range is not None:
stream_inf.append("VIDEO-RANGE=%s" % self.video_range)
if self.hdcp_level is not None:
stream_inf.append("HDCP-LEVEL=%s" % self.hdcp_level)
if self.pathway_id is not None:
stream_inf.append("PATHWAY-ID=" + quoted(self.pathway_id))
if self.stable_variant_id is not None:
stream_inf.append("STABLE-VARIANT-ID=" + quoted(self.stable_variant_id))
return ",".join(stream_inf)
class Media(BasePathMixin):
"""
A media object from a M3U8 playlist
https://tools.ietf.org/html/draft-pantos-http-live-streaming-16#section-4.3.4.1
`uri`
a string with the media uri
`type`
`group_id`
`language`
`assoc-language`
`name`
`default`
`autoselect`
`forced`
`instream_id`
`characteristics`
`channels`
`stable_rendition_id`
attributes in the EXT-MEDIA tag
`base_uri`
uri the media comes from in URI hierarchy. ex.: http://example.com/path/to
"""
def __init__(
self,
uri=None,
type=None,
group_id=None,
language=None,
name=None,
default=None,
autoselect=None,
forced=None,
characteristics=None,
channels=None,
stable_rendition_id=None,
assoc_language=None,
instream_id=None,
base_uri=None,
**extras
):
self.base_uri = base_uri
self.uri = uri
self.type = type
self.group_id = group_id
self.language = language
self.name = name
self.default = default
self.autoselect = autoselect
self.forced = forced
self.assoc_language = assoc_language
self.instream_id = instream_id
self.characteristics = characteristics
self.channels = channels
self.stable_rendition_id = stable_rendition_id
self.extras = extras
def dumps(self):
media_out = []
if self.uri:
media_out.append("URI=" + quoted(self.uri))
if self.type:
media_out.append("TYPE=" + self.type)
if self.group_id:
media_out.append("GROUP-ID=" + quoted(self.group_id))
if self.language:
media_out.append("LANGUAGE=" + quoted(self.language))
if self.assoc_language:
media_out.append("ASSOC-LANGUAGE=" + quoted(self.assoc_language))
if self.name:
media_out.append("NAME=" + quoted(self.name))
if self.default:
media_out.append("DEFAULT=" + self.default)
if self.autoselect:
media_out.append("AUTOSELECT=" + self.autoselect)
if self.forced:
media_out.append("FORCED=" + self.forced)
if self.instream_id:
media_out.append("INSTREAM-ID=" + quoted(self.instream_id))
if self.characteristics:
media_out.append("CHARACTERISTICS=" + quoted(self.characteristics))
if self.channels:
media_out.append("CHANNELS=" + quoted(self.channels))
if self.stable_rendition_id:
media_out.append("STABLE-RENDITION-ID=" + quoted(self.stable_rendition_id))
return "#EXT-X-MEDIA:" + ",".join(media_out)
def __str__(self):
return self.dumps()
class TagList(list):
def __str__(self):
output = [str(tag) for tag in self]
return "\n".join(output)
class MediaList(TagList, GroupedBasePathMixin):
@property
def uri(self):
return [media.uri for media in self]
class PlaylistList(TagList, GroupedBasePathMixin):
pass
class SessionDataList(TagList):
pass
class Start:
def __init__(self, time_offset, precise=None):
self.time_offset = float(time_offset)
self.precise = precise
def __str__(self):
output = ["TIME-OFFSET=" + str(self.time_offset)]
if self.precise and self.precise in ["YES", "NO"]:
output.append("PRECISE=" + str(self.precise))
return ext_x_start + ":" + ",".join(output)
class RenditionReport(BasePathMixin):
def __init__(self, base_uri, uri, last_msn, last_part=None):
self.base_uri = base_uri
self.uri = uri
self.last_msn = last_msn
self.last_part = last_part
def dumps(self):
report = []
report.append("URI=" + quoted(self.uri))
report.append("LAST-MSN=" + str(self.last_msn))
if self.last_part is not None:
report.append("LAST-PART=" + str(self.last_part))
return "#EXT-X-RENDITION-REPORT:" + ",".join(report)
def __str__(self):
return self.dumps()
class RenditionReportList(list, GroupedBasePathMixin):
def __str__(self):
output = [str(report) for report in self]
return "\n".join(output)
class ServerControl:
def __init__(
self,
can_skip_until=None,
can_block_reload=None,
hold_back=None,
part_hold_back=None,
can_skip_dateranges=None,
):
self.can_skip_until = can_skip_until
self.can_block_reload = can_block_reload
self.hold_back = hold_back
self.part_hold_back = part_hold_back
self.can_skip_dateranges = can_skip_dateranges
def __getitem__(self, item):
return getattr(self, item)
def dumps(self):
ctrl = []
if self.can_block_reload:
ctrl.append("CAN-BLOCK-RELOAD=%s" % self.can_block_reload)
for attr in ["hold_back", "part_hold_back"]:
if self[attr]:
ctrl.append(
"%s=%s"
% (denormalize_attribute(attr), number_to_string(self[attr]))
)
if self.can_skip_until:
ctrl.append("CAN-SKIP-UNTIL=%s" % number_to_string(self.can_skip_until))
if self.can_skip_dateranges:
ctrl.append("CAN-SKIP-DATERANGES=%s" % self.can_skip_dateranges)
return "#EXT-X-SERVER-CONTROL:" + ",".join(ctrl)
def __str__(self):
return self.dumps()
class Skip:
def __init__(self, skipped_segments, recently_removed_dateranges=None):
self.skipped_segments = skipped_segments
self.recently_removed_dateranges = recently_removed_dateranges
def dumps(self):
skip = []
skip.append("SKIPPED-SEGMENTS=%s" % self.skipped_segments)
if self.recently_removed_dateranges is not None:
skip.append(
"RECENTLY-REMOVED-DATERANGES=%s"
% quoted(self.recently_removed_dateranges)
)
return "#EXT-X-SKIP:" + ",".join(skip)
def __str__(self):
return self.dumps()
class PartInformation:
def __init__(self, part_target=None):
self.part_target = part_target
def dumps(self):
return "#EXT-X-PART-INF:PART-TARGET=%s" % number_to_string(self.part_target)
def __str__(self):
return self.dumps()
class PreloadHint(BasePathMixin):
def __init__(
self, type, base_uri, uri, byterange_start=None, byterange_length=None
):
self.hint_type = type
self.base_uri = base_uri
self.uri = uri
self.byterange_start = byterange_start
self.byterange_length = byterange_length
def __getitem__(self, item):
return getattr(self, item)
def dumps(self):
hint = []
hint.append("TYPE=" + self.hint_type)
hint.append("URI=" + quoted(self.uri))
for attr in ["byterange_start", "byterange_length"]:
if self[attr] is not None:
hint.append("{}={}".format(denormalize_attribute(attr), self[attr]))
return "#EXT-X-PRELOAD-HINT:" + ",".join(hint)
def __str__(self):
return self.dumps()
class SessionData:
def __init__(self, data_id, value=None, uri=None, language=None):
self.data_id = data_id
self.value = value
self.uri = uri
self.language = language
def dumps(self):
session_data_out = ["DATA-ID=" + quoted(self.data_id)]
if self.value:
session_data_out.append("VALUE=" + quoted(self.value))
elif self.uri:
session_data_out.append("URI=" + quoted(self.uri))
if self.language:
session_data_out.append("LANGUAGE=" + quoted(self.language))
return "#EXT-X-SESSION-DATA:" + ",".join(session_data_out)
def __str__(self):
return self.dumps()
class DateRangeList(TagList):
pass
class DateRange:
def __init__(self, **kwargs):
self.id = kwargs["id"]
self.start_date = kwargs.get("start_date")
self.class_ = kwargs.get("class")
self.end_date = kwargs.get("end_date")
self.duration = kwargs.get("duration")
self.planned_duration = kwargs.get("planned_duration")
self.scte35_cmd = kwargs.get("scte35_cmd")
self.scte35_out = kwargs.get("scte35_out")
self.scte35_in = kwargs.get("scte35_in")
self.end_on_next = kwargs.get("end_on_next")
self.x_client_attrs = [
(attr, kwargs.get(attr)) for attr in kwargs if attr.startswith("x_")
]
def dumps(self):
daterange = []
daterange.append("ID=" + quoted(self.id))
# whilst START-DATE is technically REQUIRED by the spec, this is
# contradicted by an example in the same document (see
# https://tools.ietf.org/html/rfc8216#section-8.10), and also by
# real-world implementations, so we make it optional here
if self.start_date:
daterange.append("START-DATE=" + quoted(self.start_date))
if self.class_:
daterange.append("CLASS=" + quoted(self.class_))
if self.end_date:
daterange.append("END-DATE=" + quoted(self.end_date))
if self.duration:
daterange.append("DURATION=" + number_to_string(self.duration))
if self.planned_duration:
daterange.append(
"PLANNED-DURATION=" + number_to_string(self.planned_duration)
)
if self.scte35_cmd:
daterange.append("SCTE35-CMD=" + self.scte35_cmd)
if self.scte35_out:
daterange.append("SCTE35-OUT=" + self.scte35_out)
if self.scte35_in:
daterange.append("SCTE35-IN=" + self.scte35_in)
if self.end_on_next:
daterange.append("END-ON-NEXT=" + self.end_on_next)
# client attributes sorted alphabetically output order is predictable
for attr, value in sorted(self.x_client_attrs):
daterange.append("{}={}".format(denormalize_attribute(attr, value)))
return "#EXT-X-DATERANGE:" + ",".join(daterange)
def __str__(self):
return self.dumps()
class ContentSteering(BasePathMixin):
def __init__(self, base_uri, server_uri, pathway_id=None):
self.base_uri = base_uri
self.uri = server_uri
self.pathway_id = pathway_id
def dumps(self):
steering = []
steering.append("SERVER-URI=" + quoted(self.uri))
if self.pathway_id is not None:
steering.append("PATHWAY-ID=" + quoted(self.pathway_id))
return "#EXT-X-CONTENT-STEERING:" + ",".join(steering)
def __str__(self):
return self.dumps()
class ImagePlaylist(BasePathMixin):
"""
ImagePlaylist object representing a link to a
variant M3U8 image playlist with a specific bitrate.
Attributes:
`image_stream_info` is a named tuple containing the attributes:
`bandwidth`, `resolution` which is a tuple (w, h) of integers and `codecs`,
More info: https://github.com/image-media-playlist/spec/blob/master/image_media_playlist_v0_4.pdf
"""
def __init__(self, base_uri, uri, image_stream_info):
self.uri = uri
self.base_uri = base_uri
resolution = image_stream_info.get("resolution")
if resolution is not None:
values = resolution.split("x")
resolution_pair = (int(values[0]), int(values[1]))
else:
resolution_pair = None
self.image_stream_info = StreamInfo(
bandwidth=image_stream_info.get("bandwidth"),
average_bandwidth=image_stream_info.get("average_bandwidth"),
video=image_stream_info.get("video"),
# Audio, subtitles, closed captions, video range and hdcp level should not exist in
# EXT-X-IMAGE-STREAM-INF, so just hardcode them to None.
audio=None,
subtitles=None,
closed_captions=None,
program_id=image_stream_info.get("program_id"),
resolution=resolution_pair,
codecs=image_stream_info.get("codecs"),
video_range=None,
hdcp_level=None,
frame_rate=None,
pathway_id=image_stream_info.get("pathway_id"),
stable_variant_id=image_stream_info.get("stable_variant_id"),
)
def __str__(self):
image_stream_inf = []
if self.image_stream_info.program_id:
image_stream_inf.append("PROGRAM-ID=%d" % self.image_stream_info.program_id)
if self.image_stream_info.bandwidth:
image_stream_inf.append("BANDWIDTH=%d" % self.image_stream_info.bandwidth)
if self.image_stream_info.average_bandwidth:
image_stream_inf.append(
"AVERAGE-BANDWIDTH=%d" % self.image_stream_info.average_bandwidth
)
if self.image_stream_info.resolution:
res = (
str(self.image_stream_info.resolution[0])
+ "x"
+ str(self.image_stream_info.resolution[1])
)
image_stream_inf.append("RESOLUTION=" + res)
if self.image_stream_info.codecs:
image_stream_inf.append("CODECS=" + quoted(self.image_stream_info.codecs))
if self.uri:
image_stream_inf.append("URI=" + quoted(self.uri))
if self.image_stream_info.pathway_id:
image_stream_inf.append(
"PATHWAY-ID=" + quoted(self.image_stream_info.pathway_id)
)
if self.image_stream_info.stable_variant_id:
image_stream_inf.append(
"STABLE-VARIANT-ID=" + quoted(self.image_stream_info.stable_variant_id)
)
return "#EXT-X-IMAGE-STREAM-INF:" + ",".join(image_stream_inf)
class Tiles(BasePathMixin):
"""
Image tiles from a M3U8 playlist
`resolution`
resolution attribute from EXT-X-TILES tag
`layout`
layout attribute from EXT-X-TILES tag
`duration`
duration attribute from EXT-X-TILES tag
"""
def __init__(self, resolution, layout, duration):
self.resolution = resolution
self.layout = layout
self.duration = duration
def dumps(self):
tiles = []
tiles.append("RESOLUTION=" + self.resolution)
tiles.append("LAYOUT=" + self.layout)
tiles.append("DURATION=" + self.duration)
return "#EXT-X-TILES:" + ",".join(tiles)
def __str__(self):
return self.dumps()
def find_key(keydata, keylist):
if not keydata:
return None
for key in keylist:
if key:
# Check the intersection of keys and values
if (
keydata.get("uri", None) == key.uri
and keydata.get("method", "NONE") == key.method
and keydata.get("iv", None) == key.iv
):
return key
raise KeyError("No key found for key data")
def denormalize_attribute(attribute):
return attribute.replace("_", "-").upper()
def quoted(string):
return '"%s"' % string
def number_to_string(number):
with decimal.localcontext() as ctx:
ctx.prec = 20 # set floating point precision
d = decimal.Decimal(str(number))
return str(
d.quantize(decimal.Decimal(1))
if d == d.to_integral_value()
else d.normalize()
)