Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
snowflake-connector-python / src / snowflake / connector / cpp / ArrowIterator / CArrowTableIterator.cpp
Size: Mime:
//
// Copyright (c) 2012-2021 Snowflake Computing Inc. All rights reserved.
//

#include "CArrowTableIterator.hpp"
#include "SnowflakeType.hpp"
#include "Python/Common.hpp"
#include "Util/time.hpp"
#include <memory>
#include <string>
#include <vector>

namespace sf
{

/**
 * This function is to make sure the arrow table can be successfully converted to pandas dataframe
 * using arrow's to_pandas method. Since some Snowflake arrow columns are not supported, this method
 * can map those to supported ones.
 * Specifically,
 *    All Snowflake fixed number with scale > 0 (expect decimal) will be converted to Arrow float64/double column
 *    All Snowflake time columns will be converted to Arrow Time column with unit = second, milli, or, micro.
 *    All Snowflake timestamp columns will be converted to Arrow timestamp columns
 *    Specifically,
 *    timestampntz will be converted to Arrow timestamp with UTC
 *    timestampltz will be converted to Arrow timestamp with session time zone
 *    timestamptz will be converted to Arrow timestamp with UTC
 *    Since Arrow timestamp use int64_t internally so it may be out of range for small and large timestamps
 */
void CArrowTableIterator::reconstructRecordBatches()
{
  // Type conversion, the code needs to be optimized
  for (unsigned int batchIdx = 0; batchIdx <  m_cRecordBatches->size(); batchIdx++)
  {
    std::shared_ptr<arrow::RecordBatch> currentBatch = (*m_cRecordBatches)[batchIdx];
    std::shared_ptr<arrow::Schema> schema = currentBatch->schema();
    // These copies will be used if rebuilding the RecordBatch if necessary
    bool needsRebuild = false;
    std::vector<std::shared_ptr<arrow::Field>> futureFields;
    std::vector<std::shared_ptr<arrow::Array>> futureColumns;

    for (int colIdx = 0; colIdx < currentBatch->num_columns(); colIdx++)
    {
      std::shared_ptr<arrow::Array> columnArray = currentBatch->column(colIdx);
      std::shared_ptr<arrow::Field> field = schema->field(colIdx);
      std::shared_ptr<arrow::DataType> dt = field->type();
      std::shared_ptr<const arrow::KeyValueMetadata> metaData = field->metadata();
      SnowflakeType::Type st = SnowflakeType::snowflakeTypeFromString(
          metaData->value(metaData->FindKey("logicalType")));

      // reconstruct columnArray in place
      switch (st)
      {
        case SnowflakeType::Type::FIXED:
        {
          int scale = metaData
                          ? std::stoi(metaData->value(metaData->FindKey("scale")))
                          : 0;
          if (scale > 0 && dt->id() != arrow::Type::type::DECIMAL)
          {
            logger->debug(
              __FILE__,
              __func__,
              __LINE__,
              "Convert fixed number column to double column, column scale %d, column type id: %d",
              scale,
              dt->id()
            );
            convertScaledFixedNumberColumn(
                batchIdx,
                colIdx,
                field,
                columnArray,
                scale,
                futureFields,
                futureColumns,
                needsRebuild
            );
          }
          break;
        }

        case SnowflakeType::Type::ANY:
        case SnowflakeType::Type::ARRAY:
        case SnowflakeType::Type::BOOLEAN:
        case SnowflakeType::Type::CHAR:
        case SnowflakeType::Type::OBJECT:
        case SnowflakeType::Type::BINARY:
        case SnowflakeType::Type::VARIANT:
        case SnowflakeType::Type::TEXT:
        case SnowflakeType::Type::REAL:
        case SnowflakeType::Type::DATE:
        {
          // Do not need to convert
          break;
        }

        case SnowflakeType::Type::TIME:
        {
          int scale = metaData
                          ? std::stoi(metaData->value(metaData->FindKey("scale")))
                          : 9;

          convertTimeColumn(batchIdx, colIdx, field, columnArray, scale, futureFields, futureColumns, needsRebuild);
          break;
        }

        case SnowflakeType::Type::TIMESTAMP_NTZ:
        {
          int scale = metaData
                          ? std::stoi(metaData->value(metaData->FindKey("scale")))
                          : 9;

          convertTimestampColumn(batchIdx, colIdx, field, columnArray, scale, futureFields, futureColumns, needsRebuild);
          break;
        }

        case SnowflakeType::Type::TIMESTAMP_LTZ:
        {
          int scale = metaData
                          ? std::stoi(metaData->value(metaData->FindKey("scale")))
                          : 9;

          convertTimestampColumn(batchIdx, colIdx, field, columnArray, scale, futureFields, futureColumns, needsRebuild, m_timezone);
          break;
        }

        case SnowflakeType::Type::TIMESTAMP_TZ:
        {
          int scale = metaData
                          ? std::stoi(metaData->value(metaData->FindKey("scale")))
                          : 9;
          int byteLength =
            metaData
                ? std::stoi(metaData->value(metaData->FindKey("byteLength")))
                : 16;

          convertTimestampTZColumn(batchIdx, colIdx, field, columnArray, scale, byteLength, futureFields, futureColumns, needsRebuild, m_timezone);
          break;
        }

        default:
        {
          std::string errorInfo = Logger::formatString(
              "[Snowflake Exception] unknown snowflake data type : %s",
              metaData->value(metaData->FindKey("logicalType")).c_str());
          logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str());
          PyErr_SetString(PyExc_Exception, errorInfo.c_str());
          return;
        }
      }
    }

    if (needsRebuild)
    {
      std::shared_ptr<arrow::Schema> futureSchema = arrow::schema(futureFields, schema->metadata());
      (*m_cRecordBatches)[batchIdx] = arrow::RecordBatch::Make(futureSchema, currentBatch->num_rows(), futureColumns);
    }
  }
}

