Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / Scrapy   python

Repository URL to install this package:

/ extensions / feedexport.py

"""
Feed Exports extension

See documentation in docs/topics/feed-exports.rst
"""

import os
import sys
import logging
import posixpath
from tempfile import NamedTemporaryFile
from datetime import datetime
import six
from six.moves.urllib.parse import urlparse
from ftplib import FTP

from zope.interface import Interface, implementer
from twisted.internet import defer, threads
from w3lib.url import file_uri_to_path

from scrapy import signals
from scrapy.utils.ftp import ftp_makedirs_cwd
from scrapy.exceptions import NotConfigured
from scrapy.utils.misc import load_object
from scrapy.utils.log import failure_to_exc_info
from scrapy.utils.python import without_none_values
from scrapy.utils.boto import is_botocore

logger = logging.getLogger(__name__)


class IFeedStorage(Interface):
    """Interface that all Feed Storages must implement"""

    def __init__(uri):
        """Initialize the storage with the parameters given in the URI"""

    def open(spider):
        """Open the storage for the given spider. It must return a file-like
        object that will be used for the exporters"""

    def store(file):
        """Store the given file stream"""


@implementer(IFeedStorage)
class BlockingFeedStorage(object):

    def open(self, spider):
        path = spider.crawler.settings['FEED_TEMPDIR']
        if path and not os.path.isdir(path):
            raise OSError('Not a Directory: ' + str(path))

        return NamedTemporaryFile(prefix='feed-', dir=path)

    def store(self, file):
        return threads.deferToThread(self._store_in_thread, file)

    def _store_in_thread(self, file):
        raise NotImplementedError


@implementer(IFeedStorage)
class StdoutFeedStorage(object):

    def __init__(self, uri, _stdout=None):
        if not _stdout:
            _stdout = sys.stdout if six.PY2 else sys.stdout.buffer
        self._stdout = _stdout

    def open(self, spider):
        return self._stdout

    def store(self, file):
        pass


@implementer(IFeedStorage)
class FileFeedStorage(object):

    def __init__(self, uri):
        self.path = file_uri_to_path(uri)

    def open(self, spider):
        dirname = os.path.dirname(self.path)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname)
        return open(self.path, 'ab')

    def store(self, file):
        file.close()


class S3FeedStorage(BlockingFeedStorage):

    def __init__(self, uri):
        from scrapy.conf import settings
        u = urlparse(uri)
        self.bucketname = u.hostname
        self.access_key = u.username or settings['AWS_ACCESS_KEY_ID']
        self.secret_key = u.password or settings['AWS_SECRET_ACCESS_KEY']
        self.is_botocore = is_botocore()
        self.keyname = u.path[1:]  # remove first "/"
        if self.is_botocore:
            import botocore.session
            session = botocore.session.get_session()
            self.s3_client = session.create_client(
                's3', aws_access_key_id=self.access_key,
                aws_secret_access_key=self.secret_key)
        else:
            import boto
            self.connect_s3 = boto.connect_s3

    def _store_in_thread(self, file):
        file.seek(0)
        if self.is_botocore:
            self.s3_client.put_object(
                Bucket=self.bucketname, Key=self.keyname, Body=file)
        else:
            conn = self.connect_s3(self.access_key, self.secret_key)
            bucket = conn.get_bucket(self.bucketname, validate=False)
            key = bucket.new_key(self.keyname)
            key.set_contents_from_file(file)
            key.close()


class FTPFeedStorage(BlockingFeedStorage):

    def __init__(self, uri):
        u = urlparse(uri)
        self.host = u.hostname
        self.port = int(u.port or '21')
        self.username = u.username
        self.password = u.password
        self.path = u.path

    def _store_in_thread(self, file):
        file.seek(0)
        ftp = FTP()
        ftp.connect(self.host, self.port)
        ftp.login(self.username, self.password)
        dirname, filename = posixpath.split(self.path)
        ftp_makedirs_cwd(ftp, dirname)
        ftp.storbinary('STOR %s' % filename, file)
        ftp.quit()


