Многопоточный доступ к базам данных

© 2002 Галимарзанов Фанис

По этой теме очень мало информации, особенно в части, касающейся доступа к SQL-серверам (например IB). Мне пришлось несколько дней активно заниматься всем этим – не нашел достойной замены для VirtualTree и решил заполнять дерево с помощью потока. Отмечу, что мои ранние попытки использовать потоки для обращения к Interbase не увенчались успехом, да и дискуссии по теме на форуме epsylon.public.interbase не особо вдохновляли. Жизнь заставила пересмотреть подходы к проблеме и вот что получилось.

Как известно, запрос в потоке должен выполняться в отдельном контексте, т.е. поток должен иметь как минимум IBDataset, IBTransaction и IBSQL. Можно использовать IBDataBase.InternalTransaction, но лучше таки создать в потоке отдельный IBTransaction.

Рассмотрим процесс создания потока, способного обеспечить требуемые условия – создадим базовый класс и затем породим от него поток, способный, например, подсчитать количество записей в запросе.

unit basedbtr;
interface
  uses Windows,Classes, Messages,IBDatabase,IBSQL;
const
{Идентификаторы сообщений, используемых для передачи информации форме, создавшей поток}
  WM_THREAD_TEXT=WM_USER+1000;
  WM_THREAD_INTEGER=WM_THREAD_TEXT+1;
type
  TBaseDB_Thread = class(TThread)
    private
      {Локальные поля, хранящие информацию об окне владельца потока}
      fCurForm:HWND;
      {SQL текст}
      fNewSqlText:string;
      {Флаг нового запроса}
      fSetNewQuery:boolean;
      {Далее три компоненты, обеспечивающие наличие собственного контекста потока}
      Base:TIBdatabase;
      trWrite:TIbTransaction;
      ThreadSql:TIBSQL;
      {Процедуры для передачи информации владельцу потока}
      procedure Post_Text(MsgData:string);
      procedure Post_Integer(MsgData:integer);
    protected
      {Метод Execute пока ничего, кроме как перехода в состояние Suspend, не умеет}
      procedure Execute; override;
    public
      property CurForm:HWND read fCurForm write fCurForm;
      property NewSqlText:string read fNewSqlText write fNewSqlText;
      property SetNewQuery:boolean read fSetNewQuery write fSetNewQuery;
    constructor Create(OwnerHWND:HWND; //Дескриптор окна-родителя потока         aBaseName:String; //DatabaseName
        aBaseParams:TStrings; //Информация для IBDatabase.Params
        TransParams:TStrings); //Информация для IBTransactin.Params
    destructor Destroy; override;
  end;

{Породим от базового потока новый поток с требуемой функциональностью, которая реализуется в методе Execute}
TGetRecordCount_Thread = class(TBaseDB_Thread)
  protected
    procedure Execute; override;
end;

{Глобальная переменная, хранящая текст для передачи окну-владельцу}
  var MsgText:string;
implementation

constructor TBaseDB_Thread.Create;
begin
  fCurForm:=OwnerHWND;
  NewSqlText:='';
  FreeOnTerminate :=False;
  base:=TIBdatabase.Create(nil);
  Base.DatabaseName:=aBaseName;
  Base.LoginPrompt:=false;
  Base.Params.Assign(aBaseParams);
  trWrite:=TIBTransaction.Create(nil);
  trWrite.DefaultDatabase:=Base;
  trWrite.Params.Assign(TransParams);
  ThreadSql:=TIBSQL.Create(nil);
  ThreadSql.SQL.Text:='';
  ThreadSql.Transaction:=trWrite;
  Base.Connected:=true;
  inherited Create(true); {true - переведем поток в состояние Suspended}
end;

destructor TBaseDB_Thread.Destroy;
begin
  {Остановим поток}
  if not Suspended then Suspend;
  fNewSqlText:='';
  fSetNewQuery:=false;
  Terminate;
  Resume;
  WaitFor;
  ThreadSql.Free;
  trWrite.Free;
  Base.Free;
  inherited Destroy;
end;

{Ниже процедуры для передачи информации владельцу}
procedure TBaseDB_Thread.Post_Text(MsgData:string);
begin
  MsgText:=msgData;
  PostMessage(CurForm,WM_THREAD_TEXT,0,Integer(PChar(MsgText)));
end;

procedure TBaseDB_Thread.Post_Integer(MsgData:integer);
begin
  PostMessage(CurForm,WM_THREAD_INTEGER,0,MsgData);
end;

{"Пустой" метод базового класса}
procedure TBaseDB_Thread.Execute;
begin
  while not Terminated do
    suspend;
end;