/**
 * Constructor.
 * @param context Python connection context object; its "_timezone"
 *        attribute supplies the session timezone string.
 * @param batches record batches to iterate over (not owned).
 * @param number_to_decimal when true, scaled FIXED columns become
 *        Decimal128 columns instead of double columns.
 */
CArrowTableIterator::CArrowTableIterator(
PyObject* context,
std::vector<std::shared_ptr<arrow::RecordBatch>>* batches,
const bool number_to_decimal
)
: CArrowIterator(batches),
m_context(context),
m_pyTableObjRef(nullptr),
m_convert_number_to_decimal(number_to_decimal)
{
  // Read the session timezone from the Python context.
  // NOTE(review): PyArg_Parse with the "s" format returns a pointer into
  // the unicode object's internal UTF-8 buffer; `tz` is released when this
  // scope ends, so m_timezone may dangle unless the "_timezone" attribute
  // on m_context keeps the string alive — confirm against the header and
  // the caller's lifetime guarantees.
  py::UniqueRef tz(PyObject_GetAttrString(m_context, "_timezone"));
  PyArg_Parse(tz.get(), "s", &m_timezone);
}

/**
 * Return the whole result set as one pyarrow Table (wrapped in a ReturnVal).
 * The conversion happens once; every later call yields Py_None.
 */
std::shared_ptr<ReturnVal> CArrowTableIterator::next()
{
  const bool justConverted = this->convertRecordBatchesToTable();
  if (!justConverted || !m_cTable)
  {
    // Either the table was already handed out or there was nothing to build.
    return std::make_shared<ReturnVal>(Py_None, nullptr);
  }
  // Wrap the freshly built Arrow table as a Python object and hand it over.
  m_pyTableObjRef.reset(arrow::py::wrap_table(m_cTable));
  return std::make_shared<ReturnVal>(m_pyTableObjRef.get(), nullptr);
}

/**
 * Replace one column (field + data array) in the pending rebuild buffers
 * for the given batch.
 * The first replacement for a batch snapshots that batch's fields and
 * columns into futureFields/futureColumns and flips needsRebuild so the
 * caller knows to reassemble the RecordBatch afterwards.
 */
void CArrowTableIterator::replaceColumn(
    const unsigned int batchIdx,
    const int colIdx,
    const std::shared_ptr<arrow::Field>& newField,
    const std::shared_ptr<arrow::Array>& newColumn,
    std::vector<std::shared_ptr<arrow::Field>>& futureFields,
    std::vector<std::shared_ptr<arrow::Array>>& futureColumns,
    bool& needsRebuild)
{
  if (!needsRebuild)
  {
    // Lazily copy the batch's schema fields and columns the first time a
    // column in this batch is modified; later replacements reuse the copies.
    const std::shared_ptr<arrow::RecordBatch>& batch = (*m_cRecordBatches)[batchIdx];
    futureFields = batch->schema()->fields();
    futureColumns = batch->columns();
    needsRebuild = true;
  }
  futureFields[colIdx] = newField;
  futureColumns[colIdx] = newColumn;
}

/**
 * Decode a scaled FIXED integer into a double.
 * The stored integer represents originalValue * 10^-scale.
 * @param scale number of decimal digits after the point.
 * @param originalValue raw integer read from the Arrow column.
 * @return the decoded floating point value.
 */
template <typename T>
double CArrowTableIterator::convertScaledFixedNumberToDouble(
  const unsigned int scale,
  T originalValue
)
{
  if (scale < 9)
  {
    // simply use divide to convert decimal value in double
    return (double) originalValue / sf::internal::powTenSB4[scale];
  }
  else
  {
    // when scale is large, convert the value to string first and then convert it to double
    // otherwise, it may loss precision
    std::string valStr = std::to_string(originalValue);
    // remember whether a leading '-' occupies position 0
    int negative = valStr.at(0) == '-' ? 1:0;
    unsigned int digits = valStr.length() - negative;
    if (digits <= scale)
    {
      // pad with leading zeroes (one extra so at least one digit remains
      // before the decimal point inserted below)
      int numOfZeroes = scale - digits + 1;
      valStr.insert(negative, std::string(numOfZeroes, '0'));
    }
    // place the decimal point `scale` digits from the end
    valStr.insert(valStr.length() - scale, ".");
    std::size_t offset = 0;
    return std::stod(valStr, &offset);
  }
}