class SpiderSlot(object):
    def __init__(self, file, exporter, storage, uri):
        self.file = file
        self.exporter = exporter
        self.storage = storage
        self.uri = uri
        self.itemcount = 0


class FeedExporter(object):

    def __init__(self, settings):
        self.settings = settings
        self.urifmt = settings['FEED_URI']
        if not self.urifmt:
            raise NotConfigured
        self.format = settings['FEED_FORMAT'].lower()
        self.storages = self._load_components('FEED_STORAGES')
        self.exporters = self._load_components('FEED_EXPORTERS')
        if not self._storage_supported(self.urifmt):
            raise NotConfigured
        if not self._exporter_supported(self.format):
            raise NotConfigured
        self.store_empty = settings.getbool('FEED_STORE_EMPTY')
        self.export_fields = settings.getlist('FEED_EXPORT_FIELDS') or None
        uripar = settings['FEED_URI_PARAMS']
        self._uripar = load_object(uripar) if uripar else lambda x, y: None

    @classmethod
    def from_crawler(cls, crawler):
        o = cls(crawler.settings)
        crawler.signals.connect(o.open_spider, signals.spider_opened)
        crawler.signals.connect(o.close_spider, signals.spider_closed)
        crawler.signals.connect(o.item_scraped, signals.item_scraped)
        return o

    def open_spider(self, spider):
        uri = self.urifmt % self._get_uri_params(spider)
        storage = self._get_storage(uri)
        file = storage.open(spider)
        exporter = self._get_exporter(file, fields_to_export=self.export_fields)
        exporter.start_exporting()
        self.slot = SpiderSlot(file, exporter, storage, uri)

    def close_spider(self, spider):
        slot = self.slot
        if not slot.itemcount and not self.store_empty:
            return
        slot.exporter.finish_exporting()
        logfmt = "%s %%(format)s feed (%%(itemcount)d items) in: %%(uri)s"
        log_args = {'format': self.format,
                    'itemcount': slot.itemcount,
                    'uri': slot.uri}
        d = defer.maybeDeferred(slot.storage.store, slot.file)
        d.addCallback(lambda _: logger.info(logfmt % "Stored", log_args,
                                            extra={'spider': spider}))
        d.addErrback(lambda f: logger.error(logfmt % "Error storing", log_args,
                                            exc_info=failure_to_exc_info(f),
                                            extra={'spider': spider}))
        return d

    def item_scraped(self, item, spider):
        slot = self.slot
        slot.exporter.export_item(item)
        slot.itemcount += 1
        return item

    def _load_components(self, setting_prefix):
        conf = without_none_values(self.settings.getwithbase(setting_prefix))
        d = {}
        for k, v in conf.items():
            try:
                d[k] = load_object(v)
            except NotConfigured:
                pass
        return d

    def _exporter_supported(self, format):
        if format in self.exporters:
            return True
        logger.error("Unknown feed format: %(format)s", {'format': format})

    def _storage_supported(self, uri):
        scheme = urlparse(uri).scheme
        if scheme in self.storages:
            try:
                self._get_storage(uri)
                return True
            except NotConfigured:
                logger.error("Disabled feed storage scheme: %(scheme)s",
                             {'scheme': scheme})
        else:
            logger.error("Unknown feed storage scheme: %(scheme)s",
                         {'scheme': scheme})

    def _get_exporter(self, *args, **kwargs):
        return self.exporters[self.format](*args, **kwargs)

    def _get_storage(self, uri):
        return self.storages[urlparse(uri).scheme](uri)

    def _get_uri_params(self, spider):
        params = {}
        for k in dir(spider):
            params[k] = getattr(spider, k)
        ts = datetime.utcnow().replace(microsecond=0).isoformat().replace(':', '-')
        params['time'] = ts
        self._uripar(params, spider)
        return params