{Здесь Execute наследника}
procedure TGetRecordCount_Thread.Execute;
  begin
    while not Terminated do begin
      {Если установлен флаг нового запроса}
      if fSetNewQuery then
        begin
          {Сбросим флаг}
          fSetNewQuery:=false;
          {Сообщим владельцу о начале работы потока }
          Post_text('Resume');
          try
            if not trWrite.InTransaction then trWrite.StartTransaction;
            ThreadSql.Sql.Text:=NewSqlText;
            ThreadSql.ExecQuery;
            {Сообщим владельцу результат}
            Post_Integer(ThreadSql.Fields[0].AsInteger);
            trWrite.Commit;
            ThreadSql.Close;
          except
            raise;
            Post_Text('Error');
          end;
        end;
      {Если в процессе выполнения не установлен флаг нового запроса, то остановимся в этой точке}
      if not Terminated and not SetNewQuery then begin
        {Вначале сообщим владельцу}
        Post_Text('Suspend');
        suspend;
      end;
    end;
  end;
end.

В прилагаемом к статье проекте на Delphi (10.4K) реализованы дополнительные функции для обеспечения нормальной работы потока. Рассмотрим их.

procedure TForm1.IBQuery1AfterOpen(DataSet: TDataSet);
begin
  if Trs.Suspended
    then SetTrsData
    else Timer1.Enabled:=true;
end;

Событие должно вызвать метод потока Resume для выполнения запроса о количестве записей. Но мы не знаем, стоит ли это делать, может прежний запрос касается миллионов записей и выражение WHERE столь сложен, чтоб его выполнить за секунды. Проверим состояние потока – если он Suspended, то можно сходу инициализировать поток новыми параметрами и запустить его, иначе – воспользуемся таймером Timer1.

procedure TForm1.Timer1Timer(Sender: TObject);
begin
  if Trs.Suspended then begin
    Timer1.Enabled:=false;
    SetTrsData;
  end;
end;

Здесь процедура рестарта потока

procedure TForm1.SetTrsData;
begin
  Trs.NewSqlText:=Memo2.Lines.Text;
  Trs.SetNewQuery:=true;
  Trs.Resume;
end;

Перед тем, как переоткрыть запрос для Grid, остановим таймер – вдруг он сработает и нарушит порядок

procedure TForm1.IBQuery1BeforeOpen(DataSet: TDataSet);
begin
  Timer1.Enabled:=false;
end;

Вот и все. Данный пример показывает, как создавать поток для использования его в качестве дополнительного инструмента как для мелких «поручений» типа рассмотренного выше, так и для исполнения тяжелых процедур, связанных с изменениями (INSERT or UPDATE), способных ввести основной поток приложения в ступор.

Предположим, что необходимо заполнять TREE данными из запроса.

TMySql_Thread = class(TBaseDB_Thread)
  Private
    {Это флаг актуальности данных}
    FActual:boolean;
    {Набор параметров для IBSQL}
    FParam1:integer;
    FParam2:integer;
  protected
    procedure Execute; override;
  public
    property Actual:boolean read fActual write fActual;
    property Param1:integer read fParam1 write fParam1;
    property Param2:integer read fParam2 write fParam2;
  end;

procedure TMySql_Thread.Execute;
begin
  while not Terminated do begin
    {Если установлен флаг нового запроса}
    if fSetNewQuery then
      begin
      {Сбросим флаг}
      fSetNewQuery:=false;
      fActual:=true;
      {Сообщим владельцу о начале работы потока }
      Post_text('Resume');
      try
        if not trWrite.InTransaction then trWrite.StartTransaction;
        ThreadSql.Sql.Text:=NewSqlText;
        ThreadSql.Params[0].asInteger:=fParam1;
        ThreadSql.Params[1].asInteger:=fParam2;
        ThreadSql.ExecQuery;
        {Здесь цикл вставки данных в Tree, причем анализируются Terminated и флаг актуальности потока}
        While not Terminated and fActual and not ThreadSql.Eof do
          Begin
            {Вызываем процедуру вставки}
            Synchronize(AddToList);
            ThreadSql.Next;
          End;
        trWrite.Commit;
        ThreadSql.Close;
      except
        raise;
        Post_Text('Error');
      end;
      end;
    {Если в процессе выполнения не установлен флаг нового запроса, то остановимся в этой точке}
    if not Terminated and not SetNewQuery then begin
      {Вначале сообщим владельцу}
      Post_Text('Suspend');
      suspend;
    end;
  end;
end;

В прилагаемом к статье проекте на Delphi (10.4K) реализованы все изложенные идеи. Надеюсь, что все это поможет использовать потоки в полной мере.

С уважением Галимарзанов Фанис (Fanis).

Будут вопросы - меня всегда найдете на epsylon.public.interbase

P.S. Автор – профессиональный судоводитель, бывший капитан, посему прошу особо не пинать за упрощенное изложение.

Copyright© 2002 Галимарзанов Фанис  Специально для Delphi Plus



Опубликовал admin
26 Ноя, Среда 2003г.



Программирование для чайников.