/**
 * Convert a scaled FIXED column to either a Decimal128 column or a double
 * column, depending on the connector's number_to_decimal setting.
 */
void CArrowTableIterator::convertScaledFixedNumberColumn(
  const unsigned int batchIdx,
  const int colIdx,
  const std::shared_ptr<arrow::Field> field,
  const std::shared_ptr<arrow::Array> columnArray,
  const unsigned int scale,
  std::vector<std::shared_ptr<arrow::Field>>& futureFields,
  std::vector<std::shared_ptr<arrow::Array>>& futureColumns,
  bool& needsRebuild
)
{
  // Dispatch on the session setting: double is the default representation.
  if (!m_convert_number_to_decimal)
  {
    convertScaledFixedNumberColumnToDoubleColumn(
        batchIdx, colIdx, field, columnArray, scale,
        futureFields, futureColumns, needsRebuild);
    return;
  }
  convertScaledFixedNumberColumnToDecimalColumn(
      batchIdx, colIdx, field, columnArray, scale,
      futureFields, futureColumns, needsRebuild);
}

/**
 * Convert a scaled FIXED integer column to an Arrow Decimal128(38, scale)
 * column and swap it into the rebuild buffers via replaceColumn().
 * Supports INT8/INT16/INT32/INT64 source arrays; any other physical type
 * is reported as a Python exception.
 */
void CArrowTableIterator::convertScaledFixedNumberColumnToDecimalColumn(
  const unsigned int batchIdx,
  const int colIdx,
  const std::shared_ptr<arrow::Field> field,
  const std::shared_ptr<arrow::Array> columnArray,
  const unsigned int scale,
  std::vector<std::shared_ptr<arrow::Field>>& futureFields,
  std::vector<std::shared_ptr<arrow::Array>>& futureColumns,
  bool& needsRebuild
)
{
  // Convert to decimal columns
  const std::shared_ptr<arrow::DataType> field_type = field->type();
  const std::shared_ptr<arrow::DataType> destType = arrow::decimal128(38, scale);
  // Renamed from "doubleField"/"doubleArray": this path builds a Decimal column.
  std::shared_ptr<arrow::Field> decimalField = std::make_shared<arrow::Field>(
      field->name(), destType, field->nullable());
  arrow::Decimal128Builder builder(destType, m_pool);
  arrow::Status ret;
  for(int64_t rowIdx = 0; rowIdx < columnArray->length(); rowIdx++)
  {
    if (columnArray->IsValid(rowIdx))
    {
      arrow::Decimal128 val;
      switch (field_type->id())
      {
        case arrow::Type::type::INT8:
        {
          auto originalVal = std::static_pointer_cast<arrow::Int8Array>(columnArray)->Value(rowIdx);
          val = arrow::Decimal128(originalVal);
          break;
        }
        case arrow::Type::type::INT16:
        {
          auto originalVal = std::static_pointer_cast<arrow::Int16Array>(columnArray)->Value(rowIdx);
          val = arrow::Decimal128(originalVal);
          break;
        }
        case arrow::Type::type::INT32:
        {
          auto originalVal = std::static_pointer_cast<arrow::Int32Array>(columnArray)->Value(rowIdx);
          val = arrow::Decimal128(originalVal);
          break;
        }
        case arrow::Type::type::INT64:
        {
          auto originalVal = std::static_pointer_cast<arrow::Int64Array>(columnArray)->Value(rowIdx);
          val = arrow::Decimal128(originalVal);
          break;
        }
        default:
        {
          std::string errorInfo = Logger::formatString(
              "[Snowflake Exception] unknown arrow internal data type(%d) "
              "for FIXED data",
              field_type->id());
          logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str());
          // Surface the failure to Python instead of silently leaving the
          // column unconverted (consistent with the other error paths).
          PyErr_SetString(PyExc_Exception, errorInfo.c_str());
          return;
        }
      }
      ret = builder.Append(val);
    }
    else
    {
      ret = builder.AppendNull();
    }
    SF_CHECK_ARROW_RC(ret,
      "[Snowflake Exception] arrow failed to append Decimal value: internal data type(%d), errorInfo: %s",
      field_type->id(),  ret.message().c_str());
  }

  std::shared_ptr<arrow::Array> decimalArray;
  ret = builder.Finish(&decimalArray);
  SF_CHECK_ARROW_RC(ret,
    "[Snowflake Exception] arrow failed to finish Decimal array, errorInfo: %s",
    ret.message().c_str());

  // replace the targeted column
  replaceColumn(batchIdx, colIdx, decimalField, decimalArray, futureFields, futureColumns, needsRebuild);
}

/**
 * Convert a scaled FIXED integer column to an Arrow float64 column and swap
 * it into the rebuild buffers via replaceColumn().
 * Supports INT8/INT16/INT32/INT64 source arrays; any other physical type
 * is reported as a Python exception.
 */
