Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
upsert / lib / upsert / merge_function / postgresql.rb
Size: Mime:
class Upsert
  class MergeFunction
    # @private
    module Postgresql
      def self.included(klass)
        klass.extend ClassMethods
      end

      module ClassMethods
        def clear(connection)
          # http://stackoverflow.com/questions/7622908/postgresql-drop-function-without-knowing-the-number-type-of-parameters
          connection.execute(%{
            CREATE OR REPLACE FUNCTION pg_temp.upsert_delfunc(text)
              RETURNS void AS
            $BODY$
            DECLARE
              _sql text;
            BEGIN
            FOR _sql IN
              SELECT 'DROP FUNCTION ' || quote_ident(n.nspname)
                               || '.' || quote_ident(p.proname)
                               || '(' || pg_catalog.pg_get_function_identity_arguments(p.oid) || ');'
              FROM   pg_catalog.pg_proc p
              LEFT   JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace
              WHERE  p.proname = $1
              AND    pg_catalog.pg_function_is_visible(p.oid) -- you may or may not want this
            LOOP
              EXECUTE _sql;
            END LOOP;
            END;
            $BODY$
              LANGUAGE plpgsql;
          })
          connection.execute(%{SELECT proname FROM pg_proc WHERE proname LIKE '#{MergeFunction::NAME_PREFIX}%'}).each do |row|
            k = row['proname']
            next if k == 'upsert_delfunc'
            Upsert.logger.info %{[upsert] Dropping function #{k.inspect}}
            connection.execute %{SELECT pg_temp.upsert_delfunc('#{k}')}
          end
        end
      end

      attr_reader :quoted_setter_names
      attr_reader :quoted_selector_names

      def initialize(controller, *args)
        super(controller, *args[0...-1])
        @quoted_setter_names = setter_keys.map { |k| connection.quote_ident k }
        @quoted_selector_names = selector_keys.map { |k| connection.quote_ident k }
        @assume_native = args.last
      end

      def execute(row)
        use_pg_native? ? pg_native(row) : pg_function(row)
      end

      def pg_function(row)
        values = []
        values += row.selector.values
        values += row.setter.values
        hstore_delete_handlers.each do |hstore_delete_handler|
          values << row.hstore_delete_keys.fetch(hstore_delete_handler.name, [])
        end
        Upsert.logger.debug do
          %{[upsert]\n\tSelector: #{row.selector.inspect}\n\tSetter: #{row.setter.inspect}}
        end

        first_try = true
        begin
          create! if connection.in_transaction? && !function_exists?
          execute_parameterized(sql, values.map { |v| connection.bind_value v })
        rescue self.class::ERROR_CLASS => pg_error
          if pg_error.message =~ /function #{name}.* does not exist/i
            if first_try
              Upsert.logger.info %{[upsert] Function #{name.inspect} went missing, trying to recreate}
              first_try = false
              create!
              retry
            end
            Upsert.logger.info %{[upsert] Failed to create function #{name.inspect} for some reason}
            raise pg_error
          else
            raise pg_error
          end
        end
      end

      def function_exists?
        # The ::int is a hack until jruby+jdbc is happy with bigints being returned
        @function_exists ||= controller.connection.execute("SELECT count(*)::int AS cnt FROM pg_proc WHERE lower(proname) = lower('#{name}')").first["cnt"].to_i > 0
      end

      # strangely ? can't be used as a placeholder
      def sql
        @sql ||= begin
          bind_params = []
          i = 1
          (selector_keys.length + setter_keys.length).times do
            bind_params << "$#{i}"
            i += 1
          end
          hstore_delete_handlers.length.times do
            bind_params << "$#{i}::text[]"
            i += 1
          end
          %{SELECT #{name}(#{bind_params.join(', ')})}
        end
      end

      def use_pg_native?
        assume_native? || (server_version >= 95 && unique_index_on_selector?)
      end

      def assume_native?
        @assume_native
      end

      def server_version
        @server_version ||=
          controller.connection.execute("SHOW server_version").first["server_version"].split('.')[0..1].join('').to_i
      end

      def schema_query
        execute_parameterized(
          %{
            SELECT array_agg(column_name::text) AS index_columns FROM information_schema.constraint_column_usage
              JOIN pg_catalog.pg_constraint ON constraint_name::text = conname::text
              WHERE table_name = $1 AND conrelid = $1::regclass::oid AND contype = 'u'
              GROUP BY table_catalog, table_name, constraint_name
          },
          [table_name]
        )
      end

      def pg_native(row)
        bind_setter_values = row.setter.values.map { |v| connection.bind_value v }

        upsert_sql = %{
          INSERT INTO #{quoted_table_name} (#{quoted_setter_names.join(',')})
          VALUES (#{insert_bind_placeholders(row).join(', ')})
          ON CONFLICT(#{quoted_selector_names.join(', ')})
          DO UPDATE SET (#{quoted_setter_names.join(', ')}) = (#{conflict_bind_placeholders(row).join(', ')})
        }

        execute_parameterized(upsert_sql, bind_setter_values)
      end

      def hstore_delete_function(sql, row, column_definition)
        parts = []
        if row.hstore_delete_keys.key?(column_definition.name)
          parts << "DELETE("
        end
        parts << sql
        if row.hstore_delete_keys.key?(column_definition.name)
          keys = row.hstore_delete_keys[column_definition.name].map { |k| "'#{k.to_s.gsub("'", "\\'")}'" }
          parts << ", ARRAY[#{keys.join(', ')}])"
        end

        parts.join(" ")
      end

      def insert_bind_placeholders(row)
        if row.hstore_delete_keys.empty?
          @insert_bind_placeholders ||= setter_column_definitions.each_with_index.map do |column_definition, i|
            "$#{i + 1}"
          end
        else
          setter_column_definitions.each_with_index.map do |column_definition, i|
            idx = i + 1
            if column_definition.hstore?
              hstore_delete_function("$#{idx}", row, column_definition)
            else
              "$#{idx}"
            end
          end
        end
      end

      def conflict_bind_placeholders(row)
        if row.hstore_delete_keys.empty?
          @conflict_bind_placeholders ||= setter_column_definitions.each_with_index.map do |column_definition, i|
            idx = i + 1
            if column_definition.hstore?
              "CASE WHEN #{quoted_table_name}.#{column_definition.quoted_name} IS NULL THEN $#{idx} ELSE" \
                + " (#{quoted_table_name}.#{column_definition.quoted_name} || $#{idx})" \
                + " END"
            else
              "$#{idx}"
            end
          end
        else
          setter_column_definitions.each_with_index.map do |column_definition, i|
            idx = i + 1
            if column_definition.hstore?
              "CASE WHEN #{quoted_table_name}.#{column_definition.quoted_name} IS NULL THEN " \
                + hstore_delete_function("$#{idx}", row, column_definition) \
                + " ELSE " \
                + hstore_delete_function("(#{quoted_table_name}.#{column_definition.quoted_name} || $#{idx})", row, column_definition) \
                + " END"
            else
              "$#{idx}"
            end
          end
        end
      end

      class HstoreDeleteHandler
        attr_reader :merge_function
        attr_reader :column_definition
        def initialize(merge_function, column_definition)
          @merge_function = merge_function
          @column_definition = column_definition
        end
        def name
          column_definition.name
        end
        def to_arg
          "#{quoted_name} text[]"
        end
        # use coalesce(foo, '{}':text[])
        def to_setter
          "#{column_definition.quoted_name} = DELETE(#{column_definition.quoted_name}, #{quoted_name})"
        end
        def to_pgsql
          %{
            IF array_length(#{quoted_name}, 1) > 0 THEN
              UPDATE #{merge_function.quoted_table_name} SET #{to_setter}
                WHERE #{merge_function.selector_column_definitions.map(&:to_selector).join(' AND ') };
            END IF;
          }.gsub(/\s+/, ' ')
        end
        private
        def quoted_name
          @quoted_name ||= merge_function.connection.quote_ident "_delete_#{column_definition.name}"
        end
      end

      def hstore_delete_handlers
        @hstore_delete_handlers ||= setter_column_definitions.select do |column_definition|
          column_definition.hstore?
        end.map do |column_definition|
          HstoreDeleteHandler.new self, column_definition
        end
      end

      def selector_column_definitions
        column_definitions.select { |cd| selector_keys.include?(cd.name) }
      end

      def setter_column_definitions
        column_definitions.select { |cd| setter_keys.include?(cd.name) }
      end

      def update_column_definitions
        setter_column_definitions.select { |cd| cd.name !~ CREATED_COL_REGEX }
      end

      # the "canonical example" from http://www.postgresql.org/docs/9.1/static/plpgsql-control-structures.html#PLPGSQL-UPSERT-EXAMPLE
      # differentiate between selector and setter
      def create!
        Upsert.logger.info "[upsert] Creating or replacing database function #{name.inspect} on table #{table_name.inspect} for selector #{selector_keys.map(&:inspect).join(', ')} and setter #{setter_keys.map(&:inspect).join(', ')}"
        first_try = true
        connection.execute(%{
          CREATE OR REPLACE FUNCTION #{name}(#{(selector_column_definitions.map(&:to_selector_arg) + setter_column_definitions.map(&:to_setter_arg) + hstore_delete_handlers.map(&:to_arg)).join(', ')}) RETURNS VOID AS
          $$
          DECLARE
            first_try INTEGER := 1;
          BEGIN
            LOOP
              -- first try to update the key
              UPDATE #{quoted_table_name} SET #{update_column_definitions.map(&:to_setter).join(', ')}
                WHERE #{selector_column_definitions.map(&:to_selector).join(' AND ') };
              IF found THEN
                #{hstore_delete_handlers.map(&:to_pgsql).join(' ')}
                RETURN;
              END IF;
              -- not there, so try to insert the key
              -- if someone else inserts the same key concurrently,
              -- we could get a unique-key failure
              BEGIN
                INSERT INTO #{quoted_table_name}(#{setter_column_definitions.map(&:quoted_name).join(', ')}) VALUES (#{setter_column_definitions.map(&:to_setter_value).join(', ')});
                #{hstore_delete_handlers.map(&:to_pgsql).join(' ')}
                RETURN;
              EXCEPTION WHEN unique_violation THEN
                -- seamusabshere 9/20/12 only retry once
                IF (first_try = 1) THEN
                  first_try := 0;
                ELSE
                  RETURN;
                END IF;
                -- Do nothing, and loop to try the UPDATE again.
              END;
            END LOOP;
          END;
          $$
          LANGUAGE plpgsql;
        })
      rescue
        if first_try and $!.message =~ /tuple concurrently updated/
          first_try = false
          retry
        else
          raise $!
        end
      end
    end
  end
end