Functions in data structures

Ulf Wiger <>
Wed Jun 18 16:47:36 CEST 2003


On Wed, 18 Jun 2003, Joachim Durchholz wrote:

>Changes in the Mnesia schema on the server comes next. I've
>been thinking about adding a version number to each table
>name, and add a layer that retrieves and converts data from
>previous-version tables if the current-version table
>doesn't have the data. Updates simply go to the latest
>version of the table, resulting in a convert-on-the-fly
>schema. (The critical point here is conversion, which must
>be 100% correct - but that would have to be correct whether
>the conversion is done in batch mode or on the fly.)

There is a transform_table() function in mnesia, which can
be used for making in-service schema updates and converting
all affected data within a transaction. This requires the
users of the data to be suspended during the transformation,
and that the users jump into new versions of the code before
they resume.

One suggestion is to begin using mnesia through an access
module and not using the mnesia functions directly. In fact,
the only function you really need to stub is the
mnesia:activity() function. If you consistently use your own
wrapper function

myMod:activity(Type, F) ->
  mnesia:activity(Type, F).

You can later change this function to

myMod:activity(Type, F) ->
  mnesia:activity(Type, F, [], myAccessModule)

where you can do all sorts of esoteric stuff in
myAccessModule (the rdbms contribution uses this method)

*** Example:
In the event of really tricky upgrades, you can use some
clever tricks in this access module, basically hiding the
nastiness from the applications.

I've attached what should be regarded as a prototype attempt
at handling redundancy reboot upgrades including schema
changes to mnesia. The idea is that one begins the upgrade
by loading an "upgrade module" on all nodes, containing no
other functionality than such that prepares for the upgrade.
The included mnesia access module provides a way to upgrade
a table definition using a forward transform for already
upgraded nodes, and a backward transform for not yet
upgraded nodes.

I've tested it for simple cases. It's never been used in a
real live upgrade, but it perhaps illustrates the potential.

/Uffe
-- 
Ulf Wiger, Senior Specialist,
   / / /   Architecture & Design of Carrier-Class Software
  / / /    Strategic Product & System Management
 / / /     Ericsson AB, Connectivity and Control Nodes

-------------- next part --------------
%%% The contents of this file are subject to the Erlang Public License,
%%% Version 1.0, (the "License"); you may not use this file except in
%%% compliance with the License. You may obtain a copy of the License at
%%% http://www.erlang.org/license/EPL1_0.txt
%%%
%%% Software distributed under the License is distributed on an "AS IS"
%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%%% the License for the specific language governing rights and limitations
%%% under the License.
%%%
%%% The Original Code is sysDb-0.1
%%%
%%% The Initial Developer of the Original Code is Ericsson Telecom
%%% AB. Portions created by Ericsson are Copyright (C), 1998, Ericsson
%%% Telecom AB. All Rights Reserved.
%%%
%%% Contributor(s):
%%%
%%% #0.   BASIC INFORMATION
%%% ----------------------------------------------------------
%%% %CCaseFile:	sysDb.erl %
%%% Author:          Ulf Wiger <>
%%% Description:     mnesia wrapper for enhanced upgrade support
%%%
%%% Modules used:    mnesia
%%%
%%% ----------------------------------------------------------
-module(sysDb).
-id('94/190 55-CNA 121 70 Ux').
-vsn('/main/R7A/10').
-date('01-04-11').