void CArrowTableIterator::convertScaledFixedNumberColumnToDoubleColumn(
  const unsigned int batchIdx,
  const int colIdx,
  const std::shared_ptr<arrow::Field> field,
  const std::shared_ptr<arrow::Array> columnArray,
  const unsigned int scale,
  std::vector<std::shared_ptr<arrow::Field>>& futureFields,
  std::vector<std::shared_ptr<arrow::Array>>& futureColumns,
  bool& needsRebuild
)
{
  // Convert to arrow double/float64 column
  std::shared_ptr<arrow::Field> doubleField = std::make_shared<arrow::Field>(
      field->name(), arrow::float64(), field->nullable());
  arrow::DoubleBuilder builder(m_pool);
  arrow::Status ret;
  auto dt = field->type();
  for(int64_t rowIdx = 0; rowIdx < columnArray->length(); rowIdx++)
  {
    if (columnArray->IsValid(rowIdx))
    {
      double val;
      switch (dt->id())
      {
        case arrow::Type::type::INT8:
        {
          auto originalVal = std::static_pointer_cast<arrow::Int8Array>(columnArray)->Value(rowIdx);
          val = convertScaledFixedNumberToDouble(scale, originalVal);
          break;
        }
        case arrow::Type::type::INT16:
        {
          auto originalVal = std::static_pointer_cast<arrow::Int16Array>(columnArray)->Value(rowIdx);
          val = convertScaledFixedNumberToDouble(scale, originalVal);
          break;
        }
        case arrow::Type::type::INT32:
        {
          auto originalVal = std::static_pointer_cast<arrow::Int32Array>(columnArray)->Value(rowIdx);
          val = convertScaledFixedNumberToDouble(scale, originalVal);
          break;
        }
        case arrow::Type::type::INT64:
        {
          auto originalVal = std::static_pointer_cast<arrow::Int64Array>(columnArray)->Value(rowIdx);
          val = convertScaledFixedNumberToDouble(scale, originalVal);
          break;
        }
        default:
        {
          std::string errorInfo = Logger::formatString(
              "[Snowflake Exception] unknown arrow internal data type(%d) "
              "for FIXED data",
              dt->id());
          logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str());
          // Surface the failure to Python instead of silently leaving the
          // column unconverted (consistent with the other error paths).
          PyErr_SetString(PyExc_Exception, errorInfo.c_str());
          return;
        }
      }
      ret = builder.Append(val);
    }
    else
    {
      ret = builder.AppendNull();
    }
    SF_CHECK_ARROW_RC(ret,
      "[Snowflake Exception] arrow failed to append Double value: internal data type(%d), errorInfo: %s",
      dt->id(),  ret.message().c_str());
  }

  std::shared_ptr<arrow::Array> doubleArray;
  ret = builder.Finish(&doubleArray);
  SF_CHECK_ARROW_RC(ret,
    "[Snowflake Exception] arrow failed to finish Double array, errorInfo: %s",
    ret.message().c_str());

  // replace the targeted column
  replaceColumn(batchIdx, colIdx, doubleField, doubleArray, futureFields, futureColumns, needsRebuild);
}

/**
 * Convert a Snowflake TIME column to an Arrow Time32/Time64 column.
 * The target unit depends on the scale:
 *   scale == 0  -> Time32 seconds
 *   scale <= 3  -> Time32 milliseconds (value scaled up)
 *   scale <= 6  -> Time64 microseconds (value scaled up)
 *   scale  > 6  -> Time64 microseconds (value truncated — Python/pandas
 *                  Time has no nanosecond support)
 * The rebuilt field/array replace the original column via replaceColumn().
 */
