Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
omni-code / tools / download_file.py
Size: Mime:
import os
import tempfile
import requests
from typing import Dict, Any, Optional
from omniagents.core.tools import rich_function_tool, RichToolOutput

# Extensions to fall back on when neither the server-supplied filename nor the
# URL path provides one. Keys are lower-case MIME types without parameters.
_CONTENT_TYPE_EXTENSIONS = {
    'application/pdf': '.pdf',
    'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
    'application/msword': '.doc',
    'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': '.xlsx',
    'application/vnd.ms-excel': '.xls',
    'text/plain': '.txt',
    'text/html': '.html',
    'image/jpeg': '.jpg',
    'image/png': '.png',
}


def _filename_from_disposition(disposition_header: str) -> str:
    """Return a bare, safe filename from a Content-Disposition header, or ''."""
    if 'filename=' not in disposition_header:
        return ''

    value = disposition_header.split('filename=', 1)[1].lstrip()
    if value[:1] in ('"', "'"):
        # Quoted value: everything up to the matching quote is the name,
        # even if it contains ';'.
        value = value[1:].split(value[0], 1)[0]
    else:
        # Unquoted value: stop at the next header parameter.
        value = value.split(';', 1)[0]
    value = value.strip(' "\'')

    # The header is controlled by the remote server. Keep only the final path
    # component so names like "../../x" or "/etc/x" cannot escape the
    # directory the file is written into.
    name = os.path.basename(value.replace('\\', '/'))
    if name in ('.', '..'):
        return ''
    return name


def _filename_from_url(url: str) -> str:
    """Return the last path segment of *url*, ignoring query and fragment."""
    from urllib.parse import urlsplit

    return os.path.basename(urlsplit(url).path)


def _discard_partial_download(file_path: str, temp_dir: Optional[str]) -> None:
    """Best-effort removal of a temporary file (and its directory) after a failure."""
    for remove, target in ((os.remove, file_path), (os.rmdir, temp_dir)):
        if target is None:
            continue
        try:
            remove(target)
        except OSError:
            # Cleanup must not mask the download error that triggered it.
            pass


@rich_function_tool
def download_file(url: str, output_path: Optional[str] = None) -> RichToolOutput:
    """
    Download a file from a URL and save it locally.

    Args:
        url: The URL of the file to download.
        output_path: Optional path where the file should be saved. Missing
                    parent directories are created. If not provided, the file
                    is saved to a temporary location: under the server-supplied
                    filename (Content-Disposition) inside a fresh temporary
                    directory when one is given, otherwise to a uniquely named
                    temporary file whose extension is taken from the URL path
                    or, failing that, from the Content-Type.

    Returns:
        A RichToolOutput built from a text message and a UI metadata dict.
        The dict's "metadata" entry always contains "success", "file_path"
        and "url". On success it also contains "content_type", "file_size"
        and "is_temporary"; on failure it contains "error_type" plus either
        "status_code" (non-200 response) or "error" (exception text).
        Failures are reported through the return value; exceptions are not
        propagated to the caller.

    Usage:
        The saved path is reported in the output so it can be handed to a
        follow-up tool (for example a document-to-markdown converter).

    Example:
        result = download_file(
            url="https://example.com/document.pdf"
        )
    """
    try:
        # Some servers reject requests without a browser-like User-Agent.
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }

        # stream=True keeps the connection open until the body is consumed or
        # the response is closed, so the response is used as a context manager
        # to release the connection on every exit path.
        with requests.get(url, headers=headers, stream=True, timeout=30) as response:
            if response.status_code != 200:
                error_msg = f"Request failed with status code {response.status_code}"
                body = response.text
                ui_metadata = {
                    "value": error_msg,
                    "display_type": "error",
                    "summary": f"HTTP {response.status_code} error",
                    "preview": body[:500] if body else error_msg,
                    "truncated": len(body) > 500 if body else False,
                    "metadata": {
                        "error_type": "http_error",
                        "status_code": response.status_code,
                        "url": url,
                        "success": False,
                        "file_path": None
                    }
                }
                return RichToolOutput(error_msg, ui_metadata)

            content_type = response.headers.get('Content-Type', '')

            # Set only when we create a dedicated temporary directory, so that
            # failure cleanup knows whether there is a directory to remove.
            temp_dir = None
            is_temporary = not output_path

            if output_path:
                # Caller chose the destination; make sure its directory exists.
                os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
                final_path = output_path
            else:
                # Content-Disposition takes precedence because the server sets
                # it explicitly as the intended download name.
                filename = _filename_from_disposition(
                    response.headers.get('Content-Disposition', '')
                )
                if filename:
                    # Preserve the server-provided name inside a private
                    # temporary directory to avoid clashes.
                    temp_dir = tempfile.mkdtemp(prefix='download_')
                    final_path = os.path.join(temp_dir, filename)
                else:
                    # No usable name from the server: let the OS pick a unique
                    # temporary file and only carry over a file extension.
                    extension = os.path.splitext(_filename_from_url(url))[1]
                    if not extension:
                        mime_type = content_type.split(';')[0].strip().lower()
                        extension = _CONTENT_TYPE_EXTENSIONS.get(mime_type, '')
                    fd, final_path = tempfile.mkstemp(suffix=extension)
                    os.close(fd)  # Only the path is needed; it is reopened below.

            try:
                with open(final_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            except Exception:
                # Do not leave half-written temporary files behind. A caller
                # supplied output_path is left alone.
                if is_temporary:
                    _discard_partial_download(final_path, temp_dir)
                raise

        file_size = os.path.getsize(final_path)

        # Create LLM-friendly output
        llm_output = f"Successfully downloaded file from {url} to {final_path} ({file_size} bytes)"

        ui_metadata = {
            "value": final_path,  # The actual result - the file path
            "display_type": "download",
            "summary": f"Downloaded {os.path.basename(final_path)}",
            "preview": f"File saved to: {final_path}\nSize: {file_size} bytes\nType: {content_type}",
            "truncated": False,
            "metadata": {
                "success": True,
                "file_path": final_path,
                "url": url,
                "content_type": content_type,
                "file_size": file_size,
                "is_temporary": is_temporary
            }
        }
        return RichToolOutput(llm_output, ui_metadata)

    except requests.RequestException as e:
        error_msg = f"Network error occurred: {str(e)}"
        ui_metadata = {
            "value": error_msg,
            "display_type": "error",
            "summary": "Network error",
            "preview": str(e),
            "truncated": False,
            "metadata": {
                "error_type": "network_error",
                "url": url,
                "error": str(e),
                "success": False,
                "file_path": None
            }
        }
        return RichToolOutput(error_msg, ui_metadata)
    except Exception as e:
        # Tool boundary: any other failure (filesystem, bad path, ...) is
        # reported to the caller as a structured error instead of raising.
        error_msg = f"Error during file download: {str(e)}"
        ui_metadata = {
            "value": error_msg,
            "display_type": "error",
            "summary": "Download error",
            "preview": str(e),
            "truncated": False,
            "metadata": {
                "error_type": "download_error",
                "url": url,
                "error": str(e),
                "success": False,
                "file_path": None
            }
        }
        return RichToolOutput(error_msg, ui_metadata)