%% Debug trace macro.
%% When the module is compiled with the `debug' macro defined,
%% ?dbg(Fmt, Args) prints Fmt/Args through io:format/2, prefixed with the
%% module name and the source line. Otherwise it expands to the atom
%% no_debug, so trace calls cost nothing at runtime.
-ifdef(debug).
-define(dbg(Fmt, Args), io:format("~p-~p: " ++ Fmt, [?MODULE,?LINE|Args])).
-else.
-define(dbg(Fmt,Args), no_debug).
-endif.

%%% #2.    EXPORT LISTS
%%% ----------------------------------------------------------
%%% #2.1   EXPORTED INTERFACE FUNCTIONS
%%% ----------------------------------------------------------

%% Substitute for mnesia:transaction/1.
-export([transaction/1,
	 activity/2]).

-export([begin_upgrade/0,
	 activate_new_tables/0,
	 end_upgrade/0]).

-export([table_transform/4,
	 table_transform/5]).

-export([init_tables/0]).

%% Update
-export([lock/4,
	 write/5,
	 delete/5,
	 delete_object/5,
	 read/5,
	 match_object/5,
	 all_keys/4,
	 index_match_object/6,
	 index_read/6,
	 table_info/4]).



%%% ----------------------------------------------------------
%%% #2.2   EXPORTED INTERNAL FUNCTIONS
%%% ----------------------------------------------------------

%% Names of the helper tables used by this module (created by
%% init_tables/0):
%%   ?TEMP      - shadow copies of objects written to set/ordered_set
%%                tables during an upgrade, keyed on {Tab, Key}.
%%   ?TEMP_BAG  - same purpose, for bag tables.
%%   ?LOCAL     - per-node state (created with local_content = true);
%%                holds {version, Tab} -> new markers.
%%   ?GLOBALS   - shared state: the is_upgrade flag and the
%%                {conversion, Tab} transform definitions.
-define(TEMP, sysDbTemp).
-define(TEMP_BAG, sysDbTempBag).
-define(LOCAL, sysDb_local).
-define(GLOBALS, sysDbGlobals).

%% All helper tables share the same {key, value} record layout; the
%% record name equals the table name.
-record(?TEMP, {key, value}).
-record(?TEMP_BAG, {key, value}).
-record(?LOCAL, {key, value}).
-record(?GLOBALS, {key, value}).



%%% #3.    CODE
%%% #---------------------------------------------------------
%%% #3.1   CODE FOR EXPORTED INTERFACE FUNCTIONS
%%% #---------------------------------------------------------


%%%----------------------------------------------------------------------
%%% -type transaction(Fun : function()) ->
%%%     {atomic, ResultFromFun} | {aborted, Reason}.
%%% Input: Function object for Mnesia transaction
%%% Output: {atomic, Result} where Result is the value returned by Fun,
%%%    or {aborted, Reason} if the transaction was aborted.
%%% Exceptions: none beyond those of mnesia:transaction/1.
%%% Description: Substitute for mnesia:transaction/1.
%%%    When no upgrade is in progress the call goes straight to
%%%    mnesia:transaction/1. During an upgrade the transaction is run
%%%    through mnesia:activity/4 with this module as the access module,
%%%    so that the callbacks below can translate between the old and
%%%    new table formats. While an upgrade is being finalised
%%%    (is_upgrade flag = wait) the call blocks inside is_upgrade/0.
%%%----------------------------------------------------------------------


transaction(Fun) ->
    case is_upgrade() of
        false ->
            mnesia:transaction(Fun);
        true ->
            case catch mnesia:activity(transaction, Fun, [], ?MODULE) of
                {'EXIT', {aborted, Reason}} ->
                    %% mnesia:activity/4 signals an aborted transaction by
                    %% exiting with {aborted, Reason}. Unwrap it so that
                    %% callers see the same {aborted, Reason} shape as
                    %% mnesia:transaction/1 returns, not
                    %% {aborted, {aborted, Reason}}.
                    {aborted, Reason};
                {'EXIT', Reason} ->
                    {aborted, Reason};
                Result ->
                    {atomic, Result}
            end
    end.

%% Substitute for mnesia:activity/2.
%% Outside an upgrade the activity is started directly; during an upgrade
%% it is started with this module as the mnesia access module.
activity(Type, Fun) ->
    start_activity(is_upgrade(), Type, Fun).

%% Dispatch on the upgrade flag returned by is_upgrade/0.
start_activity(true, Type, Fun) ->
    mnesia:activity(Type, Fun, [], ?MODULE);
start_activity(false, Type, Fun) ->
    mnesia:activity(Type, Fun).

%% Marks the start of an upgrade by storing is_upgrade = true in the
%% ?GLOBALS table (sync_dirty write). From then on transaction/1 and
%% activity/2 route their work through this access module.
begin_upgrade() ->
    Flag = #?GLOBALS{key = is_upgrade, value = true},
    SetFlag = fun() -> mnesia:write(Flag) end,
    mnesia:activity(sync_dirty, SetFlag).

%% Registers a conversion for Tab, keeping the table's current record
%% name (as reported by mnesia:table_info/2).
table_transform(Tab, FwdFun, BwdFun, NewAttributeList) ->
    table_transform(Tab, FwdFun, BwdFun, NewAttributeList,
                    mnesia:table_info(Tab, record_name)).

%% Registers a conversion for Tab in the ?GLOBALS table.
%%   FwdFun           - converts an old-format record to the new format
%%   BwdFun           - converts a new-format record to the old format
%%   NewAttributeList - attribute list of the new record definition
%%   NewRecordName    - record name of the new record definition
%% The entry is stored under the key {conversion, Tab} using a
%% sync_dirty write.
table_transform(Tab, FwdFun, BwdFun, NewAttributeList, NewRecordName) ->
    Conversion = #?GLOBALS{key = {conversion, Tab},
                           value = {FwdFun, BwdFun,
                                    NewAttributeList, NewRecordName}},
    Store = fun() -> mnesia:write(Conversion) end,
    mnesia:activity(sync_dirty, Store).


%% Switches the calling node over to the new record format: for every
%% table that has a registered conversion, a {version, Tab} -> new marker
%% is written to the ?LOCAL table.
activate_new_tables() ->
    AnyConversion = #?GLOBALS{key = {conversion, '_'}, value = '_'},
    Conversions = ets:match_object(?GLOBALS, AnyConversion),
    MarkAsNew =
        fun(#?GLOBALS{key = {conversion, Tab}}) ->
                Marker = #?LOCAL{key = {version, Tab}, value = new},
                mnesia:dirty_write(Marker)
        end,
    lists:foreach(MarkAsNew, Conversions).

%% Finalises an upgrade.
%%   1. Sets the is_upgrade flag to wait, which makes is_upgrade/0 (and
%%      thereby new calls to transaction/1 and activity/2) block.
%%   2. Runs mnesia:transform_table/4 with the forward transform for
%%      every table that has a registered conversion.
%%   3. In one transaction per table, removes the conversion entry and
%%      copies the shadowed objects back into the table.
%%   4. Deletes the is_upgrade flag, releasing blocked callers.
%% Crashes with badmatch if any transform_table call fails.
end_upgrade() ->
    %% We split the conversion/copy operations into multiple transactions
    %% in order to make sure there are no updates in the meantime.
    %% Currently, no action is taken to ensure that already started
    %% activities actually finish before we commit the table transforms.
    %% However, chances are high that ets, dirty, and sync_dirty operations
    %% are lightweight enough that they will finish well before the
    %% transforms below are activated. Regular transactions are trickier,
    %% since they *might* end up waiting behind one of our table locks, and
    %% then continue believing that an upgrade is still in progress. We
    %% should deal with this in lock() below... exactly how, I'm not sure.
    mnesia:activity(sync_dirty,
                    fun() ->
                            mnesia:write(#?GLOBALS{key = is_upgrade,
                                                   value = wait})
                    end),
    Tabs = ets:match_object(?GLOBALS, #?GLOBALS{key = {conversion, '_'},
                                                value = '_'}),

    %% The backward transform is not needed any more at this point.
    lists:foreach(
      fun(#?GLOBALS{key = {conversion, Tab},
                    value = {FwF, _BwF, NewAttrs, NewRecName}}) ->
              {atomic, ok} =
                  mnesia:transform_table(Tab, FwF,
                                         NewAttrs, NewRecName)
      end, Tabs),
    ok = io:format("transforms complete~n", []),
    lists:foreach(
      fun(#?GLOBALS{key = {conversion, Tab},
                    value = {_FwF, _BwF, _NewAttrs, _NewRecName}}) ->
              F = fun() ->
                          mnesia:delete({?GLOBALS, {conversion, Tab}}),
                          copy_new_objs(Tab)
                  end,
              mnesia:activity(transaction, F)
      end, Tabs),
    ok = io:format("copy complete~n", []),
    mnesia:activity(sync_dirty,
                    fun() ->
                            mnesia:delete({?GLOBALS, is_upgrade})
                    end).


%% Creates the four helper tables as ram_copies on all running db nodes.
%% ?LOCAL is created with local_content = true; ?TEMP_BAG is a bag, the
%% others are sets. Exits (via create_table/2) as soon as one table
%% cannot be created.
init_tables() ->
    Nodes = mnesia:system_info(running_db_nodes),
    Definitions =
        [{?LOCAL, [{ram_copies, Nodes},
                   {type, set},
                   {local_content, true},
                   {attributes, record_info(fields, ?LOCAL)}]},
         {?TEMP, [{ram_copies, Nodes},
                  {type, set},
                  {attributes, record_info(fields, ?TEMP)}]},
         {?TEMP_BAG, [{ram_copies, Nodes},
                      {type, bag},
                      {attributes, record_info(fields, ?TEMP_BAG)}]},
         {?GLOBALS, [{ram_copies, Nodes},
                     {type, set},
                     {attributes, record_info(fields, ?GLOBALS)}]}],
    lists:foreach(fun({Tab, Opts}) -> create_table(Tab, Opts) end,
                  Definitions).

%% mnesia callbacks =====================================
%%   for each callback, an internal function is implemented.
%%   the internal functions are not exported, since they are only
%%   meant to be used for operations generated by the dictionary.
%%   No integrity checks are performed on the internal functions.


%% mnesia access-module callback for lock requests.
%% Passes the request unchanged to mnesia:lock/4; no upgrade-specific
%% handling is done here.
lock(ActivityId, Opaque, LockItem, LockKind) -> 
    mnesia:lock(ActivityId, Opaque, LockItem, LockKind).


%% mnesia access-module callback for writes.
%% Without a registered conversion for Tab, the write goes straight to
%% mnesia:write/5. Otherwise Rec is interpreted according to the local
%% version of Tab (old or new), and both representations are stored:
%%   - the new-format record in the shadow table (?TEMP for set and
%%     ordered_set tables, ?TEMP_BAG for bags) under the key {Tab, Key};
%%   - the old-format record in Tab itself.
%% The shadow tables always hold new-format objects wrapped in
%% #?TEMP{} / #?TEMP_BAG{} records; read/5 and copy_new_objs/1 rely on it.
write(ActivityId, Opaque, Tab, Rec, LockKind) ->
    case table_conversion_type(Tab) of
        none ->
            mnesia:write(ActivityId, Opaque, Tab, Rec, LockKind);
        {Version, FwdF, BwdF} ->
            {OldRec, NewRec} =
                case Version of
                    old ->
                        {Rec, FwdF(Rec)};
                    new ->
                        {BwdF(Rec), Rec}
                end,
            TempKey = {_, RecKey} = temp_key(Tab, Rec),
            case mnesia:table_info(Tab, type) of
                bag ->
                    %% In order to implement some of the other access
                    %% functions relatively safely, we must move all
                    %% objects with the same key to ?TEMP_BAG.
                    %% The shadow table is keyed on {Tab, Key}, so that is
                    %% the key to probe with.
                    case mnesia:read(ActivityId, Opaque, ?TEMP_BAG,
                                     TempKey, LockKind) of
                        [] ->
                            OldObjs = mnesia:read(ActivityId, Opaque,
                                                  Tab, RecKey, LockKind),
                            %% Tab still holds old-format objects; convert
                            %% them on the way into the shadow table.
                            lists:foreach(
                              fun(X) ->
                                      Moved = #?TEMP_BAG{key = TempKey,
                                                         value = FwdF(X)},
                                      mnesia:write(ActivityId, Opaque,
                                                   ?TEMP_BAG,
                                                   Moved, LockKind)
                              end, OldObjs);
                        _ ->
                            %% This has been done previously
                            ok
                    end,
                    mnesia:write(ActivityId, Opaque, ?TEMP_BAG,
                                 #?TEMP_BAG{key = TempKey,
                                            value = NewRec},
                                 LockKind);
                _ ->
                    mnesia:write(ActivityId, Opaque, ?TEMP,
                                 #?TEMP{key = TempKey,
                                        value = NewRec},
                                 LockKind)
            end,
            mnesia:write(ActivityId, Opaque, Tab, OldRec, LockKind)
    end.


%% mnesia access-module callback for deletes by key.
%% If Tab has a registered conversion, the shadow entry {Tab, Key} is
%% removed first (from ?TEMP_BAG for bags, otherwise from ?TEMP). The key
%% is then always deleted from Tab itself.
delete(ActivityId, Opaque, Tab, Key, LockKind) ->
    case table_conversion_type(Tab) of
        none ->
            ok;
        _Conversion ->
            ShadowTab =
                case mnesia:table_info(Tab, type) of
                    bag -> ?TEMP_BAG;
                    _SetOrOrderedSet -> ?TEMP
                end,
            mnesia:delete(ActivityId, Opaque, ShadowTab,
                          {Tab, Key}, LockKind)
    end,
    mnesia:delete(ActivityId, Opaque, Tab, Key, LockKind).

%% mnesia access-module callback for delete_object.
%% Without a registered conversion the call goes straight to
%% mnesia:delete_object/5. Otherwise Rec is interpreted according to the
%% local version of Tab, exactly as in write/5:
%%   old -> Rec is old-format; its new-format twin is FwF(Rec)
%%   new -> Rec is new-format; its old-format twin is BwF(Rec)
%% The old-format object is deleted from Tab and the new-format object
%% (wrapped in a #?TEMP{} / #?TEMP_BAG{} record) from the shadow table.
%%
%% WARNING!!!
%% We cannot be sure that a transform hits the target, since
%% attributes which have been added/removed are most likely initialized
%% with default values in one direction or the other. Thus, an object
%% written during the upgrade may not be delete:able with this function.
delete_object(ActivityId, Opaque, Tab, Rec, LockKind) ->
    case table_conversion_type(Tab) of
        none ->
            mnesia:delete_object(ActivityId, Opaque, Tab, Rec, LockKind);
        {Version, FwF, BwF} ->
            {OldRec, NewRec} =
                case Version of
                    old -> {Rec, FwF(Rec)};
                    new -> {BwF(Rec), Rec}
                end,
            TempKey = temp_key(Tab, Rec),
            mnesia:delete_object(ActivityId, Opaque, Tab, OldRec, LockKind),
            %% Shadow tables store records, not bare {Key, Value} tuples.
            {TempTab, TempRec} =
                case mnesia:table_info(Tab, type) of
                    bag ->
                        {?TEMP_BAG,
                         #?TEMP_BAG{key = TempKey, value = NewRec}};
                    _ ->
                        {?TEMP,
                         #?TEMP{key = TempKey, value = NewRec}}
                end,
            mnesia:delete_object(ActivityId, Opaque, TempTab,
                                 TempRec, LockKind)
    end.


%% mnesia access-module callback for reads by key.
%% Without a registered conversion the call goes straight to
%% mnesia:read/5. Otherwise the shadow table (?TEMP_BAG for bags, ?TEMP
%% for other table types) is consulted first under the key {Tab, Key}:
%%   - shadow hit:  the shadow objects are new-format; a reader on the
%%                  old version gets them passed through BwF.
%%   - shadow miss: the objects in Tab are old-format; a reader on the
%%                  new version gets them passed through FwF.
%% For bags, write/5 moves all objects with the same key into ?TEMP_BAG,
%% so a shadow hit is the complete answer for that key.
read(ActivityId, Opaque, Tab, Key, LockKind) ->
    case table_conversion_type(Tab) of
        none ->
            mnesia:read(ActivityId, Opaque, Tab, Key, LockKind);
        {Version, FwF, BwF} ->
            Shadowed =
                case mnesia:table_info(Tab, type) of
                    bag ->
                        [O || #?TEMP_BAG{value = O} <-
                                  mnesia:read(ActivityId, Opaque, ?TEMP_BAG,
                                              {Tab, Key}, LockKind)];
                    _ ->
                        [O || #?TEMP{value = O} <-
                                  mnesia:read(ActivityId, Opaque, ?TEMP,
                                              {Tab, Key}, LockKind)]
                end,
            case {Shadowed, Version} of
                {[], new} ->
                    %% Applies to bags as well as sets: a new-version
                    %% reader must never see untransformed objects.
                    [FwF(O) || O <- mnesia:read(ActivityId, Opaque, Tab,
                                                Key, LockKind)];
                {[], old} ->
                    mnesia:read(ActivityId, Opaque, Tab, Key, LockKind);
                {_, new} ->
                    Shadowed;
                {_, old} ->
                    [BwF(O) || O <- Shadowed]
            end
    end.



%% mnesia access-module callback for match_object.
%% Only a node running the new version of a table under conversion needs
%% special treatment (match_object1/5); in every other case the pattern
%% is matched directly against Tab by mnesia:match_object/5.
match_object(ActivityId, Opaque, Tab, Pattern, LockKind) ->
    case table_conversion_type(Tab) of
        {new, _FwF, _BwF} ->
            match_object1(ActivityId, Opaque, Tab, Pattern, LockKind);
        {old, _, _} ->
            mnesia:match_object(ActivityId, Opaque, Tab, Pattern, LockKind);
        none ->
            mnesia:match_object(ActivityId, Opaque, Tab, Pattern, LockKind)
    end.

%% Pattern matching for a table under conversion, as seen from a node on
%% the new version.
%%   - Pattern is a match variable: every key has to be visited.
%%   - Pattern is too short to contain the key position (or size/1 does
%%     not apply to it): nothing can match, the result is [].
%%   - The key element is ground: the objects are fetched with read/5 and
%%     filtered against Pattern.
%%   - Otherwise: every key has to be visited.
match_object1(ActivityId, Opaque, Tab, Pattern, LockKind) ->
    case is_var(Pattern) of
        true ->
            search_whole_tab(ActivityId, Opaque, Tab, Pattern, LockKind);
        false ->
            KeyPos = mnesia:table_info(Tab, keypos),
            case Pattern of
                _ when size(Pattern) >= KeyPos ->
                    Key = element(KeyPos, Pattern),
                    match_on_key(is_ground(Key), Key, ActivityId, Opaque,
                                 Tab, Pattern, LockKind);
                _ ->
                    []
            end
    end.

%% Key is fully bound: a keyed read is enough. Otherwise scan the table.
match_on_key(true, Key, ActivityId, Opaque, Tab, Pattern, LockKind) ->
    Candidates = read(ActivityId, Opaque, Tab, Key, LockKind),
    match_objs(Candidates, Pattern);
match_on_key(false, _Key, ActivityId, Opaque, Tab, Pattern, LockKind) ->
    search_whole_tab(ActivityId, Opaque, Tab, Pattern, LockKind).


%% match_objs(Objs, Pattern) -> [Obj]
%% Returns the elements of Objs that match the ETS match pattern
%% Pattern, in their original order. A scratch ETS table is used to do
%% the matching; it is deleted again before returning, also when the
%% matching fails with an exception.
match_objs([], _Pattern) ->
    [];
match_objs(Objs, Pattern) ->
    Tab = ets:new(match_tab, [set]),
    try
        match_objs(Objs, Tab, {x, Pattern})
    after
        ets:delete(Tab)
    end.

%% match_objs(Objs, Tab, {x, Pattern}) -> [Obj]
%% Tab is a scratch ETS table of type set owned by the caller. Each
%% object is inserted as {x, Obj}, replacing the previous entry under
%% the key x, and then matched with ets:match_object/2.
match_objs([], _Tab, _Pattern) ->
    [];
match_objs([O|Objs], Tab, Pattern) ->
    %% It's not trivial to write an Erlang version of the ETS pattern match,
    %% so I cheat and do this instead.
    ets:insert(Tab, {x, O}),
    case ets:match_object(Tab, Pattern) of
        [] ->
            match_objs(Objs, Tab, Pattern);
        [_] ->
            [O|match_objs(Objs, Tab, Pattern)]
    end.

%% search_whole_tab(ActivityId, Opaque, Tab, Pattern, LockKind) -> [Obj]
%% Matches Pattern against every object of Tab by visiting all keys of
%% the table. The scratch ETS table used for the matching is deleted
%% before returning, also when the scan fails with an exception (for
%% example when the surrounding transaction is aborted).
search_whole_tab(ActivityId, Opaque, Tab, Pattern, LockKind) ->
    ETS = ets:new(match_tab, [set]),
    MatchPat = {x, Pattern},
    try
        %% mnesia:all_keys/4 is the activity-context variant; it takes
        %% only the table and the lock kind besides the context.
        Keys = mnesia:all_keys(ActivityId, Opaque, Tab, LockKind),
        search_whole_tab(Keys, ActivityId, Opaque, Tab,
                         ETS, Pattern, MatchPat, LockKind)
    after
        ets:delete(ETS)
    end.

%% search_whole_tab(Keys, ActivityId, Opaque, Tab,
%%                  ETS, Pattern, MatchPat, LockKind) -> [Obj]
%% For each key, in order, reads the objects through read/5 and keeps
%% those accepted by match_objs/3 (ETS is the scratch table, MatchPat the
%% wrapped pattern). The per-key results are concatenated.
search_whole_tab(Keys, ActivityId, Opaque, Tab,
                 ETS, _Pattern, MatchPat, LockKind) ->
    MatchKey =
        fun(K) ->
                Found = read(ActivityId, Opaque, Tab, K, LockKind),
                match_objs(Found, ETS, MatchPat)
        end,
    lists:flatmap(MatchKey, Keys).

		      
%% is_var(Atom) -> bool().

%% Returns true if P is an ETS match variable: the atom '_' or an atom
%% of the form '$N' where N consists of one or more decimal digits.
%% Everything else, including the bare atom '$', is a literal.
is_var('_') ->
    true;
is_var(P) when is_atom(P) ->
    case atom_to_list(P) of
        %% Require at least one character after the '$'.
        [$$, C | Cs] -> digits([C | Cs]);
        _ -> false
    end;
is_var(_) ->
    false.

%% Returns true if every character of the string is a decimal digit
%% (true for the empty string).
digits([C | Cs]) when is_integer(C), C >= $0, C =< $9 -> digits(Cs);
digits([_ | _]) -> false;
digits([]) -> true.
%% is_ground(Term) -> bool().


%% Returns true if the term contains no match variables (as defined by
%% is_var/1) anywhere inside it. Lists and tuples are searched
%% recursively; any other term is ground unless it is itself a variable.
is_ground([H|T]) ->
    case is_ground(H) of
        true -> is_ground(T);
        false -> false
    end;
is_ground([]) -> true;
is_ground(P) when is_tuple(P) ->
    is_ground_tuple(P, 1, tuple_size(P));
is_ground(P) -> not is_var(P).

%% Checks elements I..Size of the tuple P, stopping at the first element
%% that is not ground.
is_ground_tuple(_P, I, Size) when I > Size -> true;
is_ground_tuple(P, I, Size) ->
    case is_ground(element(I, P)) of
        true -> is_ground_tuple(P, I+1, Size);
        false -> false
    end.


%%% ================== The following callback functions have not been
%%% ================== dealt with yet.

%% mnesia access-module callback for all_keys.
%% Not yet upgrade-aware: passes the request unchanged to
%% mnesia:all_keys/4, so only the keys present in Tab itself are returned.
all_keys(ActivityId, Opaque, Tab, LockKind) ->
    mnesia:all_keys(ActivityId, Opaque, Tab, LockKind).


%% mnesia access-module callback for index_match_object.
%% Not yet upgrade-aware: passes the request unchanged to
%% mnesia:index_match_object/6.
index_match_object(ActivityId, Opaque, Tab, Pattern, Attr, LockKind) ->
    mnesia:index_match_object(ActivityId, Opaque, Tab, 
			      Pattern, Attr, LockKind).


%% mnesia access-module callback for index_read.
%% Not yet upgrade-aware: passes the request unchanged to
%% mnesia:index_read/6.
index_read(ActivityId, Opaque, Tab, SecondaryKey, Attr, LockKind) ->
    mnesia:index_read(ActivityId, Opaque, Tab, SecondaryKey, Attr, LockKind).


%% mnesia access-module callback for table_info.
%% Not yet upgrade-aware: passes the request unchanged to
%% mnesia:table_info/4.
table_info(ActivityId, Opaque, Tab, InfoItem) ->
    mnesia:table_info(ActivityId, Opaque, Tab, InfoItem).




%%% #---------------------------------------------------------
%%% #3.2   CODE FOR EXPORTED INTERNAL FUNCTIONS
%%% #---------------------------------------------------------


%%% #---------------------------------------------------------
%%% #3.3   CODE FOR INTERNAL FUNCTIONS
%%% #---------------------------------------------------------

%% Builds the key under which Obj is stored in the shadow tables:
%% {Tab, Key}, where Key is the primary key of Obj in Tab.
temp_key(Tab, Obj) ->
    PrimaryKey = key_of(Tab, Obj),
    {Tab, PrimaryKey}.

%% Extracts the primary key of Obj, using the key position that mnesia
%% reports for Tab.
key_of(Tab, Obj) ->
    element(mnesia:table_info(Tab, keypos), Obj).

%% table_conversion_type(Tab) -> none | {old | new, FwF, BwF}
%% Looks up (directly in the ETS tables behind ?GLOBALS and ?LOCAL)
%% whether a conversion is registered for Tab. If so, the forward and
%% backward transforms are returned together with the version this node
%% is using: new if a {version, Tab} marker exists in ?LOCAL, old if not.
table_conversion_type(Tab) ->
    case ets:lookup(?GLOBALS, {conversion, Tab}) of
        [#?GLOBALS{value = {FwF, BwF, _NewAttrs, _NewRecName}}] ->
            LocalVersion =
                case ets:lookup(?LOCAL, {version, Tab}) of
                    [#?LOCAL{value = new}] ->
                        new;
                    [] ->
                        %% no marker: this node still uses the old format
                        old
                end,
            {LocalVersion, FwF, BwF};
        [] ->
            none
    end.

%% Moves the shadowed objects of Tab back into Tab. Called from
%% end_upgrade/0 inside a transaction.
%%   bag tables:   the entries are taken from ?TEMP_BAG and handled by
%%                 copy_bag_objs/4, which uses a scratch ETS table to
%%                 remember the keys it has already seen.
%%   other tables: each entry in ?TEMP is deleted and its object written
%%                 with mnesia:write/1.
copy_new_objs(Tab) ->
    case mnesia:table_info(Tab, type) of
        bag ->
            Shadowed = ets:match_object(
                         ?TEMP_BAG,
                         #?TEMP_BAG{key = {Tab, '_'}, value = '_'}),
            SeenKeys = ets:new(buf, [set]),
            copy_bag_objs(Shadowed, Tab,
                          mnesia:table_info(Tab, keypos), SeenKeys),
            ets:delete(SeenKeys);
        _SetOrOrderedSet ->
            Shadowed = ets:match_object(
                         ?TEMP,
                         #?TEMP{key = {Tab, '_'}, value = '_'}),
            MoveBack =
                fun(#?TEMP{key = TempKey, value = Obj}) ->
                        mnesia:delete({?TEMP, TempKey}),
                        mnesia:write(Obj)
                end,
            lists:foreach(MoveBack, Shadowed)
    end.

%% copy_bag_objs(ShadowRecs, Tab, KeyPos, Buf) -> ok
%% Writes every shadowed bag object back with mnesia:write/1. The first
%% time a key is encountered (tracked in the ETS table Buf), all objects
%% with that key are deleted from Tab and the shadow entries for it are
%% deleted from ?TEMP_BAG.
copy_bag_objs([], _Tab, _KeyPos, _Buf) ->
    ok;
copy_bag_objs([#?TEMP_BAG{key = TempKey, value = Obj} | Rest],
              Tab, KeyPos, Buf) ->
    Key = element(KeyPos, Obj),
    %% insert_new/2 returns true only for a key not seen before.
    case ets:insert_new(Buf, {Key, 1}) of
        true ->
            mnesia:delete({Tab, Key}),
            mnesia:delete({?TEMP_BAG, TempKey});
        false ->
            ok
    end,
    mnesia:write(Obj),
    copy_bag_objs(Rest, Tab, KeyPos, Buf).

%% Creates a mnesia table; returns ok on success and exits with the
%% result of mnesia:create_table/2 on any other outcome.
create_table(Tab, Opts) ->
    ensure_created(mnesia:create_table(Tab, Opts)).

ensure_created({atomic, ok}) ->
    ok;
ensure_created(Failure) ->
    exit(Failure).

%% is_upgrade() -> true | false
%% Reads the is_upgrade flag directly from the ETS table behind ?GLOBALS.
%%   no flag -> false
%%   true    -> true
%%   wait    -> the upgrade is being finalised; sleep 100 ms and check
%%              again, i.e. the caller blocks until the flag changes.
is_upgrade() ->
    case ets:lookup(?GLOBALS, is_upgrade) of
        [#?GLOBALS{value = true}] ->
            true;
        [#?GLOBALS{value = wait}] ->
            timer:sleep(100),
            is_upgrade();
        [] ->
            false
    end.

%%%----------------------------------------------------------------------
%%% #3.3.1   Code for Additional Actions (post-transaction triggers).
%%%----------------------------------------------------------------------

%%% #4     CODE FOR TEMPORARY CORRECTIONS
%%% #---------------------------------------------------------




More information about the erlang-questions mailing list