void CArrowTableIterator::convertTimeColumn(
  const unsigned int batchIdx,
  const int colIdx,
  const std::shared_ptr<arrow::Field> field,
  const std::shared_ptr<arrow::Array> columnArray,
  const int scale,
  std::vector<std::shared_ptr<arrow::Field>>& futureFields,
  std::vector<std::shared_ptr<arrow::Array>>& futureColumns,
  bool& needsRebuild
)
{
  std::shared_ptr<arrow::Field> tsField;
  std::shared_ptr<arrow::Array> tsArray;
  arrow::Status ret;
  auto dt = field->type();
  // Convert to arrow time column
  if (scale == 0)
  {
    // Whole seconds: values pass through unchanged.
    auto timeType = arrow::time32(arrow::TimeUnit::SECOND);
    tsField = std::make_shared<arrow::Field>(
      field->name(), timeType, field->nullable());
    arrow::Time32Builder builder(timeType, m_pool);


    for(int64_t rowIdx = 0; rowIdx < columnArray->length(); rowIdx++)
    {
      if (columnArray->IsValid(rowIdx))
      {
        int32_t originalVal = std::static_pointer_cast<arrow::Int32Array>(columnArray)->Value(rowIdx);
        // unit is second
        ret = builder.Append(originalVal);
      }
      else
      {
        ret = builder.AppendNull();
      }
      SF_CHECK_ARROW_RC(ret,
        "[Snowflake Exception] arrow failed to append value: internal data type(%d)"
        ", errorInfo: %s",
        dt->id(), ret.message().c_str());
    }

    ret = builder.Finish(&tsArray);
    SF_CHECK_ARROW_RC(ret,
      "[Snowflake Exception] arrow failed to finish array, errorInfo: %s",
      ret.message().c_str());
  }
  else if (scale <= 3)
  {
    // Sub-second up to milliseconds: scale the stored value up to ms.
    auto timeType = arrow::time32(arrow::TimeUnit::MILLI);
    tsField = std::make_shared<arrow::Field>(
      field->name(), timeType, field->nullable());
    arrow::Time32Builder builder(timeType, m_pool);

    // NOTE(review): this local `ret` shadows the one declared at the top of
    // the function.
    arrow::Status ret;
    for(int64_t rowIdx = 0; rowIdx < columnArray->length(); rowIdx++)
    {
      if (columnArray->IsValid(rowIdx))
      {
        int32_t val = std::static_pointer_cast<arrow::Int32Array>(columnArray)->Value(rowIdx)
          * sf::internal::powTenSB4[3 - scale];
        // unit is millisecond
        ret = builder.Append(val);
      }
      else
      {
        ret = builder.AppendNull();
      }
      SF_CHECK_ARROW_RC(ret,
        "[Snowflake Exception] arrow failed to append value: internal data type(%d)"
        ", errorInfo: %s",
        dt->id(), ret.message().c_str());
    }

    ret = builder.Finish(&tsArray);
    SF_CHECK_ARROW_RC(ret,
      "[Snowflake Exception] arrow failed to finish array, errorInfo: %s",
      ret.message().c_str());
  }
  else if (scale <= 6)
  {
    // Up to microseconds: Time64 needed; source may be INT32 or INT64.
    auto timeType = arrow::time64(arrow::TimeUnit::MICRO);
    tsField = std::make_shared<arrow::Field>(
      field->name(), timeType, field->nullable());
    arrow::Time64Builder builder(timeType, m_pool);

    // NOTE(review): shadows the outer `ret` as above.
    arrow::Status ret;
    for(int64_t rowIdx = 0; rowIdx < columnArray->length(); rowIdx++)
    {
      if (columnArray->IsValid(rowIdx))
      {
        int64_t val;
        switch (dt->id())
        {
          case arrow::Type::type::INT32:
            val = std::static_pointer_cast<arrow::Int32Array>(columnArray)->Value(rowIdx);
            break;
          case arrow::Type::type::INT64:
            val = std::static_pointer_cast<arrow::Int64Array>(columnArray)->Value(rowIdx);
            break;
          default:
            // NOTE(review): the message says "FIXED data" but this function
            // converts TIME columns — looks like copy/pasted error text;
            // confirm before relying on it in diagnostics.
            std::string errorInfo = Logger::formatString(
                "[Snowflake Exception] unknown arrow internal data type(%d) "
                "for FIXED data",
                dt->id());
            logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str());
            return;
        }
        val *= sf::internal::powTenSB4[6 - scale];
        // unit is microsecond
        ret = builder.Append(val);
      }
      else
      {
        ret = builder.AppendNull();
      }
      SF_CHECK_ARROW_RC(ret,
        "[Snowflake Exception] arrow failed to append value: internal data type(%d), errorInfo: %s",
        dt->id(),  ret.message().c_str());
    }

    ret = builder.Finish(&tsArray);
    SF_CHECK_ARROW_RC(ret,
      "[Snowflake Exception] arrow failed to finish array, errorInfo: %s",
      ret.message().c_str());
  }
  else
  {
    // Note: Python/Pandas Time does not support nanoseconds,
    // So truncate the time values to microseconds
    auto timeType = arrow::time64(arrow::TimeUnit::MICRO);
    tsField = std::make_shared<arrow::Field>(
      field->name(), timeType, field->nullable());
    arrow::Time64Builder builder(timeType, m_pool);

    // NOTE(review): shadows the outer `ret` as above.
    arrow::Status ret;
    for(int64_t rowIdx = 0; rowIdx < columnArray->length(); rowIdx++)
    {
      if (columnArray->IsValid(rowIdx))
      {
        int64_t val;
        switch (dt->id())
        {
          case arrow::Type::type::INT32:
            val = std::static_pointer_cast<arrow::Int32Array>(columnArray)->Value(rowIdx);
            break;
          case arrow::Type::type::INT64:
            val = std::static_pointer_cast<arrow::Int64Array>(columnArray)->Value(rowIdx);
            break;
          default:
            // NOTE(review): same "FIXED data" copy/paste message as above.
            std::string errorInfo = Logger::formatString(
                "[Snowflake Exception] unknown arrow internal data type(%d) "
                "for FIXED data",
                dt->id());
            logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str());
            return;
        }
        // integer division truncates sub-microsecond digits
        val /= sf::internal::powTenSB4[scale - 6];
        // unit is microsecond
        ret = builder.Append(val);
      }
      else
      {
        ret = builder.AppendNull();
      }
      SF_CHECK_ARROW_RC(ret,
        "[Snowflake Exception] arrow failed to append value: internal data type(%d), errorInfo: %s",
        dt->id(),  ret.message().c_str());
    }

    ret = builder.Finish(&tsArray);
    SF_CHECK_ARROW_RC(ret,
      "[Snowflake Exception] arrow failed to finish array, errorInfo: %s",
      ret.message().c_str());
  }

  // replace the targeted column
  replaceColumn(batchIdx, colIdx, tsField, tsArray, futureFields, futureColumns, needsRebuild);
}

