天天看點

PG存儲函數一則(1)——拆分json字段

PG的json字段是很好用的,但是,上司說是弄資料倉庫還是什麼的,非要把json字段拆分成15張表,腦殘的需求昂,但是還是要做啊。貼存儲過程的示例主要是展示存儲過程的寫法,邏輯性還是比較強的,存儲函數,啊存儲函數存儲函數,學MySQL比較容易稱呼成存儲過程,PG裡面是存儲函數哈,存儲函數不像java等開發語言那樣有很多可以調用的函數,很多情況下要手動調整。大家借鑒啦。

原始表:org_raw_gs,org_raw_gz,org_raw_heb,org_raw_hen這樣的表,org_raw_省份的字尾,表結構全部是一樣的:

PG存儲函數一則(1)——拆分json字段

我們看一條記錄:

PG存儲函數一則(1)——拆分json字段

主要是body字段,body是一個jsonb字段:

{“基本資訊”: {“”: “”, “順序”: [“企業名稱”, “營業執照注冊号”, “注冊位址”, “工商登記機關”, “歸類行業”, “注冊資本(萬元)”, “市場主體類型”, “經營範圍”, “一般經營範圍”, “成立日期”, “營業期限起始日期”, “營業期限到期日期”, “注冊位址聯系電話”, “法定代表人(負責人)姓名”, “企業類型”, “注冊資金币種”, “核準日期”, “登記狀态”, “經營狀态”, “”], “企業名稱”: “泰安路迎豐糧油店”, “企業類型”: “個體工商戶”, “歸類行業”: “”, “成立日期”: “2009-03-09 00:00:00”, “核準日期”: “2014-01-07 00:00:00”, “注冊位址”: “金川區泰安路”, “登記狀态”: “吊銷”, “經營狀态”: “”, “經營範圍”: “米、面粉、植物油零售。”, “一般經營範圍”: “”, “工商登記機關”: “金昌市工商行政管理局金川分局”, “市場主體類型”: “”, “注冊資金币種”: “萬人民币”, “營業執照注冊号”: “620300600029787”, “注冊位址聯系電話”: “”, “注冊資本(萬元)”: “.5”, “營業期限到期日期”: “2013-03-08 00:00:00”, “營業期限起始日期”: “2009-03-09 00:00:00”, “法定代表人(負責人)姓名”: “王治秀”}}

把這個長字段貼到json.cn中可以看出他是一個兩層的json結構,其中有的值是數組,現在我們就是做循環,把鍵值對拆成字段和值:

---------------------
--建立存儲過程---------
---------------------
-- Function: public.split_json(character varying)

-- DROP FUNCTION public.split_json(character varying);

CREATE OR REPLACE FUNCTION split_json(tb_name varchar) RETURNS int AS
$$

DECLARE 
xiangqing RECORD;
keyvalue_1 RECORD;
ifcolumn_exist int;
to_tb_name varchar();
  text_var1 text;
  text_var2 text;
  text_var3 text;
