Leaked source code of windows server 2003
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

286 lines
7.6 KiB

  1. # ======================================================================
  2. #
  3. # Copyright (C) 2000-2001 Paul Kulchenko (paulclinger@yahoo.com)
  4. # SOAP::Lite is free software; you can redistribute it
  5. # and/or modify it under the same terms as Perl itself.
  6. #
  7. # $Id: SOAP::Transport::MQ.pm,v 0.51 2001/07/18 15:15:14 $
  8. #
  9. # ======================================================================
  10. package SOAP::Transport::MQ;
  11. use strict;
  12. use vars qw($VERSION);
  13. $VERSION = '0.51';
  14. use MQClient::MQSeries;
  15. use MQSeries::QueueManager;
  16. use MQSeries::Queue;
  17. use MQSeries::Message;
  18. use URI;
  19. use URI::Escape;
  20. use SOAP::Lite;
  21. # ======================================================================
  # Minimal URI subclass that teaches the URI module the 'mq://' scheme.
  # It only sets up inheritance from the generic server and user/password
  # URI classes; it defines no methods of its own.
  package URI::mq;

  require URI::_server;
  require URI::_userpass;

  @URI::mq::ISA = ('URI::_server', 'URI::_userpass');
  25. # mq://user@host:port?Channel=A;QueueManager=B;RequestQueue=C;ReplyQueue=D
  26. # ^^ ^^^^ ^^^^ ^^^^ ^^^^^^^^^ ^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^ ^^^^^^^^^^^^
  27. # ======================================================================
  28. package SOAP::Transport::MQ::Client;
  29. use vars qw(@ISA);
  30. @ISA = qw(SOAP::Client);
  31. use MQSeries qw(:constants);
  # Destructor: reports the object teardown through the SOAP::Lite
  # object-tracing hook.
  sub DESTROY {
    SOAP::Trace::objects('()');
  }
  # Constructor for the MQ client transport.
  #
  # Called on an existing object it simply returns that object, which lets
  # the accessors use "shift->new" for both class and instance calls.
  #
  # Called on a class, the argument list is scanned from the front: when the
  # leading argument names a method the class can perform, it is taken
  # together with the following argument as a (method, value) pair to be
  # invoked after construction; otherwise that single argument is kept as
  # raw data for the object hash.
  sub new {
    my $invocant = shift;
    return $invocant if ref $invocant;

    my $class = $invocant;
    my (@fields, @calls);
    while (@_) {
      if ($class->can($_[0])) {
        my $name  = shift;
        my $value = shift;
        push @calls, $name, $value;
      }
      else {
        push @fields, shift;
      }
    }

    my $self = bless {@fields}, $class;

    # Replay the collected method calls; an array reference is expanded into
    # an argument list, anything else is passed through as a single argument.
    while (my ($name, $value) = splice(@calls, 0, 2)) {
      $self->$name(ref $value eq 'ARRAY' ? @$value : $value);
    }

    SOAP::Trace::objects('()');
    return $self;
  }
  # Generate the requestqueue/replyqueue accessors at compile time.
  # Each accessor stores its value under the underscore-prefixed key of the
  # same name; called with an argument it sets the value and returns the
  # object (for chaining), called without one it returns the stored value.
  sub BEGIN {
    no strict 'refs';
    foreach my $name (qw(requestqueue replyqueue)) {
      my $slot = '_' . $name;
      *{$name} = sub {
        my $self = shift->new;
        return $self->{$slot} unless @_;
        $self->{$slot} = shift;
        return $self;
      };
    }
  }
  # Get or set the transport endpoint (an 'mq://' URL).
  #
  # Without an argument, returns the endpoint held by the parent class.
  # With an argument that differs from the current endpoint, parses the URL,
  # connects to the queue manager, opens the request queue for output and
  # the reply queue for input, stores both on the object, and finally hands
  # the endpoint to the parent class (returning that call's result).
  #
  # Query parameters read: Channel, QueueManager, RequestQueue, ReplyQueue.
  # Values already stored on the object act as defaults; URL values win.
  #
  # Dies when the queue manager or either queue cannot be opened.
  sub endpoint {
    my $self = shift;
    return $self->SUPER::endpoint unless @_;

    my $endpoint = shift;

    # nothing to do if new endpoint is the same as the current one
    return $self if $self->SUPER::endpoint eq $endpoint;

    my $uri = URI->new($endpoint);

    # Parse the query one "key=value" segment at a time.  Each segment adds
    # exactly one key, so a bare key without '=' gets an undefined value
    # instead of swallowing the next key as its value and misaligning every
    # pair that follows.
    my %query;
    foreach my $pair (split /[&;]/, $uri->query || '') {
      next unless length $pair;
      my ($key, $value) = split /=/, $pair, 2;
      $query{URI::Escape::uri_unescape($key)} =
        defined $value ? URI::Escape::uri_unescape($value) : undef;
    }
    my %parameters = (%$self, %query);

    # The MQ client library is pointed at the server through the MQSERVER
    # environment variable; only set it when the URL names a host.
    $ENV{MQSERVER} = sprintf "%s/TCP/%s(%s)", $parameters{Channel}, $uri->host, $uri->port
      if $uri->host;

    my $qmgr = MQSeries::QueueManager->new(QueueManager => $parameters{QueueManager})
      or die "Unable to connect to queue manager $parameters{QueueManager}\n";

    my $request_queue = MQSeries::Queue->new(
      QueueManager => $qmgr,
      Queue        => $parameters{RequestQueue},
      Mode         => 'output',
    ) or die "Unable to open $parameters{RequestQueue}\n";
    $self->requestqueue($request_queue);

    my $reply_queue = MQSeries::Queue->new(
      QueueManager => $qmgr,
      Queue        => $parameters{ReplyQueue},
      Mode         => 'input',
    ) or die "Unable to open $parameters{ReplyQueue}\n";
    $self->replyqueue($reply_queue);

    $self->SUPER::endpoint($endpoint);
  }
  # Send a SOAP envelope over MQ and wait for the correlated reply.
  #
  # Named parameters:
  #   envelope - the serialized request to put on the request queue
  #   endpoint - optional 'mq://' URL; defaults to the current endpoint
  #   Expiry   - optional; taken from the call or the object, default 60000
  #
  # Returns the reply payload, or undef when no reply was obtained.  The
  # outcome is also recorded through code/message/status/is_success: all
  # undef/true on success, 'Timeout' for a negative Get result, and a
  # generic error text otherwise.
  #
  # Dies when the request cannot be put on the request queue.
  sub send_receive {
    my($self, %parameters) = @_;
    my($envelope, $endpoint) = @parameters{qw(envelope endpoint)};

    # (Re)connect if needed; endpoint() is a no-op for an unchanged URL.
    $self->endpoint($endpoint ||= $self->endpoint);

    # Object fields provide defaults for the per-call parameters.
    %parameters = (%$self, %parameters);

    # NOTE(review): the same number is used both as the message Expiry and
    # as the Get Wait interval below; confirm the two MQ settings use
    # compatible units.
    my $expiry = $parameters{Expiry} || 60000;

    SOAP::Trace::debug($envelope);

    my $request = MQSeries::Message->new(
      MsgDesc => {Format => MQFMT_STRING, Expiry => $expiry},
      Data    => $envelope,
    );

    $self->requestqueue->Put(Message => $request)
      or die "Unable to put message to queue\n";

    # Only accept the reply whose correlation id matches our request's
    # message id.
    my $reply = MQSeries::Message->new(
      MsgDesc => {CorrelId => $request->MsgDesc('MsgId')},
    );

    my $result = $self->replyqueue->Get(
      Message => $reply,
      Wait    => $expiry,
    );

    # A lexical must not be declared under a statement modifier
    # ("my $x = ... if ..."): perlsyn documents that as undefined behaviour.
    my $msg = $result > 0 ? $reply->Data : undef;
    SOAP::Trace::debug($msg);

    my $code = $result > 0 ? undef
             : $result < 0 ? 'Timeout'
             :               'Error occured while waiting for response';

    $self->code($code);
    $self->message($code);
    $self->is_success(!defined $code || $code eq '');
    $self->status($code);

    return $msg;
  }
  112. # ======================================================================
  113. package SOAP::Transport::MQ::Server;
  114. use Carp ();
  115. use vars qw(@ISA $AUTOLOAD);
  116. @ISA = qw(SOAP::Server);
  117. use MQSeries qw(:constants);
  # Constructor for the MQ server transport.
  #
  # Called on an existing object it returns that object unchanged.  Called
  # on a class, the first argument is the 'mq://' URL and the remaining
  # arguments are passed to the parent constructor.  The URL is then used to
  # connect to the queue manager and to open the request queue for input
  # and the reply queue for output.
  #
  # Query parameters read: Channel, QueueManager, RequestQueue, ReplyQueue.
  # Values already stored on the object act as defaults; URL values win.
  #
  # Croaks when the queue manager or either queue cannot be opened.
  sub new {
    my $self = shift;
    return $self if ref $self;

    my $class = $self;
    my $uri = URI->new(shift);
    $self = $class->SUPER::new(@_);

    # Parse the query one "key=value" segment at a time.  Each segment adds
    # exactly one key, so a bare key without '=' gets an undefined value
    # instead of swallowing the next key as its value and misaligning every
    # pair that follows.
    my %query;
    foreach my $pair (split /[&;]/, $uri->query || '') {
      next unless length $pair;
      my ($key, $value) = split /=/, $pair, 2;
      $query{URI::Escape::uri_unescape($key)} =
        defined $value ? URI::Escape::uri_unescape($value) : undef;
    }
    my %parameters = (%$self, %query);

    # The MQ client library is pointed at the server through the MQSERVER
    # environment variable; only set it when the URL names a host.
    $ENV{MQSERVER} = sprintf "%s/TCP/%s(%s)", $parameters{Channel}, $uri->host, $uri->port
      if $uri->host;

    my $qmgr = MQSeries::QueueManager->new(QueueManager => $parameters{QueueManager})
      or Carp::croak("Unable to connect to queue manager $parameters{QueueManager}");

    my $request_queue = MQSeries::Queue->new(
      QueueManager => $qmgr,
      Queue        => $parameters{RequestQueue},
      Mode         => 'input',
    ) or Carp::croak("Unable to open $parameters{RequestQueue}");
    $self->requestqueue($request_queue);

    my $reply_queue = MQSeries::Queue->new(
      QueueManager => $qmgr,
      Queue        => $parameters{ReplyQueue},
      Mode         => 'output',
    ) or Carp::croak("Unable to open $parameters{ReplyQueue}");
    $self->replyqueue($reply_queue);

    return $self;
  }
  # Install the requestqueue/replyqueue accessors at compile time.
  # The value lives in the object hash under '_requestqueue' and
  # '_replyqueue' respectively.  A call with an argument stores it and
  # returns the object; a call without arguments returns the stored value.
  sub BEGIN {
    no strict 'refs';
    foreach my $accessor ('requestqueue', 'replyqueue') {
      my $key = "_$accessor";
      *{$accessor} = sub {
        my $self = shift->new;
        if (@_) {
          $self->{$key} = shift;
          return $self;
        }
        return $self->{$key};
      };
    }
  }
  # Drain the request queue: for every pending request, run it through the
  # parent class's handle() and put the result on the reply queue, correlated
  # to the request's message id and carrying over the request's expiry.
  #
  # Returns the number of requests processed once the queue reports that no
  # more messages are available.  Dies when Get reports a failure or when a
  # reply cannot be put on the reply queue.
  sub handle {
    my $self = shift->new;
    my $handled = 0;

    for (;;) {
      my $request = MQSeries::Message->new;

      # No Wait option is given, so this Get does not block for new requests.
      my $got = $self->requestqueue->Get(Message => $request);
      die "Error occured while waiting for requests\n" unless $got;

      if ($self->requestqueue->Reason == MQRC_NO_MSG_AVAILABLE) {
        return $handled;
      }

      my $descriptor = {
        CorrelId => $request->MsgDesc('MsgId'),
        Expiry   => $request->MsgDesc('Expiry'),
      };
      my $reply = MQSeries::Message->new(
        MsgDesc => $descriptor,
        Data    => $self->SUPER::handle($request->Data),
      );

      $self->replyqueue->Put(Message => $reply)
        or die "Unable to put reply message\n";

      $handled++;
    }
  }
  175. # ======================================================================
  176. 1;
  177. __END__
  178. =head1 NAME
  179. SOAP::Transport::MQ - Server/Client side MQ support for SOAP::Lite
  180. =head1 SYNOPSIS
  181. =over 4
  182. =item Client
  183. use SOAP::Lite
  184. uri => 'http://my.own.site.com/My/Examples',
  185. proxy => 'mq://server:port?Channel=CHAN1;QueueManager=QM_SOAP;RequestQueue=SOAPREQ1;ReplyQueue=SOAPRESP1',
  186. ;
  187. print getStateName(1);
  188. =item Server
  189. use SOAP::Transport::MQ;
  190. my $server = SOAP::Transport::MQ::Server
  191. ->new('mq://server:port?Channel=CHAN1;QueueManager=QM_SOAP;RequestQueue=SOAPREQ1;ReplyQueue=SOAPRESP1')
  192. # specify list of objects-by-reference here
  193. -> objects_by_reference(qw(My::PersistentIterator My::SessionIterator My::Chat))
  194. # specify path to My/Examples.pm here
  195. -> dispatch_to('/Your/Path/To/Deployed/Modules', 'Module::Name', 'Module::method')
  196. ;
  197. print "Contact to SOAP server\n";
  198. do { $server->handle } while sleep 1;
  199. =back
  200. =head1 DESCRIPTION
  201. =head1 COPYRIGHT
  202. Copyright (C) 2000-2001 Paul Kulchenko. All rights reserved.
  203. This library is free software; you can redistribute it and/or modify
  204. it under the same terms as Perl itself.
  205. =head1 AUTHOR
  206. Paul Kulchenko (paulclinger@yahoo.com)
  207. =cut