/**
 * Convert a Snowflake TIMESTAMP_NTZ/LTZ column to an Arrow timestamp column.
 * The target unit depends on the scale (0 -> second, <=3 -> milli,
 * <=6 -> micro, >6 -> nano). When `timezone` is non-empty it is attached to
 * the Arrow timestamp type (used for TIMESTAMP_LTZ).
 * For scale > 6 the source column may be either a plain INT64 column or a
 * struct of {epoch seconds, fraction nanoseconds}.
 * The rebuilt field/array replace the original column via replaceColumn().
 */
void CArrowTableIterator::convertTimestampColumn(
  const unsigned int batchIdx,
  const int colIdx,
  const std::shared_ptr<arrow::Field> field,
  const std::shared_ptr<arrow::Array> columnArray,
  const int scale,
  std::vector<std::shared_ptr<arrow::Field>>& futureFields,
  std::vector<std::shared_ptr<arrow::Array>>& futureColumns,
  bool& needsRebuild,
  const std::string timezone
)
{
  std::shared_ptr<arrow::Field> tsField;
  std::shared_ptr<arrow::Array> tsArray;
  arrow::Status ret;
  std::shared_ptr<arrow::DataType> timeType;
  auto dt = field->type();
  // Convert to arrow time column
  if (scale == 0)
  {
    // Whole seconds: values pass through unchanged.
    if (!timezone.empty())
    {
      timeType = arrow::timestamp(arrow::TimeUnit::SECOND, timezone);
    }
    else
    {
      timeType = arrow::timestamp(arrow::TimeUnit::SECOND);
    }
    tsField = std::make_shared<arrow::Field>(
      field->name(), timeType, field->nullable());
    arrow::TimestampBuilder builder(timeType, m_pool);


    for(int64_t rowIdx = 0; rowIdx < columnArray->length(); rowIdx++)
    {
      if (columnArray->IsValid(rowIdx))
      {
        int64_t originalVal = std::static_pointer_cast<arrow::Int64Array>(columnArray)->Value(rowIdx);
        // unit is second
        ret = builder.Append(originalVal);
      }
      else
      {
        ret = builder.AppendNull();
      }
      SF_CHECK_ARROW_RC(ret,
        "[Snowflake Exception] arrow failed to append value: internal data type(%d), errorInfo: %s",
        dt->id(),  ret.message().c_str());
    }

    ret = builder.Finish(&tsArray); SF_CHECK_ARROW_RC(ret,
      "[Snowflake Exception] arrow failed to finish array, errorInfo: %s",
      ret.message().c_str());
  }
  else if (scale <= 3)
  {
    // Sub-second up to milliseconds: scale the stored value up to ms.
    if (!timezone.empty())
    {
      timeType = arrow::timestamp(arrow::TimeUnit::MILLI, timezone);
    }
    else
    {
      timeType = arrow::timestamp(arrow::TimeUnit::MILLI);
    }
    tsField = std::make_shared<arrow::Field>(
      field->name(), timeType, field->nullable());
    arrow::TimestampBuilder builder(timeType, m_pool);

    // NOTE(review): this local `ret` shadows the one declared at the top of
    // the function.
    arrow::Status ret;
    for(int64_t rowIdx = 0; rowIdx < columnArray->length(); rowIdx++)
    {
      if (columnArray->IsValid(rowIdx))
      {
        int64_t val = std::static_pointer_cast<arrow::Int64Array>(columnArray)->Value(rowIdx)
          * sf::internal::powTenSB4[3 - scale];
        // unit is millisecond
        ret = builder.Append(val);
      }
      else
      {
        ret = builder.AppendNull();
      }
      SF_CHECK_ARROW_RC(ret,
        "[Snowflake Exception] arrow failed to append value: internal data type(%d), errorInfo: %s",
        dt->id(),  ret.message().c_str());
    }

    ret = builder.Finish(&tsArray);
    SF_CHECK_ARROW_RC(ret,
      "[Snowflake Exception] arrow failed to finish array, errorInfo: %s",
      ret.message().c_str());
  }
  else if (scale <= 6)
  {
    // Up to microseconds: only an INT64 source is expected here.
    if (!timezone.empty())
    {
      timeType = arrow::timestamp(arrow::TimeUnit::MICRO, timezone);
    }
    else
    {
      timeType = arrow::timestamp(arrow::TimeUnit::MICRO);
    }
    tsField = std::make_shared<arrow::Field>(
      field->name(), timeType, field->nullable());
    arrow::TimestampBuilder builder(timeType, m_pool);

    // NOTE(review): shadows the outer `ret` as above.
    arrow::Status ret;
    for(int64_t rowIdx = 0; rowIdx < columnArray->length(); rowIdx++)
    {
      if (columnArray->IsValid(rowIdx))
      {
        int64_t val;
        switch (dt->id())
        {
          case arrow::Type::type::INT64:
            val = std::static_pointer_cast<arrow::Int64Array>(columnArray)->Value(rowIdx);
            break;
          default:
            // NOTE(review): the message says "FIXED data" but this function
            // converts TIMESTAMP columns — looks like copy/pasted error
            // text; confirm before relying on it in diagnostics.
            std::string errorInfo = Logger::formatString(
                "[Snowflake Exception] unknown arrow internal data type(%d) "
                "for FIXED data",
                dt->id());
            logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str());
            return;
        }
        val *= sf::internal::powTenSB4[6 - scale];
        // unit is microsecond
        ret = builder.Append(val);
      }
      else
      {
        ret = builder.AppendNull();
      }
      SF_CHECK_ARROW_RC(ret,
        "[Snowflake Exception] arrow failed to append value: internal data type(%d), errorInfo: %s",
        dt->id(),  ret.message().c_str());
    }

    ret = builder.Finish(&tsArray);
    SF_CHECK_ARROW_RC(ret,
      "[Snowflake Exception] arrow failed to finish array, errorInfo: %s",
      ret.message().c_str());
  }
  else
  {
    // Nanosecond precision: the source is either INT64 (scaled value) or a
    // struct of epoch seconds + nanosecond fraction.
    if (!timezone.empty())
    {
      timeType = arrow::timestamp(arrow::TimeUnit::NANO, timezone);
    }
    else
    {
      timeType = arrow::timestamp(arrow::TimeUnit::NANO);
    }
    tsField = std::make_shared<arrow::Field>(
      field->name(), timeType, field->nullable());
    arrow::TimestampBuilder builder(timeType, m_pool);
    std::shared_ptr<arrow::StructArray> structArray;
    if (dt->id() == arrow::Type::type::STRUCT)
    {
      structArray = std::dynamic_pointer_cast<arrow::StructArray>(columnArray);
    }
    // NOTE(review): shadows the outer `ret` as above.
    arrow::Status ret;
    for(int64_t rowIdx = 0; rowIdx < columnArray->length(); rowIdx++)
    {
      if (columnArray->IsValid(rowIdx))
      {
        int64_t val;
        switch (dt->id())
        {
          case arrow::Type::type::INT64:
            val = std::static_pointer_cast<arrow::Int64Array>(columnArray)->Value(rowIdx);
            val *= sf::internal::powTenSB4[9 - scale];
            break;
          case arrow::Type::type::STRUCT:
            {
              // epoch is whole seconds, fraction is nanoseconds
              int64_t epoch = std::static_pointer_cast<arrow::Int64Array>(
                structArray->GetFieldByName(sf::internal::FIELD_NAME_EPOCH))->Value(rowIdx);
              int32_t fraction = std::static_pointer_cast<arrow::Int32Array>(
                structArray->GetFieldByName(sf::internal::FIELD_NAME_FRACTION))->Value(rowIdx);
              val = epoch * sf::internal::powTenSB4[9] + fraction;
            }
            break;
          default:
            // NOTE(review): same "FIXED data" copy/paste message as above.
            std::string errorInfo = Logger::formatString(
                "[Snowflake Exception] unknown arrow internal data type(%d) "
                "for FIXED data",
                dt->id());
            logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str());
            return;
        }
        // unit is nanosecond
        ret = builder.Append(val);
      }
      else
      {
        ret = builder.AppendNull();
      }
      SF_CHECK_ARROW_RC(ret,
        "[Snowflake Exception] arrow failed to append value: internal data type(%d), errorInfo: %s",
        dt->id(),  ret.message().c_str());
    }

    ret = builder.Finish(&tsArray);
    SF_CHECK_ARROW_RC(ret,
      "[Snowflake Exception] arrow failed to finish array, errorInfo: %s",
      ret.message().c_str());
  }

  // replace the targeted column
  replaceColumn(batchIdx, colIdx, tsField, tsArray, futureFields, futureColumns, needsRebuild);
}