BEGIN
    to_tb_name:=concat('to_',tb_name);
    EXECUTE 'drop table if exists '||to_tb_name;
    EXECUTE 'create table '||to_tb_name||'(id varchar(100),org_id varchar(50),name varchar(80),time varchar(80),primary key(id))';
    <<loop_to_table>>
    FOR xiangqing IN EXECUTE 'SELECT id,org_id,name,body,time FROM '||tb_name::regclass||' where body is not null' LOOP
       IF xiangqing.body ::varchar = '' THEN
           RAISE NOTICE 'body is 空串';
       ELSE 
         EXECUTE 'insert into '
                                ||to_tb_name::regclass
                                ||'(id,org_id,name,time) values ($1,$2,$3,$4)'
                                USING xiangqing.id,xiangqing.org_id,xiangqing.name,xiangqing.time;
         <<loop_outer_key>>
         FOR keyvalue_1 IN (select key,value::varchar from jsonb_each(xiangqing.body::jsonb)) LOOP  
           RAISE NOTICE '外層鍵--> %',keyvalue_1.key::varchar;
             IF trim(keyvalue_1.key::varchar)<>'' THEN    
                         --檢視字段存在否,如果不存在,alter table添加字段
                         select count(*) INTO ifcolumn_exist from information_schema.columns where table_name = to_tb_name and column_name=keyvalue_1.key::varchar;
                         IF (ifcolumn_exist=) THEN
                                 --字段不存在就修改表結構
                                    EXECUTE 'alter table ' 
                                                    || to_tb_name::regclass
                                                    ||' add column "'
                                                    || keyvalue_1.key
                                                    || '" text';
             END IF;
             --然後更新字段值
                         EXECUTE 'update '
                                        || to_tb_name::regclass
                                        ||' set "'
                                        || keyvalue_1.key
                                        || '"=$1 where id=$2'
                                        USING keyvalue_1.value,xiangqing.id;

                         --嘗試去分隔var_value,有異常則抛出,能分隔則繼續往下處理
             --建立一個子塊
                         DECLARE keyvalue_2 RECORD;
                         BEGIN
                 <<sub_block>>
                 FOR keyvalue_2 IN (select key,value from jsonb_each(keyvalue_1.value::jsonb)) LOOP
                    RAISE NOTICE '内層鍵--> %',keyvalue_2.key::varchar;
                    IF trim(keyvalue_2.key::varchar) <> '' THEN
                    --檢視字段存在否,如果不存在,alter table添加字段
                                select count(*) INTO ifcolumn_exist from information_schema.columns where table_name = to_tb_name and column_name=keyvalue_2.key::varchar;
                                IF (ifcolumn_exist=) THEN
                                                 --字段不存在就修改表結構
                                                EXECUTE 'alter table ' 
                                                                || to_tb_name::regclass
                                                                ||' add column "'
                                                                || keyvalue_2.key
                                                                || '" varchar(200)';
                    END IF;
                                      EXECUTE 'update '
                                                        || to_tb_name::regclass
                                                        ||' set "'
                                                        || keyvalue_2.key
                                                        || '"=$1 where id = $2'
                                                        USING trim('"' from keyvalue_2.value::varchar),xiangqing.id;



                    END IF;--結束判斷keyvalue_2  empty

                 END LOOP sub_block;
             EXCEPTION
                 WHEN  OTHERS  THEN 
                 GET STACKED DIAGNOSTICS text_var1 = MESSAGE_TEXT,
                                         text_var2 = PG_EXCEPTION_DETAIL,
                                         text_var3 = PG_EXCEPTION_HINT;
                 RAISE NOTICE '% accurs exception\n %\n%\n%', keyvalue_1.key::varchar,
text_var1,text_var2,text_var3;
                 continue;             
             END;    
             END IF;--結束判斷鍵值是否empty
         --退出條件
         END LOOP loop_outer_key;  
     END IF; --結束判斷body是否為空串 
    --退出條件    
    END LOOP loop_to_table;

    return ;
END;


$$ LANGUAGE plpgsql ;

--調用存儲過程
select split_json('org_raw_gs');
--有一些non json object的報錯資訊忽略就好
           

結果得到很多很多的字段:

PG存儲函數一則(1)——拆分json字段

然後根據需要的資訊拆分成15張表,有這樣的一個需求就是有一個數組字段就是表的主體内容,比如說”工商公示資訊-變更資訊”就是一個數組字段,我們看一下:

[[“變更事項”,”變更前内容”,”變更後内容”,”變更日期”],[“負責人變更”,”付伯權”,”範石濤”,”2015年8月28日”],[“經營範圍變更”,”辦理人民币存款 貸款、結算業務;辦理票據貼現業務;代理發行、兌付政府債券;代理發行金融債券;代理收付款項;外彙存款;外彙貸款;外彙彙款;外币兌換,結彙、售彙;國際結算;外彙票據的承兌和貼現;資信調查、咨詢、見證業務;通過上級行辦理代客外彙買賣;外彙信用卡的發放,代理國外信用卡付款;個人黃金買賣業務;經中國銀行業監督管理委員會等監管部門準許的其他業務;安裝工程保險、機動車輛保險、家庭财産保險、建築工程保險、健康保險、企業财産保險、人壽保險、意外傷害保險(保險兼業代理許可證有效期至2013年12月16日) “,”辦理人民币存款 貸款、結算業務;辦理票據貼現業務;代理發行、兌付政府債券;代理發行金融債券;代理收付款項;代理保險業務(保險法律法規和行政規章制度許可範圍内的險種,保險兼業代理許可證有效期至2016年12月16日);外彙存款;外彙貸款;外彙彙款;外币兌換,結彙、售彙;國際結算;外彙票據的承兌和貼現;資信調查、咨詢、見證業務;通過上級行辦理代客外彙買賣;外彙信用卡的發放,代理國外信用卡付款;個人黃金買賣業務;經中國銀行業監督管理委員會等監管部門準許的其他業務。(依法須經準許的項目,經相關部門準許後方可開展經營活動)”,”2014年9月19日”]]

我們看到這個數組第一行是鍵,從第二行開始是對應的值,目标表結構如下:

