// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>
#include <vector>

#include "arrow/filesystem/filesystem.h"
#include "arrow/util/macros.h"
#include "arrow/util/uri.h"

namespace Aws::Auth {
class AWSCredentialsProvider;
class STSAssumeRoleCredentialsProvider;
}  // namespace Aws::Auth

namespace Aws::STS {
class STSClient;
}  // namespace Aws::STS

namespace arrow::fs {

/// Options for using a proxy for S3
struct ARROW_EXPORT S3ProxyOptions {
  std::string scheme;
  std::string host;
  int port = -1;
  std::string username;
  std::string password;

  /// Initialize from a URI such as http://username:password@host:port
  /// or http://host:port (see the usage sketch after this struct)
  static Result<S3ProxyOptions> FromUri(const std::string& uri);
  static Result<S3ProxyOptions> FromUri(const ::arrow::util::Uri& uri);

  bool Equals(const S3ProxyOptions& other) const;
};
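
// Usage sketch (not part of the upstream header): building proxy options from a
// URI.  The proxy address and credentials below are placeholders.
//
//   arrow::Result<arrow::fs::S3ProxyOptions> proxy =
//       arrow::fs::S3ProxyOptions::FromUri("http://user:secret@proxy.example.com:8080");
//   if (proxy.ok()) {
//     arrow::fs::S3Options options = arrow::fs::S3Options::Defaults();  // declared below
//     options.proxy_options = *proxy;
//   }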

enum class S3CredentialsKind : int8_t {
  /// Anonymous access (no credentials used)
  Anonymous,
  /// Use default AWS credentials, configured through environment variables
  Default,
  /// Use explicitly-provided access key pair
  Explicit,
  /// Assume role through a role ARN
  Role,
  /// Use web identity token to assume role, configured through environment variables
  WebIdentity
};

/// Pure virtual class for describing custom S3 retry strategies
class ARROW_EXPORT S3RetryStrategy {
 public:
  virtual ~S3RetryStrategy() = default;

  /// Simple struct where each field corresponds to a field in Aws::Client::AWSError
  struct AWSErrorDetail {
    /// Corresponds to AWSError::GetErrorType()
    int error_type;
    /// Corresponds to AWSError::GetMessage()
    std::string message;
    /// Corresponds to AWSError::GetExceptionName()
    std::string exception_name;
    /// Corresponds to AWSError::ShouldRetry()
    bool should_retry;
  };
  /// Returns true if the S3 request resulting in the provided error should be retried.
  virtual bool ShouldRetry(const AWSErrorDetail& error, int64_t attempted_retries) = 0;
  /// Returns the time in milliseconds the S3 client should sleep before retrying.
  virtual int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& error,
                                                int64_t attempted_retries) = 0;
  /// Returns a stock AWS Default retry strategy.
  static std::shared_ptr<S3RetryStrategy> GetAwsDefaultRetryStrategy(
      int64_t max_attempts);
  /// Returns a stock AWS Standard retry strategy.
  static std::shared_ptr<S3RetryStrategy> GetAwsStandardRetryStrategy(
      int64_t max_attempts);
};
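
// Usage sketch (not part of the upstream header): a hypothetical custom strategy
// that retries every retryable error up to 5 times with a constant 200 ms delay.
//
//   class FixedDelayRetryStrategy : public arrow::fs::S3RetryStrategy {
//    public:
//     bool ShouldRetry(const AWSErrorDetail& error, int64_t attempted_retries) override {
//       return error.should_retry && attempted_retries < 5;
//     }
//     int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& error,
//                                           int64_t attempted_retries) override {
//       return 200;  // delay in milliseconds
//     }
//   };
//
// Alternatively, assign one of the stock strategies to S3Options::retry_strategy
// (declared below), e.g. S3RetryStrategy::GetAwsStandardRetryStrategy(3).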

/// Options for the S3FileSystem implementation.
struct ARROW_EXPORT S3Options {
  /// \brief Smart defaults for option values
  ///
  /// The possible values for this setting are explained in the AWS docs:
  /// https://docs.aws.amazon.com/sdkref/latest/guide/feature-smart-config-defaults.html
  std::string smart_defaults = "standard";

  /// \brief AWS region to connect to.
  ///
  /// If unset, the AWS SDK will choose a default value.  The exact algorithm
  /// depends on the SDK version.  Before 1.8, the default is hardcoded
  /// to "us-east-1".  Since 1.8, several heuristics are used to determine
  /// the region (environment variables, configuration profile, EC2 metadata
  /// server).
  std::string region;

  /// \brief Socket connection timeout, in seconds
  ///
  /// If negative, the AWS SDK default value is used (typically 1 second).
  double connect_timeout = -1;

  /// \brief Socket read timeout on Windows and macOS, in seconds
  ///
  /// If negative, the AWS SDK default value is used (typically 3 seconds).
  /// This option is ignored on non-Windows, non-macOS systems.
  double request_timeout = -1;

  /// If non-empty, override the default S3 endpoint with a connect string such as
  /// "localhost:9000"
  // XXX perhaps instead take a URL like "http://localhost:9000"?
  std::string endpoint_override;
  /// S3 connection transport, default "https"
  std::string scheme = "https";

  /// ARN of role to assume
  std::string role_arn;
  /// Optional identifier for an assumed role session.
  std::string session_name;
  /// Optional external identifier to pass to STS when assuming a role
  std::string external_id;
  /// Frequency (in seconds) to refresh temporary credentials from assumed role
  int load_frequency = 900;

  /// If connection is through a proxy, set options here
  S3ProxyOptions proxy_options;

  /// AWS credentials provider
  std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;

  /// Type of credentials being used. Set along with credentials_provider.
  S3CredentialsKind credentials_kind = S3CredentialsKind::Default;

  /// Whether to use virtual addressing of buckets
  ///
  /// If true, then virtual addressing is always enabled.
  /// If false, then virtual addressing is only enabled if `endpoint_override` is empty.
  ///
  /// This can be used for non-AWS backends that only support virtual hosted-style access.
  bool force_virtual_addressing = false;

  /// Whether OutputStream writes will be issued in the background, without blocking.
  bool background_writes = true;

  /// Whether to allow creation of buckets
  ///
  /// When S3FileSystem creates new buckets, it does not pass any non-default settings.
  /// In AWS S3, the bucket and all objects will not be publicly visible, and there
  /// will be no bucket policies and no resource tags. To have more control over how
  /// buckets are created, use a different API to create them.
  bool allow_bucket_creation = false;

  /// Whether to allow deletion of buckets
  bool allow_bucket_deletion = false;

  /// Whether to use pessimistic directory creation in the CreateDir function
  ///
  /// By default, CreateDir will try to create the directory without checking whether it
  /// already exists: it is cheaper to attempt the creation and handle any error than to
  /// issue two dependent I/O calls.
  /// However, on key/value storage such as Google Cloud Storage, too many creation calls
  /// may breach the rate limit for object mutation operations, with serious consequences.
  /// It is also possible that you lack creation access to the parent directory.
  /// Set this option to true to address these scenarios.
  bool check_directory_existence_before_creation = false;

  /// Whether to allow file-open methods to return before the actual open.
  ///
  /// Enabling this may reduce the latency of `OpenInputStream`, `OpenOutputStream`,
  /// and similar methods, by reducing the number of roundtrips necessary. It may also
  /// allow usage of more efficient S3 APIs for small files.
  /// The downside is that failure conditions such as attempting to open a file in a
  /// non-existing bucket will only be reported when actual I/O is done (at worst,
  /// when attempting to close the file).
  bool allow_delayed_open = false;

  /// \brief Default metadata for OpenOutputStream.
  ///
  /// This will be ignored if non-empty metadata is passed to OpenOutputStream.
  std::shared_ptr<const KeyValueMetadata> default_metadata;

  /// Optional retry strategy to determine which error types should be retried, and the
  /// delay between retries.
  std::shared_ptr<S3RetryStrategy> retry_strategy;

  /// Optional customer-provided key for server-side encryption (SSE-C).
  ///
  /// This should be the 32-byte AES-256 key, unencoded.
  std::string sse_customer_key;

  /// Optional path to a single PEM file holding all TLS CA certificates
  ///
  /// If empty, global filesystem options will be used (see FileSystemGlobalOptions);
  /// if the corresponding global filesystem option is also empty, the underlying
  /// TLS library's defaults will be used.
  ///
  /// Note this option may be ignored on some systems (Windows, macOS).
  std::string tls_ca_file_path;

  /// Optional path to a directory holding TLS CA certificates
  ///
  /// The given directory should contain CA certificates as individual PEM files
  /// named according to the OpenSSL "hashed" format.
  ///
  /// If empty, global filesystem options will be used (see FileSystemGlobalOptions);
  /// if the corresponding global filesystem option is also empty, the underlying
  /// TLS library's defaults will be used.
  ///
  /// Note this option may be ignored on some systems (Windows, macOS).
  std::string tls_ca_dir_path;

  /// Whether to verify the S3 endpoint's TLS certificate
  ///
  /// This option applies if the scheme is "https".
  bool tls_verify_certificates = true;

  S3Options();

  /// Configure with the default AWS credentials provider chain.
  void ConfigureDefaultCredentials();

  /// Configure with anonymous credentials.  This will only let you access public buckets.
  void ConfigureAnonymousCredentials();

  /// Configure with explicit access and secret key.
  void ConfigureAccessKey(const std::string& access_key, const std::string& secret_key,
                          const std::string& session_token = "");

  /// Configure with credentials from an assumed role.
  void ConfigureAssumeRoleCredentials(
      const std::string& role_arn, const std::string& session_name = "",
      const std::string& external_id = "", int load_frequency = 900,
      const std::shared_ptr<Aws::STS::STSClient>& stsClient = NULLPTR);

  /// Configure with credentials from role assumed using a web identity token
  void ConfigureAssumeRoleWithWebIdentityCredentials();

  std::string GetAccessKey() const;
  std::string GetSecretKey() const;
  std::string GetSessionToken() const;

  bool Equals(const S3Options& other) const;

  /// \brief Initialize with default credentials provider chain
  ///
  /// This is recommended if you use the standard AWS environment variables
  /// and/or configuration file.
  static S3Options Defaults();

  /// \brief Initialize with anonymous credentials.
  ///
  /// This will only let you access public buckets.
  static S3Options Anonymous();

  /// \brief Initialize with explicit access and secret key.
  ///
  /// Optionally, a session token may also be provided for temporary credentials
  /// (from STS).
  static S3Options FromAccessKey(const std::string& access_key,
                                 const std::string& secret_key,
                                 const std::string& session_token = "");

  /// \brief Initialize from an assumed role.
  static S3Options FromAssumeRole(
      const std::string& role_arn, const std::string& session_name = "",
      const std::string& external_id = "", int load_frequency = 900,
      const std::shared_ptr<Aws::STS::STSClient>& stsClient = NULLPTR);

  /// \brief Initialize from a role assumed using a web identity token.
  ///
  /// The AWS SDK uses environment variables to generate the temporary credentials.
  static S3Options FromAssumeRoleWithWebIdentity();

  static Result<S3Options> FromUri(const ::arrow::util::Uri& uri,
                                   std::string* out_path = NULLPTR);
  static Result<S3Options> FromUri(const std::string& uri,
                                   std::string* out_path = NULLPTR);
};
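
// Usage sketch (not part of the upstream header): three common ways of building
// S3Options.  Region, keys, bucket and URI below are placeholders.
//
//   // 1. Default AWS credentials provider chain (environment, config files, ...)
//   arrow::fs::S3Options options = arrow::fs::S3Options::Defaults();
//   options.region = "us-east-1";
//
//   // 2. Explicit access key pair
//   options = arrow::fs::S3Options::FromAccessKey("my-access-key", "my-secret-key");
//
//   // 3. From a URI; the part after the bucket is returned through out_path
//   std::string path;
//   arrow::Result<arrow::fs::S3Options> from_uri =
//       arrow::fs::S3Options::FromUri("s3://my-bucket/data?region=us-west-2", &path);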

/// S3-backed FileSystem implementation.
///
/// Some implementation notes:
/// - buckets are special and the operations available on them may be limited
///   or more expensive than desired.
class ARROW_EXPORT S3FileSystem : public FileSystem {
 public:
  ~S3FileSystem() override;

  std::string type_name() const override { return "s3"; }

  /// Return the original S3 options used to construct the filesystem
  S3Options options() const;
  /// Return the actual region this filesystem connects to
  std::string region() const;

  bool Equals(const FileSystem& other) const override;
  Result<std::string> PathFromUri(const std::string& uri_string) const override;
  Result<std::string> MakeUri(std::string path) const override;

  /// \cond FALSE
  using FileSystem::CreateDir;
  using FileSystem::DeleteDirContents;
  using FileSystem::DeleteDirContentsAsync;
  using FileSystem::GetFileInfo;
  using FileSystem::OpenAppendStream;
  using FileSystem::OpenOutputStream;
  /// \endcond

  Result<FileInfo> GetFileInfo(const std::string& path) override;
  Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override;

  FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override;

  Status CreateDir(const std::string& path, bool recursive) override;

  Status DeleteDir(const std::string& path) override;
  Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override;
  Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok) override;
  Status DeleteRootDirContents() override;

  Status DeleteFile(const std::string& path) override;

  Status Move(const std::string& src, const std::string& dest) override;

  Status CopyFile(const std::string& src, const std::string& dest) override;

  /// Create a sequential input stream for reading from an S3 object.
  ///
  /// NOTE: Reads from the stream will be synchronous and unbuffered.
  /// You may want to wrap the stream in a BufferedInputStream or use
  /// a custom readahead strategy to avoid idle waits.
  Result<std::shared_ptr<io::InputStream>> OpenInputStream(
      const std::string& path) override;
  /// Create a sequential input stream for reading from an S3 object.
  ///
  /// This override avoids a HEAD request by assuming the FileInfo
  /// contains correct information.
  Result<std::shared_ptr<io::InputStream>> OpenInputStream(const FileInfo& info) override;

  /// Create a random access file for reading from an S3 object.
  ///
  /// See OpenInputStream for performance notes.
  Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
      const std::string& path) override;
  /// Create a random access file for reading from an S3 object.
  ///
  /// This override avoids a HEAD request by assuming the FileInfo
  /// contains correct information.
  Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
      const FileInfo& info) override;

  /// Create a sequential output stream for writing to an S3 object.
  ///
  /// NOTE: Writes to the stream will be buffered.  Depending on
  /// S3Options.background_writes, they can be synchronous or not.
  /// It is recommended to enable background_writes unless you prefer
  /// implementing your own background execution strategy.
  Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
      const std::string& path,
      const std::shared_ptr<const KeyValueMetadata>& metadata) override;

  Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
      const std::string& path,
      const std::shared_ptr<const KeyValueMetadata>& metadata) override;

  /// Create an S3FileSystem instance from the given options.
  static Result<std::shared_ptr<S3FileSystem>> Make(
      const S3Options& options, const io::IOContext& = io::default_io_context());

 protected:
  explicit S3FileSystem(const S3Options& options, const io::IOContext&);

  class Impl;
  std::shared_ptr<Impl> impl_;
};
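
// Usage sketch (not part of the upstream header): creating a filesystem and
// wrapping a sequential input stream in a BufferedInputStream, as suggested in
// the OpenInputStream notes above.  Bucket, key and buffer size are placeholders;
// also requires "arrow/io/buffered.h" and "arrow/result.h".
//
//   arrow::Status ReadExample() {
//     ARROW_RETURN_NOT_OK(arrow::fs::EnsureS3Initialized());
//     arrow::fs::S3Options options = arrow::fs::S3Options::Defaults();
//     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::S3FileSystem> fs,
//                           arrow::fs::S3FileSystem::Make(options));
//     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::InputStream> raw,
//                           fs->OpenInputStream("my-bucket/my-key.parquet"));
//     ARROW_ASSIGN_OR_RAISE(auto buffered,
//                           arrow::io::BufferedInputStream::Create(
//                               1 << 20, arrow::default_memory_pool(), raw));
//     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Buffer> chunk, buffered->Read(4096));
//     return arrow::Status::OK();
//   }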

enum class S3LogLevel : int8_t { Off, Fatal, Error, Warn, Info, Debug, Trace };

struct ARROW_EXPORT S3GlobalOptions {
  /// The log level for S3-originating messages.
  S3LogLevel log_level;

  /// The number of threads to configure when creating AWS' I/O event loop
  ///
  /// Defaults to 1 as recommended by the AWS documentation when the number of
  /// connections is expected to be, at most, in the hundreds.
  ///
  /// For more details see Aws::Crt::Io::EventLoopGroup.
  int num_event_loop_threads = 1;

  /// Whether to install a process-wide SIGPIPE handler
  ///
  /// The AWS SDK may sometimes emit SIGPIPE signals for certain errors;
  /// by default, they would abort the current process.
  /// This option, if enabled, will install a process-wide signal handler
  /// that logs and otherwise ignores incoming SIGPIPE signals.
  ///
  /// This option has no effect on Windows.
  bool install_sigpipe_handler = false;

  /// \brief Initialize with default options
  ///
  /// For log_level, this method first tries to extract a suitable value from the
  /// environment variable ARROW_S3_LOG_LEVEL.
  static S3GlobalOptions Defaults();
};

/// \brief Initialize the S3 APIs with the specified set of options.
///
/// It is required to call this function at least once before using S3FileSystem.
///
/// Once this function is called, you MUST call FinalizeS3 before the end of the
/// application in order to avoid a segmentation fault at shutdown.
ARROW_EXPORT
Status InitializeS3(const S3GlobalOptions& options);
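
// Usage sketch (not part of the upstream header): initializing the S3 APIs at
// program startup and finalizing them before exit, as required above.
//
//   int main() {
//     arrow::fs::S3GlobalOptions global_options = arrow::fs::S3GlobalOptions::Defaults();
//     global_options.log_level = arrow::fs::S3LogLevel::Error;  // optional override
//     arrow::Status st = arrow::fs::InitializeS3(global_options);
//     if (!st.ok()) return 1;
//     // ... use S3FileSystem here ...
//     st = arrow::fs::FinalizeS3();
//     return st.ok() ? 0 : 1;
//   }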

/// \brief Ensure the S3 APIs are initialized, but only if not already done.
///
/// If necessary, this will call InitializeS3() with some default options.
ARROW_EXPORT
Status EnsureS3Initialized();

/// Whether S3 was initialized, and not finalized.
ARROW_EXPORT
bool IsS3Initialized();

/// Whether S3 was finalized.
ARROW_EXPORT
bool IsS3Finalized();

/// \brief Shut down the S3 APIs.
///
/// This may wait for some concurrent S3 calls to finish so as to avoid
/// race conditions.
/// After this function has been called, all S3 calls will fail with an error.
///
/// Calls to InitializeS3() and FinalizeS3() should be serialized by the
/// application (this also applies to EnsureS3Initialized() and
/// EnsureS3Finalized()).
ARROW_EXPORT
Status FinalizeS3();

/// \brief Ensure the S3 APIs are shutdown, but only if not already done.
///
/// If necessary, this will call FinalizeS3().
ARROW_EXPORT
Status EnsureS3Finalized();

/// \brief Resolve the S3 region in which the given bucket resides.
ARROW_EXPORT
Result<std::string> ResolveS3BucketRegion(const std::string& bucket);
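
// Usage sketch (not part of the upstream header); the bucket name is a placeholder:
//
//   arrow::Result<std::string> region = arrow::fs::ResolveS3BucketRegion("my-bucket");
//   if (region.ok()) {
//     arrow::fs::S3Options options = arrow::fs::S3Options::Defaults();
//     options.region = *region;
//   }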

}  // namespace arrow::fs