void CArrowTableIterator::convertTimestampTZColumn(
  const unsigned int batchIdx,
  const int colIdx,
  const std::shared_ptr<arrow::Field> field,
  const std::shared_ptr<arrow::Array> columnArray,
  const int scale,
  const int byteLength,
  std::vector<std::shared_ptr<arrow::Field>>& futureFields,
  std::vector<std::shared_ptr<arrow::Array>>& futureColumns,
  bool& needsRebuild,
  const std::string timezone
)
{
  std::shared_ptr<arrow::Field> tsField;
  std::shared_ptr<arrow::Array> tsArray;
  std::shared_ptr<arrow::DataType> timeType;
  auto dt = field->type();
  // Convert to arrow time column
  std::shared_ptr<arrow::StructArray> structArray;
  structArray = std::dynamic_pointer_cast<arrow::StructArray>(columnArray);
  auto epochArray = std::static_pointer_cast<arrow::Int64Array>(
          structArray->GetFieldByName(sf::internal::FIELD_NAME_EPOCH));
  auto fractionArray = std::static_pointer_cast<arrow::Int32Array>(
          structArray->GetFieldByName(sf::internal::FIELD_NAME_FRACTION));

  if (scale == 0)
  {
    if (!timezone.empty())
    {
      timeType = arrow::timestamp(arrow::TimeUnit::SECOND, timezone);
    }
    else
    {
      timeType = arrow::timestamp(arrow::TimeUnit::SECOND);
    }
    tsField = std::make_shared<arrow::Field>(
      field->name(), timeType, field->nullable());
  }
  else if (scale <= 3)
  {
    if (!timezone.empty())
    {
      timeType = arrow::timestamp(arrow::TimeUnit::MILLI, timezone);
    }
    else
    {
      timeType = arrow::timestamp(arrow::TimeUnit::MILLI);
    }
    tsField = std::make_shared<arrow::Field>(
      field->name(), timeType, field->nullable());
  }
  else if (scale <= 6)
  {
    if (!timezone.empty())
    {
      timeType = arrow::timestamp(arrow::TimeUnit::MICRO, timezone);
    }
    else
    {
      timeType = arrow::timestamp(arrow::TimeUnit::MICRO);
    }
    tsField = std::make_shared<arrow::Field>(
      field->name(), timeType, field->nullable());
  }
  else
  {
    if (!timezone.empty())
    {
      timeType = arrow::timestamp(arrow::TimeUnit::NANO, timezone);
    }
    else
    {
      timeType = arrow::timestamp(arrow::TimeUnit::NANO);
    }
    tsField = std::make_shared<arrow::Field>(
      field->name(), timeType, field->nullable());
  }

  arrow::TimestampBuilder builder(timeType, m_pool);
  arrow::Status ret;
  for(int64_t rowIdx = 0; rowIdx < columnArray->length(); rowIdx++)
  {
    if (columnArray->IsValid(rowIdx))
    {
      if (byteLength == 8)
      {
        // two fields
        int64_t epoch = epochArray->Value(rowIdx);
        // append value
        if (scale == 0)
        {
          ret = builder.Append(epoch);
        }
        else if (scale <= 3)
        {
          ret = builder.Append(epoch * sf::internal::powTenSB4[3-scale]);
        }
        else if (scale <= 6)
        {
          ret = builder.Append(epoch * sf::internal::powTenSB4[6-scale]);
        }
        else
        {
          ret = builder.Append(epoch * sf::internal::powTenSB4[9 - scale]);
        }
      }
      else if (byteLength == 16)
      {
        // three fields
        int64_t epoch = epochArray->Value(rowIdx);
        int32_t fraction = fractionArray->Value(rowIdx);
        if (scale == 0)
        {
          ret = builder.Append(epoch);
        }
        else if (scale <= 3)
        {
          ret = builder.Append(epoch * sf::internal::powTenSB4[3-scale]
                  + fraction / sf::internal::powTenSB4[6]);
        }
        else if (scale <= 6)
        {
          ret = builder.Append(epoch * sf::internal::powTenSB4[6] + fraction / sf::internal::powTenSB4[3]);
        }
        else
        {
          ret = builder.Append(epoch * sf::internal::powTenSB4[9] + fraction);
        }
      }
      else
      {
        std::string errorInfo = Logger::formatString(
          "[Snowflake Exception] unknown arrow internal data type(%d) "
          "for TIMESTAMP_TZ data",
          dt->id());
        logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str());
        PyErr_SetString(PyExc_Exception, errorInfo.c_str());
        return;
      }
    }
    else
    {
      ret = builder.AppendNull();
    }
    SF_CHECK_ARROW_RC(ret,
      "[Snowflake Exception] arrow failed to append value: internal data type(%d), errorInfo: %s",
      dt->id(),  ret.message().c_str());
  }

  ret = builder.Finish(&tsArray);
  SF_CHECK_ARROW_RC(ret,
    "[Snowflake Exception] arrow failed to finish array, errorInfo: %s",
    ret.message().c_str());

  // replace the targeted column
  replaceColumn(batchIdx, colIdx, tsField, tsArray, futureFields, futureColumns, needsRebuild);
}

bool CArrowTableIterator::convertRecordBatchesToTable()
{
  // only do conversion once and there exist some record batches
  if (!m_cTable && !m_cRecordBatches->empty())
  {
    reconstructRecordBatches();
    arrow::Result<std::shared_ptr<arrow::Table>> ret = arrow::Table::FromRecordBatches(*m_cRecordBatches);
    SF_CHECK_ARROW_RC_AND_RETURN(ret, false,
      "[Snowflake Exception] arrow failed to build table from batches, errorInfo: %s",
      ret.status().message().c_str());
    m_cTable = ret.ValueOrDie();

    return true;
  }
  return false;
}

} // namespace sf