create table gs_penalty_info (
auto_id serial,
id varchar()  ,   -- '唯一id' ,
corp_id varchar() ,   -- '注冊号' ,
corp_name varchar() ,   -- '名稱' ,
--no varchar() null,   -- '序号' ,
--case_no varchar() null,   -- '行政處罰決定書文号' ,
--case_type varchar() null,   -- '違法行為類型' ,
--case_content varchar() null,   -- '行政處罰内容' ,
--case_org varchar() null,   -- '作出行政處罰決定機關名稱' ,
--case_date timestamp null,   -- '作出行政處罰決定日期' ,
--case_detail varchar() null,   -- '詳情' ,
--create_date timestamp ,   -- '建立時間' ,
--create_by varchar() ,   -- '建立者' ,
--update_date timestamp ,   -- '更新時間' ,
--update_by varchar() ,   -- '更新者' ,
--source varchar() ,   -- '資料來源' ,
primary key (auto_id)
);
           

其中一些字段我們先不建立,在拆分的時候添加字段,比較通用的做法是這樣:

CREATE OR REPLACE FUNCTION split_array(tmp_tb_name varchar,target_tb varchar,colu_name varchar,begin_xh int, end_xh int) RETURNS int AS
$$

--------過程參數說明:
---------tmp_tb_name:中間表名稱,比如 to_org_raw_heb
---------target_tb:目标表,張表的某一張,
---------colu_name:數組字段的名稱,比如:"動産抵押登記資訊"這個字段
---------begin_xh,end_xh:在中間表中加了一個auto_incr的自增字段,可以小區間小區間的調用,防止資料量過大更新不成功

--------函數主體
DECLARE 
target RECORD;
one_dimen int;
two_dimen int;
i int;
j int;
split_zd varchar;
return_i int default 0;
ifcol_exist int default 0;
iftb_exist int default 0;
seq_name varchar default '';
province varchar(5);
  text_var1 text;
  text_var2 text;
  text_var3 text;
BEGIN
    province:=substring(tmp_tb_name from  for );
    --如果表不存在就建立,已存在就算了;
    EXECUTE 'select count(*) from pg_class where relname='
            ||quote_literal(target_tb)    
            INTO iftb_exist;
    IF iftb_exist=0 THEN
        EXECUTE 'create table '||target_tb||'(auto_id serial, id varchar(), corp_id varchar(),corp_name varchar(), province varchar(),primary key (auto_id))';    
    END IF;
<<record_loop>>
FOR target IN EXECUTE 'select id,org_id,name,'||quote_ident(colu_name) || ' as gd_info,auto_incr  from '
               || tmp_tb_name::regclass
               || ' where  auto_incr BETWEEN '
               || begin_xh ||' and '||end_xh ||' and org_id is not null and org_id<>'
               || quote_literal('')||' and '
               ||quote_ident(colu_name)
               ||' is not null'
LOOP
    RAISE NOTICE '第%行:',target.auto_incr::varchar;
    split_zd=replace(replace(target.gd_info,'[','{'),']','}');  
     one_dimen=array_length((replace(replace(split_zd,'[','{'),']','}'))::text[][],1);
     two_dimen=array_length((replace(replace(split_zd,'[','{'),']','}'))::text[][],2);
     <<addco>> FOR j IN 1..two_dimen LOOP
           EXECUTE 'select count(*)  from information_schema.columns where table_name = $ and column_name = $'
                   INTO ifcol_exist
                   USING target_tb,(split_zd::text[][])[1][j];
           IF (ifcol_exist=0) THEN
                                     --字段不存在就修改表結構
                                EXECUTE 'alter table '
                        ||target_tb::regclass
                        ||' add column "' 
                                                || (split_zd::text[][])[1][j]
                                                || '" text';              
                        END IF;
            EXIT addco WHEN j=two_dimen;
            END LOOP addco;
      seq_name=concat(target_tb,'_auto_id_seq');
      <<data_in>>  For i in 2..one_dimen LOOP
                EXECUTE 'insert into '
                    ||target_tb::regclass
                    ||'(ID,CORP_ID,CORP_NAME,province,"' 
                    || (split_zd::text[][])[1][1]
                                        || '") values ($,$,$,$,$) '
                    USING target.id,target.org_id,target.name,province,(split_zd::text[][])[i][1];
                <<data_up>>FOR j in 2..two_dimen LOOP
                                         EXECUTE 'update  '
                          ||target_tb::regclass
                          ||' set "' 
                                                    || (split_zd::text[][])[1][j]
                                                    || '"= ($) where auto_id=$ '
                          USING (split_zd::text[][])[i][j],currval(seq_name::regclass);  
                END LOOP data_up;
        END LOOP data_in;   
    return_i = currval(seq_name::regclass);            
END LOOP record_loop ; 
return return_i;
EXCEPTION WHEN OTHERS THEN
                          GET STACKED DIAGNOSTICS text_var1 = MESSAGE_TEXT,
                          text_var2 = PG_EXCEPTION_DETAIL,
                          text_var3 = PG_EXCEPTION_HINT;
                    RAISE NOTICE '%\n%\n%',text_var1,text_var2,text_var3;
END;
---------函數主體

$$
  LANGUAGE plpgsql;

           

調用示例: