# -*- coding: utf-8 -*- use feature ":5.10"; use strict; state $basea58 = [split // , '123456789abcdefghijkmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ']; state $i; state $base58 = {map {$_ => $i++} @$basea58}; state $stack; package brickd; use IO::Socket::INET; use IO::Select; use Time::HiRes qw( sleep ); use threads; use threads::shared; use Thread::Queue; sub new { my $class = shift @_; my $ref = {}; if( ref($class) ) { if( $_[0]->isa('brick') ) { my ($class_name,$brick_uid,$para) = @_; $ref = $class_name->new($class,$brick_uid,$para); } else { die " $_[0] ist kein brick\n"; } } else { $ref = bless $ref,$class; ($ref->{Host},$ref->{Port}) = @_; $ref->{Stack} = 0; $ref->{IPCon} = IO::Socket::INET->new("$ref->{Host}:$ref->{Port}") || die "no connect to $ref->{Host}:$ref->{Port}\n"; $ref->{IPCon}->autoflush(1); $ref->{Queue} = Thread::Queue->new(); $ref->{Bricks} = undef; my %p :shared; $ref->{pending} = \%p; my %r :shared; $ref->{timing} = \%r; my %c :shared; $ref->{callback} = \%c; $ref->{Write} = threads->create(sub { print threads->tid()." startet\n"; while(1) { my ($stack,$buff,$func) = @{$ref->{Queue}->dequeue()}; my @out = IO::Select->new($ref->{IPCon})->can_write(); $ref->{IPCon}->syswrite($buff) if $buff; threads->yield(); } }); $ref->{Read} = threads->create(sub { print threads->tid()." startet\n"; my $sel = IO::Select->new(); $sel->add($ref->{IPCon}); while(1) { my @read = $sel->can_read($^O =~ /Win/i ? 0.001 : 2); if( $read[0] ) { my ($stack_id,$func_id,$p_len,$buff) = $ref->next_packet; warn "leer Read\n" if !$func_id; if ( exists $ref->{callback}->{"$stack_id $func_id"} ) { $ref->{callback}->{"$stack_id $func_id"}->enqueue([$stack_id,$func_id,$p_len,$buff]); } elsif( exists $ref->{pending}->{"$stack_id $func_id"} ) { my $stru = $ref->{pending}->{"$stack_id $func_id"}; $stru->{Queue}->enqueue([$stack_id,$func_id,$p_len,$buff]); delete $ref->{timing}->{$stru->{Time}}; delete $ref->{pending}->{"$stack_id $func_id"}; } } for my $tim ( keys %{$ref->{timing}} ) { if( (Time::HiRes::time - $tim ) > 4.0 ) { my $stru = $ref->{timing}->{$tim}; delete $ref->{pending}->{"$stru->{Stack} $stru->{Func}"} if exists $ref->{pending}->{"$stru->{Stack} $stru->{Func}"}; delete $ref->{timing}->{$stru->{Time}}; $stru->{Queue}->enqueue([]); } } threads->yield(); } }); $ref->{Write}->detach(); $ref->{Read}->detach(); } return $ref; } sub next_packet { my $class = shift @_; my ($stack_id,$func_id,$p_len,$buff); $class->{IPCon}->sysread($buff,4); ($stack_id,$func_id,$p_len ) = unpack("CCS",$buff); undef $buff; $class->{IPCon}->sysread($buff,$p_len - 4) if $p_len > 4; return ($stack_id,$func_id,$p_len,$buff); } package brick; use Math::Int64 qw(uint64 uint64_to_hex hex_to_uint64 uint64_to_string); use Time::HiRes qw( sleep time); use threads; use threads::shared; use Thread::Queue; my $typ = { bool => { len => 1, mask => 'C', }, char => { len => 1, mask => 'A', }, 'char[4]' => { len => 4, mask => 'A4', }, float => { len => 4, mask => 'f', }, int16 => { len => 2, mask => 's', }, 'int16[10]' => { len => 20, mask => 'H40', pack => num_to_syswrite('H40','s10'), unpack => num_from_sysread('s10','H40') }, int32 => { len => 4, mask => 'l', }, 'string[16]' => { len => 16, mask => 'A16', }, 'string[20]' => { len => 20, mask => 'A20', }, 'string[32]' => { len => 32, mask => 'A32', }, 'string[40]' => { len => 20, mask => 'A40', }, 'string[50]' => { len => 50, mask => 'A50', }, 'string[60]' => { len => 60, mask => 'A60', }, uint8 => { len => 1, mask => 'C', }, 'uint8[4]' => { len => 4, mask => 'H8', pack => num_to_syswrite('H8','C4'), unpack => num_from_sysread('C4','H8') }, 'uint8[6]' => { len => 6, mask => 'H12', pack => num_to_syswrite('H12','C6'), unpack => num_from_sysread('C6','H12') }, 'uint8[32]' => { len => 32, mask => 'H64', pack => num_to_syswrite('H64','C32'), unpack => num_from_sysread('C32','H64') }, uint16 => { len => 2, mask => 'S', }, 'uint16[2]' => { len => 4, mask => 'H8', pack => num_to_syswrite('H8','S2'), unpack => num_from_sysread('S2','H8') }, uint32 => { len => 4, mask => 'L', }, uint64 => { len => 8, mask => 'H16', pack => \&uid_to_syswrite, unpack => \&uid_from_sysread }, }; sub syswrite { my $class = shift @_; my( $buff, $func ) = @_; if( exists $func->{resp} ) { my %stru :shared; $stru{Time} = Time::HiRes::time; $stru{Queue} = $class->{Queue}; $stru{Stack} = $class->{Stack}; $stru{Func} = $func->{id}; $class->{brickd}->{pending}->{"$class->{Stack} $stru{Func}"} = \%stru; $class->{brickd}->{timing}->{$stru{Time}} = \%stru; } $class->{brickd}->{Queue}->enqueue([$class->{Stack},$buff,$func->{id}]); } sub meta { my $class = shift; my ( $funktion ) = @_; my $f_list = { get_stack_id => { id => 255, para => [qw/uint64/], resp => [qw/uint64 uint8 uint8 uint8 string[40] uint8/]}, enumerate => { id => 254,}, CALLBACK_ENUMERATE => { id => 253, resp => [qw/uint64 string[40] uint8 bool/]}, }; if( exists $f_list->{$funktion} ) { return $f_list->{$funktion}; } else { return; } } sub callback { my $class = shift; my ( $id ) = @_; my $cb_list = { 253 => 'CALLBACK_ENUMERATE', }; if( exists $cb_list->{$id}) { return $cb_list->{$id}; } else { return ; } } sub new { my $class = shift; my ($v1,$v2,$v3,$brick_uid,$class_name,$ref,$para ) ; if( ref($class) ) { if( $_[0]->isa('brick') ) { ($class_name,$brick_uid,$para) = @_; $ref = {}; $ref = bless $ref,$class_name; $ref->{Stack} = 0; $ref->{Host} = $class->{Host}; $ref->{Port} = $class->{Port}; $ref->{brickd} = $class->{brickd}; } else { die " $_[0] ist kein brick\n"; } } elsif (ref($_[0]) eq 'brickd') { $ref = {}; $ref = bless $ref,$class; $ref->{Stack} = 0; ($ref->{brickd}, $brick_uid,$para) = @_; $ref->{Host} = $ref->{brickd}->{IPCon}->peerhost; $ref->{Port} = $ref->{brickd}->{IPCon}->peerport; } else { $ref = {}; $ref = bless $ref,$class; $ref->{Stack} = 0; ($ref->{Host},$ref->{Port}, $brick_uid,$para) = @_; $ref->{brickd} = brickd->new($ref->{Host},$ref->{Port}) || die "no connect to $ref->{Host}:$ref->{Port}\n"; } $ref->{Queue} = Thread::Queue->new(); if( $brick_uid ) { ($ref->{UID},$v1,$v2,$v3,$ref->{Name},$ref->{Stack}) = $ref->get_stack_id($brick_uid); die "kann ".ref($ref)." mit $brick_uid nicht finden\n" if !$ref->{Stack}; $ref->{Version} = "$v1.$v2.$v3"; if( $ref->{UID} ne $brick_uid ) { die "konnte ".ref($ref)." mit ID $brick_uid nicht finden\n"; } push @{$stack->{$ref->{brickd}}->{$ref->{Stack}}}, $ref; } if( ref($para) eq 'HASH' ) { for my $func ( keys %$para ) { if( $ref->meta( $func ) ) { $ref->$func( ref($para->{$func}) eq 'ARRAY' ? @{$para->{$func}} : $para->{$func}); } else { $ref->{$func} = $para->{$func}; } } } elsif( ref($para) eq 'ARRAY' ) { while ( @$para ) { my $func = shift @$para; my $p = shift @$para; if( $ref->meta( $func ) ) { $ref->$func( ref($p) eq 'ARRAY' ? @{$p} : $p); } else { $ref->{$func} = $p; } } } if( $ref->{debug} ) { warn "$ref->{Name}\t$ref->{Version}\t$ref->{Stack}\n"; } return $ref; } sub decode58 { my @val = split // , shift; my $ret = uint64(0); my $base = uint64(1); for my $c ( reverse @val ) { $ret += $base58->{$c} * $base; $base *= 58; } return $ret; } sub encode58 { my $val = shift; my $ret =''; while ( $val ) { my $rest = $val; $rest = $rest / 58 ; $ret = $basea58->[$val % 58].$ret; $val = $rest; } return $ret; } sub uid_to_syswrite { return join "" , reverse split /(..)/, uint64_to_hex(decode58( shift )); } sub uid_from_sysread { return encode58(hex_to_uint64(join( '', reverse( split (/(..)/ , shift ))))); } sub num_to_syswrite { my ($un,$pa) = @_; return sub{my $data = shift @_; return unpack($un,pack($pa,@$data))}; } sub num_from_sysread { my ($un,$pa) = @_; return sub{ my $data; @$data = unpack($un,pack($pa,shift @_)); return $data; } } sub DESTROY { my $class = shift; if( $class->{IPCon} ) { delete $stack->{$class->{IPCon}}->{$class->{Stack}} if $class->{Stack}; if(!( keys %{$stack->{$class->{IPCon}}} )) { delete $stack->{$class->{IPCon}}; $class->{IPCon}->close if $class->{IPCon}; } } } sub AUTOLOAD { my $class = shift; my $name = $brick::AUTOLOAD; my ( $len,$buff,$func,$pack_mask, @pack, $unpack_mask, @unpack,$stack_id,$func_id,$p_len,@ret ) ; $name =~ s/.*://; # strip fully-qualified portion $func = $class->meta($name); if( $func ) { if( $class->callback($func->{id}) ) { my $ret = $class->{callback}->{$func->{id}}; $class->{callback}->{$func->{id}} = shift @_; return $ret; } $pack_mask = "CCS"; $len = 4; if( exists $func->{para} ) { for my $para ( @{$func->{para}} ) { if( exists $typ->{$para}->{pack} ) { push @pack, $typ->{$para}->{pack}(shift @_); } else { push @pack, shift @_; } $len += $typ->{$para}->{len}; $pack_mask .= $typ->{$para}->{mask}; } } unshift @pack, $len; unshift @pack, $func->{id}; unshift @pack, $class->{Stack}; $buff = pack($pack_mask,@pack); $class->syswrite($buff,$func); threads->yield(); if( exists $func->{resp}){ undef $len; ($stack_id,$func_id,$p_len,$buff ) = @{$class->{Queue}->dequeue()}; if (!$func_id){ warn "keine Antwort vom brickd $class->{Stack},$func->{id}\n"; return ; } for my $para ( @{$func->{resp}} ) { $unpack_mask .= $typ->{$para}->{mask}; $len += $typ->{$para}->{len}; } if( $len != ($p_len-4) ) { warn " $name erwartet $len hat aber ".($p_len-4)."\n" if exists $class->{debug}; } @unpack = unpack $unpack_mask, $buff; for my $para ( @{$func->{resp}} ) { if( exists $typ->{$para}->{unpack} ) { push @ret, $typ->{$para}->{unpack}(shift @unpack); } else { push @ret, shift @unpack; } } return wantarray ? @ret : shift @ret; } } else { warn "unbekannte Funtion $name von $class->{Name}\n" if exists $class->{debug}; } } sub wait { my $class = shift @_; my $timeout = shift @_; my (%stack,$ret); my $queue = Thread::Queue->new(); for my $br ( $class, @_ ) { next if !$class->isa('brick'); $stack{$br->{Stack}} = $br; for my $c ( keys %{$br->{callback}} ) { $br->{brickd}->{callback}->{"$br->{Stack} $c"} = $queue; } } MAIN_LOOP: while(1){ if( $timeout > 0 ) { my %stru :shared; $stru{Time} = Time::HiRes::time + $timeout; $stru{Queue} = $queue; $class->{brickd}->{timing}->{$stru{Time}} = \%stru; } my ($stack_id,$func_id,$p_len,$buff ) = @{$queue->dequeue()}; warn "wait ($stack_id,$func_id,$p_len)\n" if $class->{wait}; last if $timeout > 0 and !$func_id; next if !$func_id; my $func = $stack{$stack_id}->callback($func_id); if( $func and exists $stack{$stack_id}->{callback}->{$func_id}) { my (@unpack,$len,$unpack_mask); if( $stack{$stack_id}->meta($func)->{resp} ) { for my $para ( @{$stack{$stack_id}->meta($func)->{resp}} ) { $unpack_mask .= $typ->{$para}->{mask}; $len += $typ->{$para}->{len}; } if( $len != $p_len-4 ) { warn " $func erwartet $len hat aber ".($p_len-4)."\n" if $stack{$stack_id}->{wait}; } @unpack = unpack $unpack_mask, $buff if $unpack_mask; } $ret = $stack{$stack_id}->{callback}->{$func_id}(@unpack); } else { if ( $stack{$stack_id}->{debug} ) { my (@unpack,$len,$unpack_mask); for my $para ( @{$stack{$stack_id}->meta($func)->{resp}} ) { $unpack_mask .= $typ->{$para}->{mask}; $len += $typ->{$para}->{len}; } @unpack = unpack $unpack_mask, $buff if $unpack_mask; warn "no callback $func_id for stack $stack_id (@unpack)\n" if $class->{wait}; } redo MAIN_LOOP; } last if $timeout != -1; } return $ret; }