Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# !/usr/bin/python
# coding=utf-8
#
# Copyright (C) 2018-2025 by dream-alpha
#
# In case of reuse of this source code please do not remove this copyright.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# For more information on the GNU General Public License see:
# <http://www.gnu.org/licenses/>.


import json
from time import strftime, localtime
from twisted.internet import threads, reactor
from .Debug import logger
from .UnicodeUtils import convertUni2Str
from .WebRequests import WebRequests
from .Index import idx
from .DateTimeUtils import iso_to_epoch
from .CacheUtils import saveEvents


BASE_URL = "https://tvfueralle.de"


class TVFAData(WebRequests):
    def __init__(self, channel_dict):
        WebRequests.__init__(self)
        self.channel_dict = channel_dict

    def parseEvent(self, event):
        content = event.get('content', {})
        # logger.debug("content: %s", content)
        texts = content.get('texts', {})
        # very_short_value = ""
        long_value = ""
        if isinstance(texts, dict):
            # very_short_value = texts.get('VeryShort', {}).get('value', "")
            long_value = texts.get('Long', {}).get('value', "")
        photo_url = event.get('photo', {}).get('url', '')
        if photo_url:
            photo_url = BASE_URL + photo_url
        startTime = iso_to_epoch(event.get('startTime'))
        startHM = strftime("%H:%M", localtime(startTime))

        formatted_event = [None] * (len(idx) + 1)

        formatted_event[idx['startHM']] = startHM
        formatted_event[idx['title']] = event.get('title')
        formatted_event[idx['subtitle']] = content.get('subtitle')
        formatted_event[idx['year']] = content.get('year')
        formatted_event[idx['startTime']] = startTime
        formatted_event[idx['country']] = content.get('country')
        formatted_event[idx['category']] = ""
        formatted_event[idx['genre']] = ""
        formatted_event[idx['endTime']] = iso_to_epoch(event.get('endTime'))
        formatted_event[idx['duration']] = event.get('duration') / 60
        formatted_event[idx['channel']] = ""
        formatted_event[idx['urlsendung']] = ""
        formatted_event[idx['has_video']] = False
        formatted_event[idx['photo_url']] = photo_url
        formatted_event[idx['description']] = long_value
        formatted_event[idx['video_url']] = ""

        # logger.debug("formatted_event: %s", formatted_event)
        return formatted_event

    def downloadEvents(self, day, page_channel_list, all_events, callback):
        logger.info("page_channel_list: %s, day: %s", page_channel_list, day)
        threads.deferToThread(self.downloading, day,
                              page_channel_list, all_events, callback)

    def downloading(self, day, page_channel_list, all_events, callback):
        logger.info("page_channel_list: %s, day: %s", page_channel_list, day)
        page_channel_id_list = []
        channel_id2service_ref = {}
        for service_ref in page_channel_list:
            channel_id = self.channel_dict.get(
                service_ref, {}).get("tvfa_id", "")
            channel_id2service_ref[channel_id] = service_ref
            page_channel_id_list.append(channel_id)

        url = BASE_URL + "/api/broadcasts/%s" % day
        logger.debug("url: %s", url)
        result = self.getContent(url)
        # logger.info("result: %s", result)
        if result:
            events = convertUni2Str(json.loads(result)).get("events", {})
            # logger.debug("events: %s", events)
            if day not in all_events:
                all_events[day] = {}
            for event in events:
                # logger.debug("event: %s", event)
                channel_id = event.get('channel', -1)
                if channel_id in page_channel_id_list:
                    service_ref = channel_id2service_ref.get(channel_id, "")
                    if service_ref not in all_events[day]:
                        all_events[day][service_ref] = []
                    parsed_event = self.parseEvent(event)
                    if parsed_event:
                        all_events[day][service_ref].append(parsed_event)
        saveEvents(all_events)
        reactor.callFromThread(callback, all_events)  # pylint: disable=E1101